diff --git a/src/main/java/org/springframework/data/util/CloseableIterator.java b/src/main/java/org/springframework/data/util/CloseableIterator.java index 6210dfab1..af7de254c 100644 --- a/src/main/java/org/springframework/data/util/CloseableIterator.java +++ b/src/main/java/org/springframework/data/util/CloseableIterator.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2021 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package org.springframework.data.util; import java.io.Closeable; import java.util.Iterator; import java.util.Spliterator; -import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -49,12 +48,14 @@ public interface CloseableIterator extends Iterator, Closeable { * The default implementation should be overridden by subclasses that can return a more efficient spliterator. To * preserve expected laziness behavior for the {@link #stream()} method, spliterators should either have the * characteristic of {@code IMMUTABLE} or {@code CONCURRENT}, or be late-binding. + *

+ * The default implementation does not report a size. * * @return a {@link Spliterator} over the elements in this {@link Iterator}. * @since 2.4 */ default Spliterator spliterator() { - return Spliterators.spliterator(this, 0, 0); + return new IteratorSpliterator<>(this); } /** diff --git a/src/main/java/org/springframework/data/util/IteratorSpliterator.java b/src/main/java/org/springframework/data/util/IteratorSpliterator.java new file mode 100644 index 000000000..32ebcb5ce --- /dev/null +++ b/src/main/java/org/springframework/data/util/IteratorSpliterator.java @@ -0,0 +1,117 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.util; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; + +/** + * A Spliterator using a given Iterator for element operations. The spliterator implements {@code trySplit} to permit + * limited parallelism. + */ +class IteratorSpliterator implements Spliterator { + + private static final int BATCH_UNIT = 1 << 10; // batch array size increment + private static final int MAX_BATCH = 1 << 25; // max batch array size; + private final Iterator it; + private long est; // size estimate + private int batch; // batch size for splits + + /** + * Creates a spliterator using the given iterator for traversal, and reporting the given initial size and + * characteristics. + * + * @param iterator the iterator for the source + */ + public IteratorSpliterator(Iterator iterator) { + this.it = iterator; + this.est = Long.MAX_VALUE; + } + + @Override + public Spliterator trySplit() { + /* + * Split into arrays of arithmetically increasing batch + * sizes. This will only improve parallel performance if + * per-element Consumer actions are more costly than + * transferring them into an array. The use of an + * arithmetic progression in split sizes provides overhead + * vs parallelism bounds that do not particularly favor or + * penalize cases of lightweight vs heavyweight element + * operations, across combinations of #elements vs #cores, + * whether or not either are known. We generate + * O(sqrt(#elements)) splits, allowing O(sqrt(#cores)) + * potential speedup. + */ + Iterator i = it; + long s = est; + if (s > 1 && i.hasNext()) { + int n = batch + BATCH_UNIT; + if (n > s) { + n = (int) s; + } + if (n > MAX_BATCH) { + n = MAX_BATCH; + } + Object[] a = new Object[n]; + int j = 0; + do { + a[j] = i.next(); + } while (++j < n && i.hasNext()); + batch = j; + if (est != Long.MAX_VALUE) { + est -= j; + } + return Spliterators.spliterator(a, 0, j, 0); + } + return null; + } + + @Override + public void forEachRemaining(Consumer action) { + it.forEachRemaining(action); + } + + @Override + public boolean tryAdvance(Consumer action) { + if (it.hasNext()) { + action.accept(it.next()); + return true; + } + return false; + } + + @Override + public long estimateSize() { + return -1; + } + + @Override + public int characteristics() { + return 0; + } + + @Override + public Comparator getComparator() { + if (hasCharacteristics(Spliterator.SORTED)) { + return null; + } + throw new IllegalStateException(); + } +} diff --git a/src/test/java/org/springframework/data/util/CloseableIteratorUnitTests.java b/src/test/java/org/springframework/data/util/CloseableIteratorUnitTests.java index d9c61b08d..e7b2c3509 100644 --- a/src/test/java/org/springframework/data/util/CloseableIteratorUnitTests.java +++ b/src/test/java/org/springframework/data/util/CloseableIteratorUnitTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -43,6 +44,26 @@ class CloseableIteratorUnitTests { assertThat(iterator.closed).isFalse(); } + @Test // GH-2519 + void shouldCount() { + + CloseableIteratorImpl iterator = new CloseableIteratorImpl<>(Arrays.asList("1", "2", "3").iterator()); + + long count = iterator.stream().count(); + + assertThat(count).isEqualTo(3); + } + + @Test // GH-2519 + void shouldCountLargeStream() { + + CloseableIteratorImpl iterator = new CloseableIteratorImpl<>(IntStream.range(0, 2048).boxed().iterator()); + + long count = iterator.stream().count(); + + assertThat(count).isEqualTo(2048); + } + @Test // DATACMNS-1637 void closeStreamShouldCloseIterator() {