From 301528665d4ffd722c34cd26ebc2d74084d8c433 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 27 Jul 2016 17:44:24 +0300 Subject: [PATCH] Refactor AbstractResponseBodyFlushProcessor states With the current state machine - the implementation can hang after the last element when executing on Jetty. - in some cases there will be no flush after the last Publisher. --- .../AbstractResponseBodyFlushProcessor.java | 52 ++++++----- .../WriteOnlyHandlerIntegrationTests.java | 86 +++++++++++++++++++ 2 files changed, 117 insertions(+), 21 deletions(-) create mode 100644 spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 811cb78a96c..207d9b1c2bc 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -128,7 +128,7 @@ abstract class AbstractResponseBodyFlushProcessor public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); - if (processor.changeState(this, SUBSCRIBED)) { + if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; subscription.request(1); } @@ -136,40 +136,55 @@ abstract class AbstractResponseBodyFlushProcessor super.onSubscribe(processor, subscription); } } - }, SUBSCRIBED { + }, + REQUESTED { @Override public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher chunk) { - Processor chunkProcessor = - processor.createBodyProcessor(); - chunk.subscribe(chunkProcessor); - chunkProcessor.subscribe(new WriteSubscriber(processor)); + if (processor.changeState(this, RECEIVED)) { + Processor chunkProcessor = + processor.createBodyProcessor(); + chunk.subscribe(chunkProcessor); + chunkProcessor.subscribe(new WriteSubscriber(processor)); + } } @Override void onComplete(AbstractResponseBodyFlushProcessor processor) { - processor.subscriberCompleted = true; + if (processor.changeState(this, COMPLETED)) { + processor.publisherDelegate.publishComplete(); + } } - + }, + RECEIVED { @Override public void writeComplete(AbstractResponseBodyFlushProcessor processor) { + try { + processor.flush(); + } + catch (IOException ex) { + processor.cancel(); + processor.onError(ex); + } + if (processor.subscriberCompleted) { if (processor.changeState(this, COMPLETED)) { processor.publisherDelegate.publishComplete(); } } else { - try { - processor.flush(); + if (processor.changeState(this, REQUESTED)) { processor.subscription.request(1); } - catch (IOException ex) { - processor.cancel(); - processor.onError(ex); - } } } - }, COMPLETED { + + @Override + void onComplete(AbstractResponseBodyFlushProcessor processor) { + processor.subscriberCompleted = true; + } + }, + COMPLETED { @Override public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher publisher) { @@ -186,11 +201,6 @@ abstract class AbstractResponseBodyFlushProcessor void onComplete(AbstractResponseBodyFlushProcessor processor) { // ignore } - - @Override - public void writeComplete(AbstractResponseBodyFlushProcessor processor) { - // ignore - } }; public void onSubscribe(AbstractResponseBodyFlushProcessor processor, @@ -214,7 +224,7 @@ abstract class AbstractResponseBodyFlushProcessor } public void writeComplete(AbstractResponseBodyFlushProcessor processor) { - throw new IllegalStateException(toString()); + // ignore } private static class WriteSubscriber implements Subscriber { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java new file mode 100644 index 00000000000..b7bafdfe420 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Random; + +import org.junit.Test; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.web.client.RestTemplate; + +import static org.junit.Assert.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Violeta Georgieva + * @since 5.0 + */ +public class WriteOnlyHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { + + private static final int REQUEST_SIZE = 4096 * 3; + + private Random rnd = new Random(); + + private byte[] body; + + + @Override + protected WriteOnlyHandler createHttpHandler() { + return new WriteOnlyHandler(); + } + + + @Test + public void writeOnly() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + + this.body = randomBytes(); + RequestEntity request = RequestEntity.post( + new URI("http://localhost:" + port)).body( + "".getBytes(StandardCharsets.UTF_8)); + ResponseEntity response = restTemplate.exchange(request, byte[].class); + + assertArrayEquals(body, response.getBody()); + } + + + private byte[] randomBytes() { + byte[] buffer = new byte[REQUEST_SIZE]; + rnd.nextBytes(buffer); + return buffer; + } + + + public class WriteOnlyHandler implements HttpHandler { + + @Override + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { + DataBuffer buffer = response.bufferFactory().allocateBuffer(body.length); + buffer.write(body); + return response.writeAndFlushWith(Flux.just(Flux.just(buffer))); + } + } +}