1   /*
2    * SPDX-FileCopyrightText: none
3    * SPDX-License-Identifier: CC0-1.0
4    */
5   
6   package gov.nist.secauto.metaschema.core.metapath.impl;
7   
8   import gov.nist.secauto.metaschema.core.metapath.item.IItem;
9   import gov.nist.secauto.metaschema.core.metapath.item.ISequence;
10  import gov.nist.secauto.metaschema.core.util.ObjectUtils;
11  
12  import java.util.List;
13  import java.util.Objects;
14  import java.util.concurrent.locks.Lock;
15  import java.util.concurrent.locks.ReentrantLock;
16  import java.util.stream.Collectors;
17  import java.util.stream.Stream;
18  
19  import edu.umd.cs.findbugs.annotations.NonNull;
20  
21  /**
22   * A Metapath sequence supporting an unbounded number of items backed initially
23   * by a stream.
24   *
25   * @param <ITEM>
26   *          the Java type of the items
27   */
28  public class StreamSequence<ITEM extends IItem>
29      extends AbstractSequence<ITEM> {
30  
31    private Stream<ITEM> stream;
32    private List<ITEM> list;
33    @NonNull
34    private final Lock instanceLock = new ReentrantLock();
35  
36    /**
37     * Construct a new sequence using the provided item stream.
38     *
39     * @param stream
40     *          the items to add to the sequence
41     */
42    public StreamSequence(@NonNull Stream<ITEM> stream) {
43      Objects.requireNonNull(stream, "stream");
44      this.stream = stream;
45    }
46  
47    @SuppressWarnings("PMD.NullAssignment")
48    @Override
49    protected List<ITEM> asList() {
50      instanceLock.lock();
51      try {
52        if (list == null) {
53          if (stream == null) {
54            throw new IllegalStateException(
55                "Unable to collect items into a list because the stream was already consumed.");
56          }
57          list = stream.collect(Collectors.toUnmodifiableList());
58          stream = null;
59        }
60      } finally {
61        instanceLock.unlock();
62      }
63      assert list != null;
64      return list;
65    }
66  
67    @Override
68    public ISequence<ITEM> reusable() {
69      // force the stream to be backed by a list
70      asList();
71      return this;
72    }
73  
74    @Override
75    public Stream<ITEM> stream() {
76      @NonNull
77      Stream<ITEM> retval;
78      // Ensure thread safety and prevent multiple consumptions of the stream
79      instanceLock.lock();
80      try {
81        if (list == null) {
82          if (stream == null) {
83            throw new IllegalStateException("The stream is already consumed.");
84          }
85          assert stream != null;
86          retval = stream;
87          stream = null; // NOPMD - readability
88        } else {
89          retval = ObjectUtils.notNull(list.stream());
90        }
91      } finally {
92        instanceLock.unlock();
93      }
94      return retval;
95    }
96  }