diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/MyTest.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/MyTest.java new file mode 100644 index 00000000000..47f975347a7 --- /dev/null +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/MyTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.function.client; + +import java.io.IOException; +import java.time.Duration; +import java.util.function.Consumer; + +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.ImmediateEventExecutor; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import reactor.core.publisher.Mono; +import reactor.netty.FutureMono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import reactor.netty.tcp.TcpClient; +import reactor.test.StepVerifier; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.mock.web.test.server.MockWebSession; + +import static org.junit.Assert.*; + +/** + * + * @author Rossen Stoyanchev + */ +public class MyTest { + + public static void main(String[] args) throws IOException { + +LoopResources resources = LoopResources.create("test-loop"); +ConnectionProvider provider = ConnectionProvider.elastic("test-pool"); +TcpClient tcpClient = TcpClient.create(provider).runOn(resources, false); +HttpClient httpClient = HttpClient.from(tcpClient); + +WebClient webClient = WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + +makeCalls(webClient); + +provider.dispose(); +resources.dispose(); + +//Mono result1 = FutureMono.from(channelGroup.close()); +//Mono result2 = connProvider.disposeLater(); +//Mono result3 = loopResources.disposeLater(); +//Mono.whenDelayError(result1, result2, result3).block(Duration.ofSeconds(5)); + + System.in.read(); + System.exit(0); + + } + + private static void makeCalls(WebClient webClient) { + webClient.get().uri("http://httpbin.org/ip") + .retrieve() + .bodyToMono(String.class) + .block(Duration.ofSeconds(5)); + } + +} diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java index 5af30742d6f..e790ace4ce1 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java @@ -32,103 +32,74 @@ import org.springframework.web.socket.WebSocketSession; import static org.junit.Assert.*; /** - * Unit tests for - * {@link org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator}. - * + * Unit tests for {@link ConcurrentWebSocketSessionDecorator}. * @author Rossen Stoyanchev */ @SuppressWarnings("resource") public class ConcurrentWebSocketSessionDecoratorTests { - @Test public void send() throws IOException { TestWebSocketSession session = new TestWebSocketSession(); session.setOpen(true); - ConcurrentWebSocketSessionDecorator concurrentSession = + ConcurrentWebSocketSessionDecorator decorator = new ConcurrentWebSocketSessionDecorator(session, 1000, 1024); TextMessage textMessage = new TextMessage("payload"); - concurrentSession.sendMessage(textMessage); + decorator.sendMessage(textMessage); assertEquals(1, session.getSentMessages().size()); assertEquals(textMessage, session.getSentMessages().get(0)); - assertEquals(0, concurrentSession.getBufferSize()); - assertEquals(0, concurrentSession.getTimeSinceSendStarted()); + assertEquals(0, decorator.getBufferSize()); + assertEquals(0, decorator.getTimeSinceSendStarted()); assertTrue(session.isOpen()); } @Test public void sendAfterBlockedSend() throws IOException, InterruptedException { - BlockingSession blockingSession = new BlockingSession(); - blockingSession.setOpen(true); - CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch(); + BlockingSession session = new BlockingSession(); + session.setOpen(true); - final ConcurrentWebSocketSessionDecorator concurrentSession = - new ConcurrentWebSocketSessionDecorator(blockingSession, 10 * 1000, 1024); + final ConcurrentWebSocketSessionDecorator decorator = + new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 1024); - Executors.newSingleThreadExecutor().submit((Runnable) () -> { - TextMessage message = new TextMessage("slow message"); - try { - concurrentSession.sendMessage(message); - } - catch (IOException e) { - e.printStackTrace(); - } - }); + sendBlockingMessage(decorator); - assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS)); - - // ensure some send time elapses - Thread.sleep(100); - assertTrue(concurrentSession.getTimeSinceSendStarted() > 0); + Thread.sleep(50); + assertTrue(decorator.getTimeSinceSendStarted() > 0); TextMessage payload = new TextMessage("payload"); for (int i = 0; i < 5; i++) { - concurrentSession.sendMessage(payload); + decorator.sendMessage(payload); } - assertTrue(concurrentSession.getTimeSinceSendStarted() > 0); - assertEquals(5 * payload.getPayloadLength(), concurrentSession.getBufferSize()); - assertTrue(blockingSession.isOpen()); + assertTrue(decorator.getTimeSinceSendStarted() > 0); + assertEquals(5 * payload.getPayloadLength(), decorator.getBufferSize()); + assertTrue(session.isOpen()); } @Test public void sendTimeLimitExceeded() throws IOException, InterruptedException { - BlockingSession blockingSession = new BlockingSession(); - blockingSession.setId("123"); - blockingSession.setOpen(true); - CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch(); - - int sendTimeLimit = 100; - int bufferSizeLimit = 1024; + BlockingSession session = new BlockingSession(); + session.setId("123"); + session.setOpen(true); - final ConcurrentWebSocketSessionDecorator concurrentSession = - new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit); + final ConcurrentWebSocketSessionDecorator decorator = + new ConcurrentWebSocketSessionDecorator(session, 100, 1024); - Executors.newSingleThreadExecutor().submit((Runnable) () -> { - TextMessage message = new TextMessage("slow message"); - try { - concurrentSession.sendMessage(message); - } - catch (IOException e) { - e.printStackTrace(); - } - }); + sendBlockingMessage(decorator); - assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS)); - - // ensure some send time elapses - Thread.sleep(sendTimeLimit + 100); + // Exceed send time.. + Thread.sleep(200); try { TextMessage payload = new TextMessage("payload"); - concurrentSession.sendMessage(payload); + decorator.sendMessage(payload); fail("Expected exception"); } catch (SessionLimitExceededException ex) { @@ -142,28 +113,14 @@ public class ConcurrentWebSocketSessionDecoratorTests { @Test public void sendBufferSizeExceeded() throws IOException, InterruptedException { - BlockingSession blockingSession = new BlockingSession(); - blockingSession.setId("123"); - blockingSession.setOpen(true); - CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch(); - - int sendTimeLimit = 10 * 1000; - int bufferSizeLimit = 1024; - - final ConcurrentWebSocketSessionDecorator concurrentSession = - new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit); + BlockingSession session = new BlockingSession(); + session.setId("123"); + session.setOpen(true); - Executors.newSingleThreadExecutor().submit((Runnable) () -> { - TextMessage message = new TextMessage("slow message"); - try { - concurrentSession.sendMessage(message); - } - catch (IOException e) { - e.printStackTrace(); - } - }); + final ConcurrentWebSocketSessionDecorator decorator = + new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024); - assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS)); + sendBlockingMessage(decorator); StringBuilder sb = new StringBuilder(); for (int i = 0 ; i < 1023; i++) { @@ -171,13 +128,13 @@ public class ConcurrentWebSocketSessionDecoratorTests { } TextMessage message = new TextMessage(sb.toString()); - concurrentSession.sendMessage(message); + decorator.sendMessage(message); - assertEquals(1023, concurrentSession.getBufferSize()); - assertTrue(blockingSession.isOpen()); + assertEquals(1023, decorator.getBufferSize()); + assertTrue(session.isOpen()); try { - concurrentSession.sendMessage(message); + decorator.sendMessage(message); fail("Expected exception"); } catch (SessionLimitExceededException ex) { @@ -191,35 +148,35 @@ public class ConcurrentWebSocketSessionDecoratorTests { @Test public void closeStatusNormal() throws Exception { - BlockingSession delegate = new BlockingSession(); - delegate.setOpen(true); - WebSocketSession decorator = new ConcurrentWebSocketSessionDecorator(delegate, 10 * 1000, 1024); + BlockingSession session = new BlockingSession(); + session.setOpen(true); + WebSocketSession decorator = new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 1024); decorator.close(CloseStatus.PROTOCOL_ERROR); - assertEquals(CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus()); + assertEquals(CloseStatus.PROTOCOL_ERROR, session.getCloseStatus()); decorator.close(CloseStatus.SERVER_ERROR); - assertEquals("Should have been ignored", CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus()); + assertEquals("Should have been ignored", CloseStatus.PROTOCOL_ERROR, session.getCloseStatus()); } @Test public void closeStatusChangesToSessionNotReliable() throws Exception { - BlockingSession blockingSession = new BlockingSession(); - blockingSession.setId("123"); - blockingSession.setOpen(true); - CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch(); + BlockingSession session = new BlockingSession(); + session.setId("123"); + session.setOpen(true); + CountDownLatch sentMessageLatch = session.getSentMessageLatch(); int sendTimeLimit = 100; int bufferSizeLimit = 1024; - final ConcurrentWebSocketSessionDecorator concurrentSession = - new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit); + final ConcurrentWebSocketSessionDecorator decorator = + new ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, bufferSizeLimit); Executors.newSingleThreadExecutor().submit((Runnable) () -> { TextMessage message = new TextMessage("slow message"); try { - concurrentSession.sendMessage(message); + decorator.sendMessage(message); } catch (IOException e) { e.printStackTrace(); @@ -231,10 +188,24 @@ public class ConcurrentWebSocketSessionDecoratorTests { // ensure some send time elapses Thread.sleep(sendTimeLimit + 100); - concurrentSession.close(CloseStatus.PROTOCOL_ERROR); + decorator.close(CloseStatus.PROTOCOL_ERROR); assertEquals("CloseStatus should have changed to SESSION_NOT_RELIABLE", - CloseStatus.SESSION_NOT_RELIABLE, blockingSession.getCloseStatus()); + CloseStatus.SESSION_NOT_RELIABLE, session.getCloseStatus()); + } + + private void sendBlockingMessage(ConcurrentWebSocketSessionDecorator session) throws InterruptedException { + Executors.newSingleThreadExecutor().submit(() -> { + TextMessage message = new TextMessage("slow message"); + try { + session.sendMessage(message); + } + catch (IOException e) { + e.printStackTrace(); + } + }); + BlockingSession delegate = (BlockingSession) session.getDelegate(); + assertTrue(delegate.getSentMessageLatch().await(5, TimeUnit.SECONDS)); }