From 00617d74de0f6cff02fbc752305baa7a9f574438 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 2 Sep 2016 11:19:56 +0300 Subject: [PATCH] Fix AbstractRequestBodyPublisher to comply with the spec As per specification "The Subscription MUST allow the Subscriber to call Subscription.request synchronously from within onNext or onSubscribe". With the current implementation if Subscription.request is called more than once when Subscriber.onSubscribe ISE will be thrown - java.lang.IllegalStateException: DEMAND. With this fix the implementation will not throw ISE and will allow many invocations of Subscription.request when Subscriber.onSubscribe. --- .../AbstractRequestBodyPublisher.java | 7 ++ .../AbstractRequestBodyPublisherTests.java | 87 +++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index 719362ef68f..79717c196a5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -252,6 +252,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher { * {@link #NO_DEMAND} if there is no demand. */ DEMAND { + @Override + void request(AbstractRequestBodyPublisher publisher, long n) { + if (Operators.checkRequest(n, publisher.subscriber)) { + Operators.addAndGet(publisher.demand, n); + } + } + @Override void onDataAvailable(AbstractRequestBodyPublisher publisher) { if (publisher.changeState(this, READING)) { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java new file mode 100644 index 00000000000..cda831cc08c --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java @@ -0,0 +1,87 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.mock; + +import java.io.IOException; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; + +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link AbstractRequestBodyPublisher} + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class AbstractRequestBodyPublisherTests { + + @Test + public void testReceiveTwoRequestCallsWhenOnSubscribe() { + @SuppressWarnings("unchecked") + Subscriber subscriber = mock(Subscriber.class); + doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class)); + + TestRequestBodyPublisher publisher = new TestRequestBodyPublisher(); + publisher.subscribe(subscriber); + publisher.onDataAvailable(); + + assertTrue(publisher.getReadCalls() == 2); + } + + private static final class TestRequestBodyPublisher extends AbstractRequestBodyPublisher { + + private int readCalls = 0; + + @Override + protected void checkOnDataAvailable() { + // no-op + } + + @Override + protected DataBuffer read() throws IOException { + readCalls++; + return mock(DataBuffer.class); + } + + public int getReadCalls() { + return this.readCalls; + } + + } + + private static final class SubscriptionAnswer implements Answer { + + @Override + public Subscription answer(InvocationOnMock invocation) throws Throwable { + Subscription arg = (Subscription) invocation.getArguments()[0]; + arg.request(1); + arg.request(1); + return arg; + } + + } +}