Browse Source

StreamUtils.zip(…) now treats infinite streams correctly.

When an infinite Stream was handed into StreamUtils.zip(…) as first argument, the resulting stream was infinite, too, while inverting the argument order was limiting the resulting stream to the length of the finite one. This is now fixed by actually evaluating whether we can advance on both of the streams and shortcutting the process if that is not possible on either of the streams, limiting the processing of the overall Stream to the shorter of the two as already advertised in the Javadoc.

Fixes #2426.
2.4.x
Oliver Drotbohm 4 years ago
parent
commit
6aee21d2ec
No known key found for this signature in database
GPG Key ID: C25FBFA0DA493A1D
  1. 34
      src/main/java/org/springframework/data/util/Sink.java
  2. 21
      src/main/java/org/springframework/data/util/StreamUtils.java
  3. 9
      src/test/java/org/springframework/data/util/StreamUtilsTests.java

34
src/main/java/org/springframework/data/util/Sink.java

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
package org.springframework.data.util;
import java.util.function.Consumer;
import org.springframework.lang.Nullable;
/**
* A simple {@link Consumer} that captures the instance handed into it.
*
* @author Oliver Drotbohm
* @since 2.4.12
*/
class Sink<T> implements Consumer<T> {
private T value;
/**
* Returns the value captured.
*
* @return
*/
public T getValue() {
return value;
}
/*
* (non-Javadoc)
* @see java.util.function.Consumer#accept(java.lang.Object)
*/
@Override
public void accept(@Nullable T t) {
this.value = t;
}
}

21
src/main/java/org/springframework/data/util/StreamUtils.java

@ -140,9 +140,26 @@ public interface StreamUtils { @@ -140,9 +140,26 @@ public interface StreamUtils {
@Override
@SuppressWarnings("null")
public boolean tryAdvance(Consumer<? super T> action) {
return lefts.tryAdvance(left -> rights.tryAdvance(right -> action.accept(combiner.apply(left, right))));
}
Sink<L> leftSink = new Sink<L>();
Sink<R> rightSink = new Sink<R>();
boolean leftAdvance = lefts.tryAdvance(leftSink);
if (!leftAdvance) {
return false;
}
boolean rightAdvance = rights.tryAdvance(rightSink);
if (!rightAdvance) {
return false;
}
action.accept(combiner.apply(leftSink.getValue(), rightSink.getValue()));
return true;
}
}, parallel);
}
}

9
src/test/java/org/springframework/data/util/StreamUtilsTests.java

@ -42,4 +42,13 @@ public class StreamUtilsTests { @@ -42,4 +42,13 @@ public class StreamUtilsTests {
assertThat(input).isEqualTo(output);
}
@Test // #2426
void combinesInfiniteStreamCorrectly() {
Stream<Long> indices = Stream.iterate(1L, n -> n + 1);
Stream<String> lines = Stream.of("first line", "second line");
assertThat(StreamUtils.zip(indices, lines, (index, line) -> index + ":" + line).count()).isEqualTo(2);
}
}

Loading…
Cancel
Save