From c552aaa6f1cd83582a61c7f1fd9e146da19c3c8a Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 30 Jun 2015 13:45:54 +0200 Subject: [PATCH] work, work. --- spring-web-reactive/build.gradle | 14 ++ ...ava => ByteArrayPublisherInputStream.java} | 41 ++-- .../rx/io/ByteArrayPublisherOutputStream.java | 54 +++++ .../rx/io/ByteBufPublisherOutputStream.java | 78 ------- .../util/AbstractUnicastAsyncSubscriber.java | 214 ++++++++++++++++++ .../util/AbstractUnicastSyncSubscriber.java | 76 +++++++ .../rx/util/BlockingByteBufQueue.java | 193 ---------------- .../util/BlockingByteBufQueueSubscriber.java | 77 ------- .../rx/util/BlockingSignalQueue.java | 123 ++++++++++ ...java => BlockingSignalQueuePublisher.java} | 18 +- .../util/BlockingSignalQueueSubscriber.java | 113 +++++++++ .../springframework/rx/util/OnComplete.java | 65 ++++++ .../org/springframework/rx/util/OnError.java | 69 ++++++ .../org/springframework/rx/util/OnNext.java | 70 ++++++ .../springframework/rx/util/OnSubscribe.java | 70 ++++++ .../org/springframework/rx/util/Signal.java | 39 ++++ .../web/servlet/AsyncContextSynchronizer.java | 69 ++++++ .../io/ByteBufPublisherInputStreamTests.java | 29 +-- .../BlockingByteBufQueuePublisherTests.java | 16 +- .../rx/util/BlockingByteBufQueueTests.java | 28 +-- 20 files changed, 1036 insertions(+), 420 deletions(-) rename spring-web-reactive/src/main/java/org/springframework/rx/io/{ByteBufPublisherInputStream.java => ByteArrayPublisherInputStream.java} (69%) create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherOutputStream.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastAsyncSubscriber.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastSyncSubscriber.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueue.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueueSubscriber.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java rename spring-web-reactive/src/main/java/org/springframework/rx/util/{BlockingByteBufQueuePublisher.java => BlockingSignalQueuePublisher.java} (82%) create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/OnSubscribe.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 71878c41825..c9c4c7150b2 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -1,5 +1,17 @@ +buildscript { + repositories { + maven { url 'http://repo.springsource.org/plugins-release' } + } + dependencies { + classpath 'org.springframework.build.gradle:propdeps-plugin:0.0.7' + } +} apply plugin: 'java' +apply plugin: 'propdeps' +apply plugin: 'propdeps-idea' +apply plugin: 'propdeps-maven' + repositories { mavenCentral() @@ -12,6 +24,8 @@ dependencies { compile "org.slf4j:slf4j-api:1.7.6" compile "ch.qos.logback:logback-classic:1.1.2" + provided "javax.servlet:javax.servlet-api:3.1.0" + testCompile "junit:junit:4.12" } diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java similarity index 69% rename from spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherInputStream.java rename to spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java index 43e7fa7624d..d3251c04514 100644 --- a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java @@ -14,34 +14,40 @@ package org.springframework.rx.io;/* * limitations under the License. */ +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import org.reactivestreams.Publisher; -import org.springframework.rx.util.BlockingByteBufQueue; -import org.springframework.rx.util.BlockingByteBufQueueSubscriber; +import org.springframework.rx.util.BlockingSignalQueue; +import org.springframework.rx.util.BlockingSignalQueueSubscriber; import org.springframework.util.Assert; /** + * {@code InputStream} implementation based on a byte array {@link Publisher}. + * * @author Arjen Poutsma */ -public class ByteBufPublisherInputStream extends InputStream { +public class ByteArrayPublisherInputStream extends InputStream { + + private final BlockingSignalQueue queue; - private final BlockingByteBufQueue queue; + private ByteArrayInputStream currentStream; - private ByteBufInputStream currentStream; - public ByteBufPublisherInputStream(Publisher publisher) { + /** + * Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher. + * @param publisher the publisher to use + */ + public ByteArrayPublisherInputStream(Publisher publisher) { Assert.notNull(publisher, "'publisher' must not be null"); - this.queue = new BlockingByteBufQueue(); - publisher.subscribe(new BlockingByteBufQueueSubscriber(this.queue)); + this.queue = new BlockingSignalQueue(); + publisher.subscribe(new BlockingSignalQueueSubscriber(this.queue)); } - ByteBufPublisherInputStream(BlockingByteBufQueue queue) { + ByteArrayPublisherInputStream(BlockingSignalQueue queue) { Assert.notNull(queue, "'queue' must not be null"); this.queue = queue; } @@ -91,6 +97,7 @@ public class ByteBufPublisherInputStream extends InputStream { } } while (is != null); + return -1; } @@ -102,14 +109,14 @@ public class ByteBufPublisherInputStream extends InputStream { else if (this.queue.isComplete()) { return null; } - else if (this.queue.isHeadBuffer()) { - ByteBuf current = this.queue.pollBuffer(); - this.currentStream = new ByteBufInputStream(current); + else if (this.queue.isHeadSignal()) { + byte[] current = this.queue.pollSignal(); + this.currentStream = new ByteArrayInputStream(current); return this.currentStream; } else if (this.queue.isHeadError()) { Throwable t = this.queue.pollError(); - throw toIOException(t); + throw t instanceof IOException ? (IOException) t : new IOException(t); } } catch (InterruptedException ex) { @@ -118,8 +125,4 @@ public class ByteBufPublisherInputStream extends InputStream { return null; } - private static IOException toIOException(Throwable t) { - return t instanceof IOException ? (IOException) t : new IOException(t); - } - } diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java new file mode 100644 index 00000000000..a89b9daee89 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java @@ -0,0 +1,54 @@ +package org.springframework.rx.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +import org.reactivestreams.Publisher; + +import org.springframework.rx.util.BlockingSignalQueue; +import org.springframework.rx.util.BlockingSignalQueuePublisher; + +/** + * {@code OutputStream} implementation that stores all written bytes, to be retrieved + * using {@link #toByteBufPublisher()}. + * @author Arjen Poutsma + */ +public class ByteArrayPublisherOutputStream extends OutputStream { + + private final BlockingSignalQueue queue = new BlockingSignalQueue(); + + /** + * Returns the written data as a {@code Publisher}. + * @return a publisher for the written bytes + */ + public Publisher toByteBufPublisher() { + return new BlockingSignalQueuePublisher(this.queue); + } + + @Override + public void write(int b) throws IOException { + write(new byte[]{(byte) b}); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + byte[] copy = Arrays.copyOf(b, len); + try { + this.queue.putSignal(copy); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void close() throws IOException { + try { + this.queue.complete(); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherOutputStream.java b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherOutputStream.java deleted file mode 100644 index a85125eb847..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteBufPublisherOutputStream.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.springframework.rx.io; - -import java.io.IOException; -import java.io.OutputStream; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.rx.util.BlockingByteBufQueue; -import org.springframework.rx.util.BlockingByteBufQueuePublisher; -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -public class ByteBufPublisherOutputStream extends OutputStream { - - private final BlockingByteBufQueue queue = new BlockingByteBufQueue(); - - private final ByteBufAllocator bufferAllocator; - - public ByteBufPublisherOutputStream() { - this(new UnpooledByteBufAllocator(false)); - } - - public ByteBufPublisherOutputStream(ByteBufAllocator bufferAllocator) { - Assert.notNull(bufferAllocator, "'bufferAllocator' must not be null"); - this.bufferAllocator = bufferAllocator; - } - - public Publisher toByteBufPublisher() { - return new BlockingByteBufQueuePublisher(this.queue); - } - - @Override - public void write(int b) throws IOException { - ByteBuf buffer = this.bufferAllocator.buffer(1, 1); - buffer.writeByte(b); - putBuffer(buffer); - } - - @Override - public void write(byte[] b) throws IOException { - ByteBuf buffer = this.bufferAllocator.buffer(b.length, b.length); - buffer.writeBytes(b); - putBuffer(buffer); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - ByteBuf buffer = this.bufferAllocator.buffer(len, len); - buffer.writeBytes(b, off, len); - putBuffer(buffer); - } - - private void putBuffer(ByteBuf buffer) { - try { - this.queue.putBuffer(buffer); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void close() throws IOException { - try { - this.queue.complete(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastAsyncSubscriber.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastAsyncSubscriber.java new file mode 100644 index 00000000000..aece160be85 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastAsyncSubscriber.java @@ -0,0 +1,214 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +public abstract class AbstractUnicastAsyncSubscriber implements Subscriber { + + private final Executor executor; + + private Subscription subscription; + + private boolean done; + + protected AbstractUnicastAsyncSubscriber(Executor executor) { + Assert.notNull(executor, "'executor' must not be null"); + + this.executor = executor; + } + + private void done() { + done = true; + + if (subscription != null) { + subscription.cancel(); + } + } + + // This method is invoked when the OnNext signals arrive + // Returns whether more elements are desired or not, and if no more elements are desired, + // for convenience. + protected abstract boolean whenNext(final T element); + + // This method is invoked when the OnComplete signal arrives + // override this method to implement your own custom onComplete logic. + protected void whenComplete() { + } + + // This method is invoked if the OnError signal arrives + // override this method to implement your own custom onError logic. + protected void whenError(Throwable error) { + } + + private void handleOnSubscribe(Subscription subscription) { + if (subscription == null) { + return; + } + if (this.subscription != null) { + subscription.cancel(); + } + else { + this.subscription = subscription; + this.subscription.request(1); + } + } + + private void handleOnNext(final T element) { + if (!done) { + try { + if (whenNext(element)) { + subscription.request(1); + } + else { + done(); + } + } + catch (final Throwable t) { + done(); + onError(t); + } + } + } + + private void handleOnComplete() { + done = true; + whenComplete(); + } + + private void handleOnError(final Throwable error) { + done = true; + whenError(error); + } + + // We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time + + @Override + public final void onSubscribe(final Subscription s) { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` + if (s == null) { + throw null; + } + + signal(new OnSubscribe(s)); + } + + @Override + public final void onNext(final T element) { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` + if (element == null) { + throw null; + } + + signal(new OnNext(element)); + } + + @Override + public final void onError(final Throwable t) { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` + if (t == null) { + throw null; + } + + signal(new OnError(t)); + } + + @Override + public final void onComplete() { + signal(OnComplete.INSTANCE); + } + + private final ConcurrentLinkedQueue> inboundSignals = + new ConcurrentLinkedQueue>(); + + private final AtomicBoolean enabled = new AtomicBoolean(false); + + // What `signal` does is that it sends signals to the `Subscription` asynchronously + private void signal(final Signal signal) { + if (inboundSignals + .offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us + { + tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already + } + } + + // This method makes sure that this `Subscriber` is only executing on one Thread at a time + private void tryScheduleToExecute() { + if (enabled.compareAndSet(false, true)) { + try { + executor.execute(new SignalRunnable()); + } + catch (Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13 + if (!done) { + try { + done(); // First of all, this failure is not recoverable, so we need to cancel our subscription + } + finally { + inboundSignals.clear(); // We're not going to need these anymore + // This subscription is cancelled by now, but letting the Subscriber become schedulable again means + // that we can drain the inboundSignals queue if anything arrives after clearing + enabled.set(false); + } + } + } + } + } + + private class SignalRunnable implements Runnable { + + @Override + public void run() { + if (enabled.get()) { + try { + Signal s = inboundSignals.poll(); + if (!done) { + if (s.isOnNext()) { + handleOnNext(s.next()); + } + else if (s.isOnSubscribe()) { + handleOnSubscribe(s.subscription()); + } + else if (s.isOnError()) { + handleOnError(s.error()); + } + else if (s.isComplete()) { + handleOnComplete(); + } + } + } + finally { + enabled.set(false); + + if (!inboundSignals.isEmpty()) { + tryScheduleToExecute(); + } + } + } + + } + } +} \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastSyncSubscriber.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastSyncSubscriber.java new file mode 100644 index 00000000000..85e4764d504 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/AbstractUnicastSyncSubscriber.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * @author Arjen Poutsma + */ +public abstract class AbstractUnicastSyncSubscriber implements Subscriber { + + private Subscription subscription; + + private boolean done = false; + + @Override + public final void onSubscribe(Subscription subscription) { + if (subscription == null) { + throw new NullPointerException(); + } + + if (this.subscription != null) { + subscription.cancel(); + } + else { + this.subscription = subscription; + this.subscription.request(1); + } + } + + @Override + public final void onNext(T element) { + if (element == null) { + throw new NullPointerException(); + } + + if (!done) { + try { + if (onNextInternal(element)) { + subscription.request(1); + } + else { + done(); + } + } + catch (Throwable t) { + done(); + onError(t); + } + } + } + + private void done() { + done = true; + subscription.cancel(); + } + + protected abstract boolean onNextInternal(final T element) throws Exception; + + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueue.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueue.java deleted file mode 100644 index eda67360848..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueue.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.rx.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import io.netty.buffer.ByteBuf; - -import org.springframework.util.Assert; - -/** - * A {@link BlockingQueue} aimed at working with {@code Publisher} instances. - * Mainly meant to bridge between reactive and non-reactive APIs, such as blocking - * streams. - * - *

Typically, this class will be used by two threads: one thread to put new elements on - * the stack by calling {@link #putBuffer(ByteBuf)}, possibly {@link #putError(Throwable)} - * and finally {@link #complete()}. The other thread will read elements by calling {@link - * #isHeadBuffer()} and {@link #isHeadError()}, while keeping an eye on {@link - * #isComplete()}. - * - * @author Arjen Poutsma - */ -public class BlockingByteBufQueue { - - private final BlockingQueue queue = new LinkedBlockingQueue(); - - /** - * Inserts the specified buffer into this queue, waiting if necessary for space to - * become available. - * @param buffer the buffer to add - */ - public void putBuffer(ByteBuf buffer) throws InterruptedException { - Assert.notNull(buffer, "'buffer' must not be null"); - Assert.state(!isComplete(), "Cannot put buffers in queue after complete()"); - this.queue.put(new ByteBufElement(buffer)); - } - - /** - * Inserts the specified error into this queue, waiting if necessary for space to - * become available. - * @param error the error to add - */ - public void putError(Throwable error) throws InterruptedException { - Assert.notNull(error, "'error' must not be null"); - Assert.state(!isComplete(), "Cannot put errors in queue after complete()"); - this.queue.put(new ErrorElement(error)); - } - - /** - * Marks the queue as complete. - */ - public void complete() throws InterruptedException { - this.queue.put(COMPLETE); - } - - /** - * Indicates whether the current head of this queue is a {@link ByteBuf}. - * @return {@code true} if the current head is a buffer; {@code false} otherwise - */ - public boolean isHeadBuffer() { - Element element = this.queue.peek(); - return element instanceof ByteBufElement; - } - - /** - * Indicates whether the current head of this queue is a {@link Throwable}. - * @return {@code true} if the current head is an error; {@code false} otherwise - */ - public boolean isHeadError() { - Element element = this.queue.peek(); - return element instanceof ErrorElement; - } - - /** - * Indicates whether there are more buffers or errors in this queue. - * @return {@code true} if there more elements in this queue; {@code false} otherwise - */ - public boolean isComplete() { - Element element = this.queue.peek(); - return COMPLETE == element; - } - - /** - * Retrieves and removes the buffer head of this queue. Should only be called after - * {@link #isHeadBuffer()} returns {@code true}. - * @return the head of the queue, as buffer - * @throws IllegalStateException if the current head of this queue is not a buffer - * @see #isHeadBuffer() - */ - public ByteBuf pollBuffer() throws InterruptedException { - Element element = this.queue.take(); - return element != null ? element.getBuffer() : null; - } - - /** - * Retrieves and removes the buffer error of this queue. Should only be called after - * {@link #isHeadError()} returns {@code true}. - * @return the head of the queue, as error - * @throws IllegalStateException if the current head of this queue is not a error - * @see #isHeadError() - */ - public Throwable pollError() throws InterruptedException { - Element element = this.queue.take(); - return element != null ? element.getError() : null; - } - - /** - * Removes all of the elements from this collection - */ - public void clear() { - this.queue.clear(); - } - - private interface Element { - - ByteBuf getBuffer(); - - Throwable getError(); - } - - private static class ByteBufElement implements Element { - - private final ByteBuf buffer; - - public ByteBufElement(ByteBuf buffer) { - if (buffer == null) { - throw new IllegalArgumentException("'buffer' should not be null"); - } - this.buffer = buffer; - } - - @Override - public ByteBuf getBuffer() { - return this.buffer; - } - - @Override - public Throwable getError() { - throw new IllegalStateException("No error on top of the queue"); - } - - } - - private static class ErrorElement implements Element { - - private final Throwable error; - - public ErrorElement(Throwable error) { - if (error == null) { - throw new IllegalArgumentException("'error' should not be null"); - } - this.error = error; - } - - @Override - public ByteBuf getBuffer() { - throw new IllegalStateException("No ByteBuf on top of the queue"); - } - - @Override - public Throwable getError() { - return this.error; - } - } - - private static final Element COMPLETE = new Element() { - @Override - public ByteBuf getBuffer() { - throw new IllegalStateException("No ByteBuf on top of the queue"); - } - - @Override - public Throwable getError() { - throw new IllegalStateException("No error on top of the queue"); - } - }; -} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueueSubscriber.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueueSubscriber.java deleted file mode 100644 index dda8668ce6b..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueueSubscriber.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.rx.util; - -import io.netty.buffer.ByteBuf; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -public class BlockingByteBufQueueSubscriber implements Subscriber { - - private final BlockingByteBufQueue queue; - - private Subscription subscription; - - public BlockingByteBufQueueSubscriber(BlockingByteBufQueue queue) { - Assert.notNull(queue, "'queue' must not be null"); - this.queue = queue; - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - - this.subscription.request(1); - } - - @Override - public void onNext(ByteBuf byteBuf) { - try { - this.queue.putBuffer(byteBuf); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - this.subscription.request(1); - } - - @Override - public void onError(Throwable t) { - try { - this.queue.putError(t); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - this.subscription.request(1); - } - - @Override - public void onComplete() { - try { - this.queue.complete(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java new file mode 100644 index 00000000000..f30f7470d4b --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java @@ -0,0 +1,123 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import io.netty.buffer.ByteBuf; + +import org.springframework.util.Assert; + +/** + * A {@link BlockingQueue} aimed at working with {@code Publisher} instances. + * Mainly meant to bridge between reactive and non-reactive APIs, such as blocking + * streams. + * + *

Typically, this class will be used by two threads: one thread to put new elements on + * the stack by calling {@link #put(ByteBuf)}, possibly {@link #putError(Throwable)} and + * finally {@link #complete()}. The other thread will read elements by calling {@link + * #isHeadSignal()}/{@link #pollSignal()} and {@link #isHeadError()}/{@link #pollError()}, + * while keeping an eye on {@link #isComplete()}. + * @author Arjen Poutsma + */ +public class BlockingSignalQueue { + + private final BlockingQueue> queue = new LinkedBlockingQueue>(); + + + /** + * Inserts the specified signal into this queue, waiting if necessary for space to + * become available. + * @param t the signal to add + */ + public void putSignal(T t) throws InterruptedException { + Assert.notNull(t, "'t' must not be null"); + Assert.state(!isComplete(), "Cannot put signal in queue after complete()"); + this.queue.put(new OnNext(t)); + } + + /** + * Inserts the specified error into this queue, waiting if necessary for space to + * become available. + * @param error the error to add + */ + public void putError(Throwable error) throws InterruptedException { + Assert.notNull(error, "'error' must not be null"); + Assert.state(!isComplete(), "Cannot putSignal errors in queue after complete()"); + this.queue.put(new OnError(error)); + } + + /** + * Marks the queue as complete. + */ + public void complete() throws InterruptedException { + this.queue.put(OnComplete.INSTANCE); + } + + /** + * Indicates whether the current head of this queue is a signal. + * @return {@code true} if the current head is a signal; {@code false} otherwise + */ + public boolean isHeadSignal() { + Signal signal = this.queue.peek(); + return signal instanceof OnNext; + } + + /** + * Indicates whether the current head of this queue is a {@link Throwable}. + * @return {@code true} if the current head is an error; {@code false} otherwise + */ + public boolean isHeadError() { + Signal signal = this.queue.peek(); + return signal instanceof OnError; + } + + /** + * Indicates whether there are more buffers or errors in this queue. + * @return {@code true} if there more elements in this queue; {@code false} otherwise + */ + public boolean isComplete() { + Signal signal = this.queue.peek(); + return OnComplete.INSTANCE == signal; + } + + /** + * Retrieves and removes the signal head of this queue. Should only be called after + * {@link #isHeadSignal()} returns {@code true}. + * @return the head of the queue + * @throws IllegalStateException if the current head of this queue is not a buffer + * @see #isHeadSignal() + */ + public T pollSignal() throws InterruptedException { + Signal signal = this.queue.take(); + return signal != null ? signal.next() : null; + } + + /** + * Retrieves and removes the buffer error of this queue. Should only be called after + * {@link #isHeadError()} returns {@code true}. + * @return the head of the queue, as error + * @throws IllegalStateException if the current head of this queue is not a error + * @see #isHeadError() + */ + public Throwable pollError() throws InterruptedException { + Signal signal = this.queue.take(); + return signal != null ? signal.error() : null; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueuePublisher.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueuePublisher.java similarity index 82% rename from spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueuePublisher.java rename to spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueuePublisher.java index 1bbd763ba58..a542cd8dbf1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingByteBufQueuePublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueuePublisher.java @@ -16,7 +16,6 @@ package org.springframework.rx.util; -import io.netty.buffer.ByteBuf; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -26,21 +25,21 @@ import org.springframework.util.Assert; /** * @author Arjen Poutsma */ -public class BlockingByteBufQueuePublisher implements Publisher { +public class BlockingSignalQueuePublisher implements Publisher { - private final BlockingByteBufQueue queue; + private final BlockingSignalQueue queue; - private Subscriber subscriber; + private Subscriber subscriber; private final Object subscriberMutex = new Object(); - public BlockingByteBufQueuePublisher(BlockingByteBufQueue queue) { + public BlockingSignalQueuePublisher(BlockingSignalQueue queue) { Assert.notNull(queue, "'queue' must not be null"); this.queue = queue; } @Override - public void subscribe(Subscriber subscriber) { + public void subscribe(Subscriber subscriber) { synchronized (this.subscriberMutex) { if (this.subscriber != null) { subscriber.onError( @@ -74,10 +73,10 @@ public class BlockingByteBufQueuePublisher implements Publisher { @Override public void run() { try { - while (!Thread.currentThread().isInterrupted()) + while (!Thread.currentThread().isInterrupted()) { if ((l < requestCount || requestCount == Long.MAX_VALUE) && - queue.isHeadBuffer()) { - subscriber.onNext(queue.pollBuffer()); + queue.isHeadSignal()) { + subscriber.onNext(queue.pollSignal()); l++; } else if (queue.isHeadError()) { @@ -88,6 +87,7 @@ public class BlockingByteBufQueuePublisher implements Publisher { subscriber.onComplete(); break; } + } } catch (InterruptedException ex) { // Allow thread to exit diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java new file mode 100644 index 00000000000..b17c0d709cb --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java @@ -0,0 +1,113 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.util.Assert; + +/** + * A simple byte array {@link Subscriber} that puts all published bytes on a + * {@link @BlockingSignalQueue}. + * + * @author Arjen Poutsma + */ +public class BlockingSignalQueueSubscriber implements Subscriber { + + /** + * The default request size to use. + */ + public static final int DEFAULT_REQUEST_SIZE = 1; + + private final BlockingSignalQueue queue; + + private Subscription subscription; + + private int initialRequestSize = DEFAULT_REQUEST_SIZE; + + private int requestSize = DEFAULT_REQUEST_SIZE; + + + /** + * Creates a new {@code BlockingSignalQueueSubscriber} using the given queue. + * @param queue the queue to use + */ + public BlockingSignalQueueSubscriber(BlockingSignalQueue queue) { + Assert.notNull(queue, "'queue' must not be null"); + this.queue = queue; + } + + /** + * Sets the request size used when subscribing, in {@link #onSubscribe(Subscription)}. + * Defaults to {@link #DEFAULT_REQUEST_SIZE}. + * @param initialRequestSize the initial request size + * @see Subscription#request(long) + */ + public void setInitialRequestSize(int initialRequestSize) { + this.initialRequestSize = initialRequestSize; + } + + /** + * Sets the request size used after data or an error comes in, in {@link + * #onNext(Object)} and {@link #onError(Throwable)}. Defaults to {@link + * #DEFAULT_REQUEST_SIZE}. + * @see Subscription#request(long) + */ + public void setRequestSize(int requestSize) { + this.requestSize = requestSize; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + + this.subscription.request(this.initialRequestSize); + } + + @Override + public void onNext(T t) { + try { + this.queue.putSignal(t); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + this.subscription.request(requestSize); + } + + @Override + public void onError(Throwable t) { + try { + this.queue.putError(t); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + this.subscription.request(requestSize); + } + + @Override + public void onComplete() { + try { + this.queue.complete(); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java new file mode 100644 index 00000000000..c306e584cf5 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscription; + +/** + * @author Arjen Poutsma + */ +class OnComplete implements Signal { + + public static final OnComplete INSTANCE = new OnComplete(); + + private OnComplete() { + } + + @Override + public boolean isComplete() { + return true; + } + + @Override + public boolean isOnNext() { + return false; + } + + @Override + public Object next() { + throw new IllegalStateException(); + } + + @Override + public boolean isOnError() { + return false; + } + + @Override + public Throwable error() { + throw new IllegalStateException(); + } + + @Override + public boolean isOnSubscribe() { + return false; + } + + @Override + public Subscription subscription() { + throw new IllegalStateException(); + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java new file mode 100644 index 00000000000..01354859322 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java @@ -0,0 +1,69 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscription; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +final class OnError implements Signal { + + private final Throwable error; + + public OnError(Throwable error) { + Assert.notNull(error, "'error' must not be null"); + this.error = error; + } + + @Override + public boolean isOnError() { + return true; + } + + @Override + public Throwable error() { + return error; + } + + @Override + public boolean isOnNext() { + return false; + } + + @Override + public Object next() { + throw new IllegalStateException(); + } + + @Override + public boolean isOnSubscribe() { + return false; + } + + @Override + public Subscription subscription() { + throw new IllegalStateException(); + } + + @Override + public boolean isComplete() { + return false; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java new file mode 100644 index 00000000000..56e052e1aa7 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscription; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +class OnNext implements Signal { + + private final T next; + + public OnNext(T next) { + Assert.notNull(next, "'next' must not be null"); + this.next = next; + } + + @Override + public boolean isOnNext() { + return true; + } + + @Override + public T next() { + return next; + } + + + @Override + public boolean isOnError() { + return false; + } + + @Override + public Throwable error() { + throw new IllegalStateException(); + } + + @Override + public boolean isOnSubscribe() { + return false; + } + + @Override + public Subscription subscription() { + throw new IllegalStateException(); + } + + @Override + public boolean isComplete() { + return false; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnSubscribe.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnSubscribe.java new file mode 100644 index 00000000000..d4731a403b3 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnSubscribe.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscription; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +class OnSubscribe implements Signal { + + private final Subscription subscription; + + public OnSubscribe(Subscription subscription) { + Assert.notNull(subscription, "'subscription' must not be null"); + this.subscription = subscription; + } + + @Override + public boolean isOnSubscribe() { + return true; + } + + @Override + public Subscription subscription() { + return null; + } + + + @Override + public boolean isOnNext() { + return false; + } + + @Override + public Object next() { + throw new IllegalStateException(); + } + + @Override + public boolean isOnError() { + return false; + } + + @Override + public Throwable error() { + throw new IllegalStateException(); + } + + @Override + public boolean isComplete() { + return false; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java new file mode 100644 index 00000000000..1833ef210ec --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.reactivestreams.Subscription; + +/** + * @author Arjen Poutsma + */ +interface Signal { + + boolean isOnNext(); + + T next(); + + boolean isOnError(); + + Throwable error(); + + boolean isOnSubscribe(); + + Subscription subscription(); + + boolean isComplete(); +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java new file mode 100644 index 00000000000..9d2b8d08809 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java @@ -0,0 +1,69 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.AsyncContext; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; + +/** + * @author Arjen Poutsma + */ +class AsyncContextSynchronizer { + + private static final int READ_COMPLETE = 1; + + private static final int WRITE_COMPLETE = 1 << 1; + + private static final int COMPLETE = READ_COMPLETE | WRITE_COMPLETE; + + private final AsyncContext asyncContext; + + private final AtomicInteger complete = new AtomicInteger(0); + + public AsyncContextSynchronizer(AsyncContext asyncContext) { + this.asyncContext = asyncContext; + } + + public ServletInputStream getInputStream() throws IOException { + return this.asyncContext.getRequest().getInputStream(); + } + + public ServletOutputStream getOutputStream() throws IOException { + return this.asyncContext.getResponse().getOutputStream(); + } + + public void readComplete() { + if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) { + this.asyncContext.complete(); + } + else { + this.complete.compareAndSet(0, READ_COMPLETE); + } + } + + public void writeComplete() { + if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) { + this.asyncContext.complete(); + } + else { + this.complete.compareAndSet(0, WRITE_COMPLETE); + } + } +} diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/io/ByteBufPublisherInputStreamTests.java b/spring-web-reactive/src/test/java/org/springframework/rx/io/ByteBufPublisherInputStreamTests.java index 24a7d9b0e94..cd8fa33ddf1 100644 --- a/spring-web-reactive/src/test/java/org/springframework/rx/io/ByteBufPublisherInputStreamTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/rx/io/ByteBufPublisherInputStreamTests.java @@ -16,15 +16,10 @@ package org.springframework.rx.io; -import java.io.EOFException; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.junit.Before; import org.junit.Test; -import org.springframework.rx.util.BlockingByteBufQueue; -import org.springframework.rx.util.BlockingByteBufQueuePublisher; +import org.springframework.rx.util.BlockingSignalQueue; import static org.junit.Assert.*; @@ -33,25 +28,22 @@ import static org.junit.Assert.*; */ public class ByteBufPublisherInputStreamTests { - private BlockingByteBufQueue queue; + private BlockingSignalQueue queue; - private ByteBufPublisherInputStream is; + private ByteArrayPublisherInputStream is; @Before public void setUp() throws Exception { - queue = new BlockingByteBufQueue(); - is = new ByteBufPublisherInputStream(queue); + queue = new BlockingSignalQueue(); + is = new ByteArrayPublisherInputStream(queue); } @Test public void readSingleByte() throws Exception { - ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); - ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); - - queue.putBuffer(abc); - queue.putBuffer(def); + queue.putSignal(new byte[]{'a', 'b', 'c'}); + queue.putSignal(new byte[]{'d', 'e', 'f'}); queue.complete(); @@ -75,11 +67,8 @@ public class ByteBufPublisherInputStreamTests { @Test public void readBytes() throws Exception { - ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); - ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); - - queue.putBuffer(abc); - queue.putBuffer(def); + queue.putSignal(new byte[]{'a', 'b', 'c'}); + queue.putSignal(new byte[]{'d', 'e', 'f'}); queue.complete(); byte[] buf = new byte[2]; diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java b/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java index 719324f4b27..5c149668185 100644 --- a/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java @@ -33,14 +33,14 @@ import org.reactivestreams.Subscription; */ public class BlockingByteBufQueuePublisherTests { - private BlockingByteBufQueue queue; + private BlockingSignalQueue queue; - private BlockingByteBufQueuePublisher publisher; + private BlockingSignalQueuePublisher publisher; @Before public void setUp() throws Exception { - queue = new BlockingByteBufQueue(); - publisher = new BlockingByteBufQueuePublisher(queue); + queue = new BlockingSignalQueue(); + publisher = new BlockingSignalQueuePublisher(queue); } @Test @@ -48,8 +48,8 @@ public class BlockingByteBufQueuePublisherTests { ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); - queue.putBuffer(abc); - queue.putBuffer(def); + queue.putSignal(abc); + queue.putSignal(def); queue.complete(); final AtomicBoolean complete = new AtomicBoolean(false); @@ -90,8 +90,8 @@ public class BlockingByteBufQueuePublisherTests { ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); - queue.putBuffer(abc); - queue.putBuffer(def); + queue.putSignal(abc); + queue.putSignal(def); queue.complete(); final AtomicBoolean complete = new AtomicBoolean(false); diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueueTests.java b/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueueTests.java index 5f16234fcc8..882c6a13c27 100644 --- a/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueueTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueueTests.java @@ -27,11 +27,11 @@ import org.junit.Test; */ public class BlockingByteBufQueueTests { - private BlockingByteBufQueue queue; + private BlockingSignalQueue queue; @Before public void setUp() throws Exception { - queue = new BlockingByteBufQueue(); + queue = new BlockingSignalQueue(); } @Test @@ -39,41 +39,37 @@ public class BlockingByteBufQueueTests { ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); - queue.putBuffer(abc); - queue.putBuffer(def); + queue.putSignal(abc); + queue.putSignal(def); queue.complete(); - assertTrue(queue.isHeadBuffer()); + assertTrue(queue.isHeadSignal()); assertFalse(queue.isHeadError()); - assertSame(abc, queue.pollBuffer()); + assertSame(abc, queue.pollSignal()); - assertTrue(queue.isHeadBuffer()); + assertTrue(queue.isHeadSignal()); assertFalse(queue.isHeadError()); - assertSame(def, queue.pollBuffer()); + assertSame(def, queue.pollSignal()); assertTrue(queue.isComplete()); } - @Test - public void empty() throws Exception { - assertNull(queue.pollBuffer()); - } @Test public void error() throws Exception { ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); Throwable error = new IllegalStateException(); - queue.putBuffer(abc); + queue.putSignal(abc); queue.putError(error); queue.complete(); - assertTrue(queue.isHeadBuffer()); + assertTrue(queue.isHeadSignal()); assertFalse(queue.isHeadError()); - assertSame(abc, queue.pollBuffer()); + assertSame(abc, queue.pollSignal()); assertTrue(queue.isHeadError()); - assertFalse(queue.isHeadBuffer()); + assertFalse(queue.isHeadSignal()); assertSame(error, queue.pollError()); assertTrue(queue.isComplete());