Browse Source

Fix AbstractRequestBodyPublisher to comply with the spec

As per specification "The Subscription MUST allow the Subscriber to
call Subscription.request synchronously from within onNext or
onSubscribe". With the current implementation if Subscription.request
is called more than once when Subscriber.onSubscribe ISE will be
thrown - java.lang.IllegalStateException: DEMAND.
With this fix the implementation will not throw ISE and will allow
many invocations of Subscription.request when
Subscriber.onSubscribe.
pull/1158/head
Violeta Georgieva 10 years ago committed by Brian Clozel
parent
commit
00617d74de
  1. 7
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java
  2. 87
      spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java

7
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java

@ -252,6 +252,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -252,6 +252,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* {@link #NO_DEMAND} if there is no demand.
*/
DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
}
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
if (publisher.changeState(this, READING)) {

87
spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java

@ -0,0 +1,87 @@ @@ -0,0 +1,87 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link AbstractRequestBodyPublisher}
*
* @author Violeta Georgieva
* @since 5.0
*/
public class AbstractRequestBodyPublisherTests {
@Test
public void testReceiveTwoRequestCallsWhenOnSubscribe() {
@SuppressWarnings("unchecked")
Subscriber<DataBuffer> subscriber = mock(Subscriber.class);
doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class));
TestRequestBodyPublisher publisher = new TestRequestBodyPublisher();
publisher.subscribe(subscriber);
publisher.onDataAvailable();
assertTrue(publisher.getReadCalls() == 2);
}
private static final class TestRequestBodyPublisher extends AbstractRequestBodyPublisher {
private int readCalls = 0;
@Override
protected void checkOnDataAvailable() {
// no-op
}
@Override
protected DataBuffer read() throws IOException {
readCalls++;
return mock(DataBuffer.class);
}
public int getReadCalls() {
return this.readCalls;
}
}
private static final class SubscriptionAnswer implements Answer<Subscription> {
@Override
public Subscription answer(InvocationOnMock invocation) throws Throwable {
Subscription arg = (Subscription) invocation.getArguments()[0];
arg.request(1);
arg.request(1);
return arg;
}
}
}
Loading…
Cancel
Save