From 97bd0ccfecae9ee6f72fde06b16e9c670caed174 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 4 Sep 2014 00:55:38 +0200 Subject: [PATCH] Polishing --- .../AsyncAnnotationBeanPostProcessor.java | 6 +- .../concurrent/ListenableFutureAdapter.java | 14 +- .../concurrent/ListenableFutureCallback.java | 6 +- .../ListenableFutureCallbackRegistry.java | 46 ++-- .../concurrent/ListenableFutureTaskTests.java | 18 +- .../stomp/StompBrokerRelayMessageHandler.java | 128 +++++----- .../web/client/AsyncRestTemplate.java | 197 +++++++-------- ...stractAsyncHttpRequestFactoryTestCase.java | 81 +++---- .../AsyncRestTemplateIntegrationTests.java | 225 +++++++----------- .../client/WebSocketConnectionManager.java | 17 +- 10 files changed, 306 insertions(+), 432 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncAnnotationBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncAnnotationBeanPostProcessor.java index 202f2ae47d5..9f8fe063958 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncAnnotationBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncAnnotationBeanPostProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,8 +49,7 @@ import org.springframework.util.Assert; * @see #setBeforeExistingAdvisors */ @SuppressWarnings("serial") -public class AsyncAnnotationBeanPostProcessor extends AbstractAdvisingBeanPostProcessor - implements BeanFactoryAware { +public class AsyncAnnotationBeanPostProcessor extends AbstractAdvisingBeanPostProcessor implements BeanFactoryAware { private Class asyncAnnotationType; @@ -61,6 +60,7 @@ public class AsyncAnnotationBeanPostProcessor extends AbstractAdvisingBeanPostPr setBeforeExistingAdvisors(true); } + /** * Set the 'async' annotation type to be detected at either class or method * level. By default, both the {@link Async} annotation and the EJB 3.1 diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java index 4123e0b5385..6eefa695744 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,18 +30,16 @@ import java.util.concurrent.ExecutionException; * @author Arjen Poutsma * @since 4.0 */ -public abstract class ListenableFutureAdapter extends FutureAdapter - implements ListenableFuture { +public abstract class ListenableFutureAdapter extends FutureAdapter implements ListenableFuture { /** - * Constructs a new {@code ListenableFutureAdapter} with the given adaptee. + * Construct a new {@code ListenableFutureAdapter} with the given adaptee. * @param adaptee the future to adaptee to */ protected ListenableFutureAdapter(ListenableFuture adaptee) { super(adaptee); } - @Override public void addCallback(final ListenableFutureCallback callback) { ListenableFuture listenableAdaptee = (ListenableFuture) getAdaptee(); @@ -59,11 +57,11 @@ public abstract class ListenableFutureAdapter extends FutureAdapter onFailure(t); } } - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); + public void onFailure(Throwable ex) { + callback.onFailure(ex); } }); } + } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java index d112f9fa142..924c443965f 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,8 +33,8 @@ public interface ListenableFutureCallback { /** * Called when the {@link ListenableFuture} fails to complete. - * @param t the exception that triggered the failure + * @param ex the exception that triggered the failure */ - void onFailure(Throwable t); + void onFailure(Throwable ex); } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java index ca76f6d72fa..0f1873de5ff 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,54 +48,52 @@ public class ListenableFutureCallbackRegistry { @SuppressWarnings("unchecked") public void addCallback(ListenableFutureCallback callback) { Assert.notNull(callback, "'callback' must not be null"); - - synchronized (mutex) { - switch (state) { + synchronized (this.mutex) { + switch (this.state) { case NEW: - callbacks.add(callback); + this.callbacks.add(callback); break; case SUCCESS: - callback.onSuccess((T)result); + callback.onSuccess((T) this.result); break; case FAILURE: - callback.onFailure((Throwable) result); + callback.onFailure((Throwable) this.result); break; } } } /** - * Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added - * callbacks with the given result + * Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all + * added callbacks with the given result. * @param result the result to trigger the callbacks with */ public void success(T result) { - synchronized (mutex) { - state = State.SUCCESS; + synchronized (this.mutex) { + this.state = State.SUCCESS; this.result = result; - - while (!callbacks.isEmpty()) { - callbacks.poll().onSuccess(result); + while (!this.callbacks.isEmpty()) { + this.callbacks.poll().onSuccess(result); } } } /** - * Triggers a {@link ListenableFutureCallback#onFailure(Throwable)} call on all added - * callbacks with the given {@code Throwable}. - * @param t the exception to trigger the callbacks with + * Triggers a {@link ListenableFutureCallback#onFailure(Throwable)} call on all + * added callbacks with the given {@code Throwable}. + * @param ex the exception to trigger the callbacks with */ - public void failure(Throwable t) { - synchronized (mutex) { - state = State.FAILURE; - this.result = t; - - while (!callbacks.isEmpty()) { - callbacks.poll().onFailure(t); + public void failure(Throwable ex) { + synchronized (this.mutex) { + this.state = State.FAILURE; + this.result = ex; + while (!this.callbacks.isEmpty()) { + this.callbacks.poll().onFailure(ex); } } } + private enum State {NEW, SUCCESS, FAILURE} } diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java index 84d351e1817..66eb49d1e9c 100644 --- a/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java +++ b/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +20,10 @@ import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import org.junit.Test; +import static org.junit.Assert.*; + /** * @author Arjen Poutsma */ @@ -44,10 +44,9 @@ public class ListenableFutureTaskTests { public void onSuccess(String result) { assertEquals(s, result); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); task.run(); @@ -68,15 +67,12 @@ public class ListenableFutureTaskTests { public void onSuccess(String result) { fail("onSuccess not expected"); } - @Override - public void onFailure(Throwable t) { - assertEquals(s, t.getMessage()); + public void onFailure(Throwable ex) { + assertEquals(s, ex.getMessage()); } }); task.run(); } - - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 1c1f0d4ce44..e27895f3df9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -41,8 +41,10 @@ import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureTask; /** - * A {@link org.springframework.messaging.MessageHandler} that handles messages by forwarding them to a STOMP broker. - * For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP + * A {@link org.springframework.messaging.MessageHandler} that handles messages by + * forwarding them to a STOMP broker. + * + *

For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP * connection to the broker is opened and used exclusively for all messages from the * client that originated the CONNECT message. Messages from the same client are * identified through the session id message header. Reversely, when the STOMP broker @@ -57,10 +59,10 @@ import org.springframework.util.concurrent.ListenableFutureTask; * shared and cannot be used to receive messages. Several properties are provided to * configure the "system" connection including: *

    - *
  • {@link #setSystemLogin(String)}
  • - *
  • {@link #setSystemPasscode(String)}
  • - *
  • {@link #setSystemHeartbeatSendInterval(long)}
  • - *
  • {@link #setSystemHeartbeatReceiveInterval(long)}
  • + *
  • {@link #setSystemLogin(String)}
  • + *
  • {@link #setSystemPasscode(String)}
  • + *
  • {@link #setSystemHeartbeatSendInterval(long)}
  • + *
  • {@link #setSystemHeartbeatReceiveInterval(long)}
  • *
* * @author Rossen Stoyanchev @@ -84,10 +86,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static final Message HEARTBEAT_MESSAGE; + static { + EMPTY_TASK.run(); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT); HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build(); - EMPTY_TASK.run(); } @@ -124,7 +127,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Create a StompBrokerRelayMessageHandler instance with the given message channels * and destination prefixes. - * * @param clientInChannel the channel for receiving messages from clients (e.g. WebSocket clients) * @param clientOutChannel the channel for sending messages to clients (e.g. WebSocket clients) * @param brokerChannel the channel for the application to send messages to the broker @@ -135,11 +137,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler SubscribableChannel brokerChannel, Collection destinationPrefixes) { super(destinationPrefixes); - Assert.notNull(clientInChannel, "'clientInChannel' must not be null"); Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null"); Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); - this.clientInboundChannel = clientInChannel; this.clientOutboundChannel = clientOutChannel; this.brokerChannel = brokerChannel; @@ -155,7 +155,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the STOMP message broker host. + * Return the STOMP message broker host. */ public String getRelayHost() { return this.relayHost; @@ -169,7 +169,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the STOMP message broker port. + * Return the STOMP message broker port. */ public int getRelayPort() { return this.relayPort; @@ -187,7 +187,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return The interval, in milliseconds, at which the "system" connection will + * Return the interval, in milliseconds, at which the "system" connection will * send heartbeats to the STOMP broker. */ public long getSystemHeartbeatSendInterval() { @@ -207,7 +207,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return The interval, in milliseconds, at which the "system" connection expects + * Return the interval, in milliseconds, at which the "system" connection expects * to receive heartbeats from the STOMP broker. */ public long getSystemHeartbeatReceiveInterval() { @@ -217,8 +217,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Set the login to use when creating connections to the STOMP broker on * behalf of connected clients. - *

- * By default this is set to "guest". + *

By default this is set to "guest". * @see #setSystemLogin(String) */ public void setClientLogin(String clientLogin) { @@ -227,7 +226,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the configured login to use for connections to the STOMP broker + * Return the configured login to use for connections to the STOMP broker * on behalf of connected clients. * @see #getSystemLogin() */ @@ -236,11 +235,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the clientPasscode to use to create connections to the STOMP broker on + * Set the client passcode to use to create connections to the STOMP broker on * behalf of connected clients. - *

- * By default this is set to "guest". - * @see #setSystemPasscode(String) + *

By default this is set to "guest". + * @see #setSystemPasscode */ public void setClientPasscode(String clientPasscode) { Assert.hasText(clientPasscode, "clientPasscode must not be empty"); @@ -248,7 +246,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the configured passocde to use for connections to the STOMP broker on + * Return the configured passcode to use for connections to the STOMP broker on * behalf of connected clients. * @see #getSystemPasscode() */ @@ -260,8 +258,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * Set the login for the shared "system" connection used to send messages to * the STOMP broker from within the application, i.e. messages not associated * with a specific client session (e.g. REST/HTTP request handling method). - *

- * By default this is set to "guest". + *

By default this is set to "guest". */ public void setSystemLogin(String systemLogin) { Assert.hasText(systemLogin, "systemLogin must not be empty"); @@ -269,7 +266,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the login used for the shared "system" connection to the STOMP broker + * Return the login used for the shared "system" connection to the STOMP broker. */ public String getSystemLogin() { return this.systemLogin; @@ -279,15 +276,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * Set the passcode for the shared "system" connection used to send messages to * the STOMP broker from within the application, i.e. messages not associated * with a specific client session (e.g. REST/HTTP request handling method). - *

- * By default this is set to "guest". + *

By default this is set to "guest". */ public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; } /** - * @return the passcode used for the shared "system" connection to the STOMP broker + * Return the passcode used for the shared "system" connection to the STOMP broker. */ public String getSystemPasscode() { return this.systemPasscode; @@ -306,7 +302,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the configured virtual host value. + * Return the configured virtual host value. */ public String getVirtualHost() { return this.virtualHost; @@ -367,7 +363,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override protected void stopInternal() { - publishBrokerUnavailableEvent(); this.clientInboundChannel.unsubscribe(this); @@ -376,19 +371,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler try { this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS); } - catch (Throwable t) { - logger.error("Error while shutting down TCP client", t); + catch (Throwable ex) { + logger.error("Error in shutdown of TCP client", ex); } } @Override protected void handleMessageInternal(Message message) { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); String sessionId = headers.getSessionId(); if (!isBrokerAvailable()) { - if (sessionId == null || sessionId == SystemStompConnectionHandler.SESSION_ID) { + if (sessionId == null || sessionId.equals(SystemStompConnectionHandler.SESSION_ID)) { throw new MessageDeliveryException("Message broker is not active."); } if (SimpMessageType.CONNECT.equals(headers.getMessageType()) && logger.isErrorEnabled()) { @@ -418,7 +412,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } - if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) { + if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) { if (logger.isTraceEnabled()) { logger.trace("Ignoring message to destination=" + destination); } @@ -482,15 +476,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this(sessionId, connectHeaders, true); } - private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, - boolean isRemoteClientSession) { - - Assert.notNull(sessionId, "SessionId must not be null"); - Assert.notNull(connectHeaders, "ConnectHeaders must not be null"); - + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) { + Assert.notNull(sessionId, "'sessionId' must not be null"); + Assert.notNull(connectHeaders, "'connectHeaders' must not be null"); this.sessionId = sessionId; this.connectHeaders = connectHeaders; - this.isRemoteClientSession = isRemoteClientSession; + this.isRemoteClientSession = isClientSession; } public String getSessionId() { @@ -513,7 +504,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Invoked when any TCP connectivity issue is detected, i.e. failure to establish - * the TCP connection, failure to send a message, missed heartbeat. + * the TCP connection, failure to send a message, missed heartbeat, etc. */ protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { if (logger.isErrorEnabled()) { @@ -526,9 +517,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler try { clearConnection(); } - catch (Throwable t) { + catch (Throwable ex2) { if (logger.isErrorEnabled()) { - logger.error("Failed to close connection: " + t.getMessage()); + logger.error("Failed to close connection: " + ex2.getMessage()); } } } @@ -552,7 +543,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void handleMessage(Message message) { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); headers.setSessionId(this.sessionId); @@ -584,7 +574,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } private void initHeartbeats(StompHeaderAccessor connectedHeaders) { - // Remote clients do their own heartbeat management if (this.isRemoteClientSession) { return; @@ -592,11 +581,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; - long serverSendInterval = connectedHeaders.getHeartbeat()[0]; long serverReceiveInterval = connectedHeaders.getHeartbeat()[1]; - if ((clientSendInterval > 0) && (serverReceiveInterval > 0)) { + if (clientSendInterval > 0 && serverReceiveInterval > 0) { long interval = Math.max(clientSendInterval, serverReceiveInterval); this.tcpConnection.onWriteInactivity(new Runnable() { @Override @@ -605,10 +593,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (conn != null) { conn.send(HEARTBEAT_MESSAGE).addCallback( new ListenableFutureCallback() { - public void onFailure(Throwable t) { - handleTcpConnectionFailure("Failed to send heartbeat", t); + public void onSuccess(Void result) { + } + public void onFailure(Throwable ex) { + handleTcpConnectionFailure("Failed to send heartbeat", ex); } - public void onSuccess(Void result) {} }); } } @@ -620,8 +609,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpConnection.onReadInactivity(new Runnable() { @Override public void run() { - handleTcpConnectionFailure("No hearbeat from broker for more than " + - interval + "ms, closing connection", null); + handleTcpConnectionFailure("No heartbeat from broker for more than " + interval + + "ms, closing connection", null); } }, interval); } @@ -652,17 +641,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpConnection = null; clearConnection(); } - catch (Throwable t) { - if (logger.isErrorEnabled()) { - // Ignore - } + catch (Throwable ex) { + // Shouldn't happen with connection reset beforehand } } } /** * Forward the given message to the STOMP broker. - * *

The method checks whether we have an active TCP connection and have * received the STOMP CONNECTED frame. For client messages this should be * false only if we lose the TCP connection around the same time when a @@ -671,7 +657,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * the "system" connection an exception is raised so that components sending * the message have a chance to handle it -- by default the broker message * channel is synchronous. - * *

Note that if messages arrive concurrently around the same time a TCP * connection is lost, there is a brief period of time before the connection * is reset when one or more messages may sneak through and an attempt made @@ -679,13 +664,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * method simply lets them try and fail. For client sessions that may * result in an additional STOMP ERROR frame(s) being sent downstream but * code handling that downstream should be idempotent in such cases. - * - * @param message the message to send, never {@code null} + * @param message the message to send (never {@code null}) * @return a future to wait for the result */ @SuppressWarnings("unchecked") public ListenableFuture forward(final Message message) { - TcpConnection conn = this.tcpConnection; if (!this.isStompConnected) { @@ -725,12 +708,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } @Override - public void onFailure(Throwable t) { + public void onFailure(Throwable ex) { if (tcpConnection == null) { // already reset } else { - handleTcpConnectionFailure("Failed to send message " + message, t); + handleTcpConnectionFailure("Failed to send message " + message, ex); } } }); @@ -742,13 +725,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * Close the TCP connection to the broker and release the connection reference, * Any exception arising from closing the connection is propagated. The caller * must handle and log the exception accordingly. - * *

If the connection belongs to a client session, the connection handler * for the session (basically the current instance) is also released from the * {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}. */ public void clearConnection() { - if (this.isRemoteClientSession) { if (logger.isDebugEnabled()) { logger.debug("Removing session '" + sessionId + "' (total remaining=" + @@ -772,11 +753,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private class SystemStompConnectionHandler extends StompConnectionHandler { public static final String SESSION_ID = "stompRelaySystemSessionId"; - public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { super(SESSION_ID, connectHeaders, false); } @@ -788,8 +769,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override - protected void handleTcpConnectionFailure(String errorMessage, Throwable t) { - super.handleTcpConnectionFailure(errorMessage, t); + protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { + super.handleTcpConnectionFailure(errorMessage, ex); publishBrokerUnavailableEvent(); } @@ -806,12 +787,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler future.get(); return future; } - catch (Throwable t) { - throw new MessageDeliveryException(message, t); + catch (Throwable ex) { + throw new MessageDeliveryException(message, ex); } } } + private static class Reactor11TcpClientFactory { public TcpOperations create(String host, int port) { @@ -820,6 +802,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private static class Reactor10TcpClientFactory { public TcpOperations create(String host, int port) { @@ -828,6 +811,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private static class VoidCallable implements Callable { @Override diff --git a/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java b/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java index 03ce53f4d2d..e56a48ee2b0 100644 --- a/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java +++ b/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,8 +88,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe */ public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) { Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null"); - SimpleClientHttpRequestFactory requestFactory = - new SimpleClientHttpRequestFactory(); + SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); requestFactory.setTaskExecutor(taskExecutor); this.syncTemplate = new RestTemplate(requestFactory); setAsyncRequestFactory(requestFactory); @@ -114,8 +113,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe * @param asyncRequestFactory the asynchronous request factory * @param syncRequestFactory the synchronous request factory */ - public AsyncRestTemplate(AsyncClientHttpRequestFactory asyncRequestFactory, - ClientHttpRequestFactory syncRequestFactory) { + public AsyncRestTemplate(AsyncClientHttpRequestFactory asyncRequestFactory, ClientHttpRequestFactory syncRequestFactory) { this(asyncRequestFactory, new RestTemplate(syncRequestFactory)); } @@ -125,8 +123,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe * @param requestFactory the asynchronous request factory to use * @param restTemplate the synchronous template to use */ - public AsyncRestTemplate(AsyncClientHttpRequestFactory requestFactory, - RestTemplate restTemplate) { + public AsyncRestTemplate(AsyncClientHttpRequestFactory requestFactory, RestTemplate restTemplate) { Assert.notNull(restTemplate, "'restTemplate' must not be null"); this.syncTemplate = restTemplate; setAsyncRequestFactory(requestFactory); @@ -195,6 +192,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return execute(url, HttpMethod.GET, requestCallback, responseExtractor); } + // HEAD @Override @@ -218,30 +216,29 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // POST @Override - public ListenableFuture postForLocation(String url, HttpEntity request, - Object... uriVariables) throws RestClientException { + public ListenableFuture postForLocation(String url, HttpEntity request, Object... uriVariables) + throws RestClientException { + AsyncRequestCallback requestCallback = httpEntityCallback(request); ResponseExtractor headersExtractor = headersExtractor(); ListenableFuture headersFuture = - execute(url, HttpMethod.POST, requestCallback, headersExtractor, - uriVariables); + execute(url, HttpMethod.POST, requestCallback, headersExtractor, uriVariables); return extractLocationHeader(headersFuture); } @Override - public ListenableFuture postForLocation(String url, HttpEntity request, - Map uriVariables) throws RestClientException { + public ListenableFuture postForLocation(String url, HttpEntity request, Map uriVariables) + throws RestClientException { + AsyncRequestCallback requestCallback = httpEntityCallback(request); ResponseExtractor headersExtractor = headersExtractor(); ListenableFuture headersFuture = - execute(url, HttpMethod.POST, requestCallback, headersExtractor, - uriVariables); + execute(url, HttpMethod.POST, requestCallback, headersExtractor, uriVariables); return extractLocationHeader(headersFuture); } @Override - public ListenableFuture postForLocation(URI url, HttpEntity request) - throws RestClientException { + public ListenableFuture postForLocation(URI url, HttpEntity request) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); ResponseExtractor headersExtractor = headersExtractor(); ListenableFuture headersFuture = @@ -251,7 +248,6 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe private static ListenableFuture extractLocationHeader(final ListenableFuture headersFuture) { return new ListenableFuture() { - @Override public void addCallback(final ListenableFutureCallback callback) { headersFuture.addCallback(new ListenableFutureCallback() { @@ -259,14 +255,12 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe public void onSuccess(HttpHeaders result) { callback.onSuccess(result.getLocation()); } - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); + public void onFailure(Throwable ex) { + callback.onFailure(ex); } }); } - @Override public boolean cancel(boolean mayInterruptIfRunning) { return headersFuture.cancel(mayInterruptIfRunning); @@ -285,8 +279,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return headers.getLocation(); } @Override - public URI get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public URI get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { HttpHeaders headers = headersFuture.get(timeout, unit); return headers.getLocation(); } @@ -296,45 +289,41 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @Override public ListenableFuture> postForEntity(String url, HttpEntity request, Class responseType, Object... uriVariables) throws RestClientException { + AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType); - ResponseExtractor> responseExtractor = - responseEntityExtractor(responseType); - return execute(url, HttpMethod.POST, requestCallback, responseExtractor, - uriVariables); + ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); + return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables); } @Override public ListenableFuture> postForEntity(String url, HttpEntity request, - Class responseType, Map uriVariables) - throws RestClientException { + Class responseType, Map uriVariables) throws RestClientException { + AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType); - ResponseExtractor> responseExtractor = - responseEntityExtractor(responseType); - return execute(url, HttpMethod.POST, requestCallback, responseExtractor, - uriVariables); + ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); + return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables); } @Override - public ListenableFuture> postForEntity(URI url, HttpEntity request, - Class responseType) throws RestClientException { + public ListenableFuture> postForEntity(URI url, HttpEntity request, Class responseType) + throws RestClientException { + AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType); - ResponseExtractor> responseExtractor = - responseEntityExtractor(responseType); + ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); return execute(url, HttpMethod.POST, requestCallback, responseExtractor); } + // PUT @Override - public ListenableFuture put(String url, HttpEntity request, Object... uriVariables) - throws RestClientException { + public ListenableFuture put(String url, HttpEntity request, Object... uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); return execute(url, HttpMethod.PUT, requestCallback, null, uriVariables); } @Override - public ListenableFuture put(String url, HttpEntity request, - Map uriVariables) throws RestClientException { + public ListenableFuture put(String url, HttpEntity request, Map uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); return execute(url, HttpMethod.PUT, requestCallback, null, uriVariables); } @@ -345,17 +334,16 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return execute(url, HttpMethod.PUT, requestCallback, null); } + // DELETE @Override - public ListenableFuture delete(String url, Object... urlVariables) - throws RestClientException { + public ListenableFuture delete(String url, Object... urlVariables) throws RestClientException { return execute(url, HttpMethod.DELETE, null, null, urlVariables); } @Override - public ListenableFuture delete(String url, Map urlVariables) - throws RestClientException { + public ListenableFuture delete(String url, Map urlVariables) throws RestClientException { return execute(url, HttpMethod.DELETE, null, null, urlVariables); } @@ -364,6 +352,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return execute(url, HttpMethod.DELETE, null, null); } + // OPTIONS @Override @@ -389,23 +378,19 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe private static ListenableFuture> extractAllowHeader(final ListenableFuture headersFuture) { return new ListenableFuture>() { - @Override - public void addCallback( - final ListenableFutureCallback> callback) { + public void addCallback(final ListenableFutureCallback> callback) { headersFuture.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(HttpHeaders result) { callback.onSuccess(result.getAllow()); } - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); + public void onFailure(Throwable ex) { + callback.onFailure(ex); } }); } - @Override public boolean cancel(boolean mayInterruptIfRunning) { return headersFuture.cancel(mayInterruptIfRunning); @@ -424,8 +409,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return headers.getAllow(); } @Override - public Set get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public Set get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { HttpHeaders headers = headersFuture.get(timeout, unit); return headers.getAllow(); } @@ -436,68 +420,59 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // exchange @Override - public ListenableFuture> exchange(String url, HttpMethod method, - HttpEntity requestEntity, Class responseType, Object... uriVariables) - throws RestClientException { - AsyncRequestCallback requestCallback = - httpEntityCallback(requestEntity, responseType); - ResponseExtractor> responseExtractor = - responseEntityExtractor(responseType); + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, + Class responseType, Object... uriVariables) throws RestClientException { + + AsyncRequestCallback requestCallback = httpEntityCallback(requestEntity, responseType); + ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); return execute(url, method, requestCallback, responseExtractor, uriVariables); } @Override - public ListenableFuture> exchange(String url, HttpMethod method, - HttpEntity requestEntity, Class responseType, - Map uriVariables) throws RestClientException { - AsyncRequestCallback requestCallback = - httpEntityCallback(requestEntity, responseType); - ResponseExtractor> responseExtractor = - responseEntityExtractor(responseType); + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, + Class responseType, Map uriVariables) throws RestClientException { + + AsyncRequestCallback requestCallback = httpEntityCallback(requestEntity, responseType); + ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); return execute(url, method, requestCallback, responseExtractor, uriVariables); } @Override - public ListenableFuture> exchange(URI url, HttpMethod method, - HttpEntity requestEntity, Class responseType) - throws RestClientException { - AsyncRequestCallback requestCallback = - httpEntityCallback(requestEntity, responseType); - ResponseExtractor> responseExtractor = - responseEntityExtractor(responseType); + public ListenableFuture> exchange(URI url, HttpMethod method, HttpEntity requestEntity, + Class responseType) throws RestClientException { + + AsyncRequestCallback requestCallback = httpEntityCallback(requestEntity, responseType); + ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); return execute(url, method, requestCallback, responseExtractor); } @Override - public ListenableFuture> exchange(String url, HttpMethod method, - HttpEntity requestEntity, ParameterizedTypeReference responseType, - Object... uriVariables) throws RestClientException { + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, + ParameterizedTypeReference responseType, Object... uriVariables) throws RestClientException { + Type type = responseType.getType(); AsyncRequestCallback requestCallback = httpEntityCallback(requestEntity, type); - ResponseExtractor> responseExtractor = - responseEntityExtractor(type); + ResponseExtractor> responseExtractor = responseEntityExtractor(type); return execute(url, method, requestCallback, responseExtractor, uriVariables); } @Override - public ListenableFuture> exchange(String url, HttpMethod method, - HttpEntity requestEntity, ParameterizedTypeReference responseType, - Map uriVariables) throws RestClientException { + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, + ParameterizedTypeReference responseType, Map uriVariables) throws RestClientException { + Type type = responseType.getType(); AsyncRequestCallback requestCallback = httpEntityCallback(requestEntity, type); - ResponseExtractor> responseExtractor = - responseEntityExtractor(type); + ResponseExtractor> responseExtractor = responseEntityExtractor(type); return execute(url, method, requestCallback, responseExtractor, uriVariables); } @Override - public ListenableFuture> exchange(URI url, HttpMethod method, - HttpEntity requestEntity, ParameterizedTypeReference responseType) - throws RestClientException { + public ListenableFuture> exchange(URI url, HttpMethod method, HttpEntity requestEntity, + ParameterizedTypeReference responseType) throws RestClientException { + Type type = responseType.getType(); AsyncRequestCallback requestCallback = httpEntityCallback(requestEntity, type); - ResponseExtractor> responseExtractor = - responseEntityExtractor(type); + ResponseExtractor> responseExtractor = responseEntityExtractor(type); return execute(url, method, requestCallback, responseExtractor); } @@ -505,27 +480,24 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // general execution @Override - public ListenableFuture execute(String url, HttpMethod method, - AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor, - Object... urlVariables) throws RestClientException { + public ListenableFuture execute(String url, HttpMethod method, AsyncRequestCallback requestCallback, + ResponseExtractor responseExtractor, Object... urlVariables) throws RestClientException { URI expanded = new UriTemplate(url).expand(urlVariables); return doExecute(expanded, method, requestCallback, responseExtractor); } @Override - public ListenableFuture execute(String url, HttpMethod method, - AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor, - Map urlVariables) throws RestClientException { + public ListenableFuture execute(String url, HttpMethod method, AsyncRequestCallback requestCallback, + ResponseExtractor responseExtractor, Map urlVariables) throws RestClientException { URI expanded = new UriTemplate(url).expand(urlVariables); return doExecute(expanded, method, requestCallback, responseExtractor); } @Override - public ListenableFuture execute(URI url, HttpMethod method, - AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor) - throws RestClientException { + public ListenableFuture execute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, + ResponseExtractor responseExtractor) throws RestClientException { return doExecute(url, method, requestCallback, responseExtractor); } @@ -553,8 +525,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe requestCallback.doWithRequest(request); } ListenableFuture responseFuture = request.executeAsync(); - return new ResponseExtractorFuture(method, url, responseFuture, - responseExtractor); + return new ResponseExtractorFuture(method, url, responseFuture, responseExtractor); } catch (IOException ex) { throw new ResourceAccessException("I/O error on " + method.name() + @@ -565,9 +536,8 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe private void logResponseStatus(HttpMethod method, URI url, ClientHttpResponse response) { if (logger.isDebugEnabled()) { try { - logger.debug("Async " + method.name() + " request for \"" + url + - "\" resulted in " + response.getStatusCode() + " (" + - response.getStatusText() + ")"); + logger.debug("Async " + method.name() + " request for \"" + url + "\" resulted in " + + response.getStatusCode() + " (" + response.getStatusText() + ")"); } catch (IOException ex) { // ignore @@ -578,9 +548,8 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe private void handleResponseError(HttpMethod method, URI url, ClientHttpResponse response) throws IOException { if (logger.isWarnEnabled()) { try { - logger.warn("Async " + method.name() + " request for \"" + url + - "\" resulted in " + response.getStatusCode() + " (" + - response.getStatusText() + "); invoking error handler"); + logger.warn("Async " + method.name() + " request for \"" + url + "\" resulted in " + + response.getStatusCode() + " (" + response.getStatusText() + "); invoking error handler"); } catch (IOException ex) { // ignore @@ -628,12 +597,12 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return this.syncTemplate.headersExtractor(); } + /** * Future returned from * {@link #doExecute(URI, HttpMethod, AsyncRequestCallback, ResponseExtractor)} */ - private class ResponseExtractorFuture - extends ListenableFutureAdapter { + private class ResponseExtractorFuture extends ListenableFutureAdapter { private final HttpMethod method; @@ -642,8 +611,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe private final ResponseExtractor responseExtractor; public ResponseExtractorFuture(HttpMethod method, URI url, - ListenableFuture clientHttpResponseFuture, - ResponseExtractor responseExtractor) { + ListenableFuture clientHttpResponseFuture, ResponseExtractor responseExtractor) { super(clientHttpResponseFuture); this.method = method; this.url = url; @@ -672,12 +640,11 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } protected T convertResponse(ClientHttpResponse response) throws IOException { - return responseExtractor != null ? responseExtractor.extractData(response) : - null; + return (this.responseExtractor != null ? this.responseExtractor.extractData(response) : null); } - } + /** * Adapts a {@link RequestCallback} to the {@link AsyncRequestCallback} interface. */ diff --git a/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java b/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java index a587687c06f..fcea074fe64 100644 --- a/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java +++ b/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java @@ -37,11 +37,11 @@ import org.springframework.util.concurrent.ListenableFutureCallback; import static org.junit.Assert.*; -public abstract class AbstractAsyncHttpRequestFactoryTestCase extends - AbstractJettyServerTestCase { +public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJettyServerTestCase { protected AsyncClientHttpRequestFactory factory; + @Before public final void createFactory() throws Exception { factory = createRequestFactory(); @@ -52,6 +52,7 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends protected abstract AsyncClientHttpRequestFactory createRequestFactory(); + @Test public void status() throws Exception { URI uri = new URI(baseUrl + "/status/notfound"); @@ -60,8 +61,7 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends assertEquals("Invalid HTTP URI", uri, request.getURI()); Future futureResponse = request.executeAsync(); ClientHttpResponse response = futureResponse.get(); - assertEquals("Invalid status code", HttpStatus.NOT_FOUND, - response.getStatusCode()); + assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode()); } @Test @@ -70,46 +70,34 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AsyncClientHttpRequest request = factory.createAsyncRequest(uri, HttpMethod.GET); assertEquals("Invalid HTTP method", HttpMethod.GET, request.getMethod()); assertEquals("Invalid HTTP URI", uri, request.getURI()); - Future futureResponse = request.executeAsync(); - if (futureResponse instanceof ListenableFuture) { - ListenableFuture listenableFuture = - (ListenableFuture) futureResponse; - - - listenableFuture.addCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(ClientHttpResponse result) { - try { - System.out.println("SUCCESS! " + result.getStatusCode()); - System.out.println("Callback: " + System.currentTimeMillis()); - System.out.println(Thread.currentThread().getId()); - assertEquals("Invalid status code", HttpStatus.NOT_FOUND, - result.getStatusCode()); - } - catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } + ListenableFuture listenableFuture = request.executeAsync(); + listenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(ClientHttpResponse result) { + try { + System.out.println("SUCCESS! " + result.getStatusCode()); + System.out.println("Callback: " + System.currentTimeMillis()); + System.out.println(Thread.currentThread().getId()); + assertEquals("Invalid status code", HttpStatus.NOT_FOUND, result.getStatusCode()); } - - @Override - public void onFailure(Throwable t) { - System.out.println("FAILURE: " + t); + catch (IOException ex) { + ex.printStackTrace(); } - }); - - } - ClientHttpResponse response = futureResponse.get(); + } + @Override + public void onFailure(Throwable ex) { + System.out.println("FAILURE: " + ex); + } + }); + ClientHttpResponse response = listenableFuture.get(); System.out.println("Main thread: " + System.currentTimeMillis()); - assertEquals("Invalid status code", HttpStatus.NOT_FOUND, - response.getStatusCode()); + assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode()); System.out.println(Thread.currentThread().getId()); } @Test public void echo() throws Exception { - AsyncClientHttpRequest - request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), - HttpMethod.PUT); + AsyncClientHttpRequest request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), HttpMethod.PUT); assertEquals("Invalid HTTP method", HttpMethod.PUT, request.getMethod()); String headerName = "MyHeader"; String headerValue1 = "value1"; @@ -118,9 +106,9 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends request.getHeaders().add(headerName, headerValue2); final byte[] body = "Hello World".getBytes("UTF-8"); request.getHeaders().setContentLength(body.length); + if (request instanceof StreamingHttpOutputMessage) { - StreamingHttpOutputMessage streamingRequest = - (StreamingHttpOutputMessage) request; + StreamingHttpOutputMessage streamingRequest = (StreamingHttpOutputMessage) request; streamingRequest.setBody(new StreamingHttpOutputMessage.Body() { @Override public void writeTo(OutputStream outputStream) throws IOException { @@ -131,6 +119,7 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends else { StreamUtils.copy(body, request.getBody()); } + Future futureResponse = request.executeAsync(); ClientHttpResponse response = futureResponse.get(); try { @@ -148,13 +137,11 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends @Test(expected = IllegalStateException.class) public void multipleWrites() throws Exception { - AsyncClientHttpRequest - request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), - HttpMethod.POST); + AsyncClientHttpRequest request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), HttpMethod.POST); final byte[] body = "Hello World".getBytes("UTF-8"); + if (request instanceof StreamingHttpOutputMessage) { - StreamingHttpOutputMessage streamingRequest = - (StreamingHttpOutputMessage) request; + StreamingHttpOutputMessage streamingRequest = (StreamingHttpOutputMessage) request; streamingRequest.setBody(new StreamingHttpOutputMessage.Body() { @Override public void writeTo(OutputStream outputStream) throws IOException { @@ -178,9 +165,7 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends @Test(expected = UnsupportedOperationException.class) public void headersAfterExecute() throws Exception { - AsyncClientHttpRequest - request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), - HttpMethod.POST); + AsyncClientHttpRequest request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), HttpMethod.POST); request.getHeaders().add("MyHeader", "value"); byte[] body = "Hello World".getBytes("UTF-8"); FileCopyUtils.copy(body, request.getBody()); @@ -208,9 +193,7 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends protected void assertHttpMethod(String path, HttpMethod method) throws Exception { ClientHttpResponse response = null; try { - AsyncClientHttpRequest request = factory.createAsyncRequest( - new URI(baseUrl + "/methods/" + path), method); - + AsyncClientHttpRequest request = factory.createAsyncRequest(new URI(baseUrl + "/methods/" + path), method); Future futureResponse = request.executeAsync(); response = futureResponse.get(); assertEquals("Invalid response status", HttpStatus.OK, response.getStatusCode()); diff --git a/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java b/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java index 2b2083fb090..6df2c4ede06 100644 --- a/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java @@ -16,16 +16,12 @@ package org.springframework.web.client; -import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.Charset; import java.util.EnumSet; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import static org.junit.Assert.*; import org.junit.Before; import org.junit.Test; @@ -43,23 +39,27 @@ import org.springframework.util.MultiValueMap; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; +import static org.junit.Assert.*; + /** * @author Arjen Poutsma + * @author Sebastien Deleuze */ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCase { private AsyncRestTemplate template; + @Before public void createTemplate() { - template = new AsyncRestTemplate( - new HttpComponentsAsyncClientHttpRequestFactory()); + template = new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory()); } + @Test - public void getEntity() throws ExecutionException, InterruptedException { - Future> - futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get"); + public void getEntity() throws Exception { + Future> futureEntity = + template.getForEntity(baseUrl + "/{method}", String.class, "get"); ResponseEntity entity = futureEntity.get(); assertEquals("Invalid content", helloWorld, entity.getBody()); assertFalse("No headers", entity.getHeaders().isEmpty()); @@ -68,17 +68,17 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void multipleFutureGets() throws ExecutionException, InterruptedException { - Future> - futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get"); + public void multipleFutureGets() throws Exception { + Future> futureEntity = + template.getForEntity(baseUrl + "/{method}", String.class, "get"); futureEntity.get(); futureEntity.get(); } @Test - public void getEntityCallback() throws ExecutionException, InterruptedException { - ListenableFuture> - futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get"); + public void getEntityCallback() throws Exception { + ListenableFuture> futureEntity = + template.getForEntity(baseUrl + "/{method}", String.class, "get"); futureEntity.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(ResponseEntity entity) { @@ -87,10 +87,9 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa assertEquals("Invalid content-type", contentType, entity.getHeaders().getContentType()); assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode()); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); // wait till done @@ -99,65 +98,55 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void getNoResponse() throws ExecutionException, InterruptedException { - Future> - futureEntity = template.getForEntity(baseUrl + "/get/nothing", String.class); + public void getNoResponse() throws Exception { + Future> futureEntity = template.getForEntity(baseUrl + "/get/nothing", String.class); ResponseEntity entity = futureEntity.get(); assertNull("Invalid content", entity.getBody()); } @Test - public void getNoContentTypeHeader() - throws UnsupportedEncodingException, ExecutionException, - InterruptedException { - Future> - futureEntity = template.getForEntity(baseUrl + "/get/nocontenttype", - byte[].class); + public void getNoContentTypeHeader() throws Exception { + Future> futureEntity = template.getForEntity(baseUrl + "/get/nocontenttype", byte[].class); ResponseEntity responseEntity = futureEntity.get(); - assertArrayEquals("Invalid content", helloWorld.getBytes("UTF-8"), - responseEntity.getBody()); + assertArrayEquals("Invalid content", helloWorld.getBytes("UTF-8"), responseEntity.getBody()); } @Test - public void getNoContent() throws ExecutionException, InterruptedException { - Future> - responseFuture = template.getForEntity(baseUrl + "/status/nocontent", String.class); + public void getNoContent() throws Exception { + Future> responseFuture = template.getForEntity(baseUrl + "/status/nocontent", String.class); ResponseEntity entity = responseFuture.get(); assertEquals("Invalid response code", HttpStatus.NO_CONTENT, entity.getStatusCode()); assertNull("Invalid content", entity.getBody()); } @Test - public void getNotModified() throws ExecutionException, InterruptedException { - Future> - responseFuture = template.getForEntity(baseUrl + "/status/notmodified", - String.class); + public void getNotModified() throws Exception { + Future> responseFuture = template.getForEntity(baseUrl + "/status/notmodified", String.class); ResponseEntity entity = responseFuture.get(); assertEquals("Invalid response code", HttpStatus.NOT_MODIFIED, entity.getStatusCode()); assertNull("Invalid content", entity.getBody()); } @Test - public void headForHeaders() throws ExecutionException, InterruptedException { + public void headForHeaders() throws Exception { Future headersFuture = template.headForHeaders(baseUrl + "/get"); HttpHeaders headers = headersFuture.get(); assertTrue("No Content-Type header", headers.containsKey("Content-Type")); } @Test - public void headForHeadersCallback() throws ExecutionException, InterruptedException { + public void headForHeadersCallback() throws Exception { ListenableFuture headersFuture = template.headForHeaders(baseUrl + "/get"); headersFuture.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(HttpHeaders result) { assertTrue("No Content-Type header", result.containsKey("Content-Type")); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!headersFuture.isDone()) { @@ -165,37 +154,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void postForLocation() - throws URISyntaxException, ExecutionException, InterruptedException { + public void postForLocation() throws Exception { HttpHeaders entityHeaders = new HttpHeaders(); entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15"))); HttpEntity entity = new HttpEntity(helloWorld, entityHeaders); - Future - locationFuture = template.postForLocation(baseUrl + "/{method}", entity, - "post"); + Future locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post"); URI location = locationFuture.get(); assertEquals("Invalid location", new URI(baseUrl + "/post/1"), location); } @Test - public void postForLocationCallback() - throws URISyntaxException, ExecutionException, InterruptedException { + public void postForLocationCallback() throws Exception { HttpHeaders entityHeaders = new HttpHeaders(); entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15"))); HttpEntity entity = new HttpEntity(helloWorld, entityHeaders); final URI expected = new URI(baseUrl + "/post/1"); - ListenableFuture - locationFuture = template.postForLocation(baseUrl + "/{method}", entity, - "post"); + ListenableFuture locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post"); locationFuture.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(URI result) { assertEquals("Invalid location", expected, result); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!locationFuture.isDone()) { @@ -203,32 +185,27 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void postForEntity() - throws URISyntaxException, ExecutionException, InterruptedException { + public void postForEntity() throws Exception { HttpEntity requestEntity = new HttpEntity<>(helloWorld); - Future> - responseEntityFuture = template.postForEntity(baseUrl + "/{method}", requestEntity, - String.class, "post"); + Future> responseEntityFuture = + template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post"); ResponseEntity responseEntity = responseEntityFuture.get(); assertEquals("Invalid content", helloWorld, responseEntity.getBody()); } @Test - public void postForEntityCallback() - throws URISyntaxException, ExecutionException, InterruptedException { + public void postForEntityCallback() throws Exception { HttpEntity requestEntity = new HttpEntity<>(helloWorld); - ListenableFuture> - responseEntityFuture = template.postForEntity(baseUrl + "/{method}", requestEntity, - String.class, "post"); + ListenableFuture> responseEntityFuture = + template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post"); responseEntityFuture.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(ResponseEntity result) { assertEquals("Invalid content", helloWorld, result.getBody()); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!responseEntityFuture.isDone()) { @@ -236,31 +213,24 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void put() - throws URISyntaxException, ExecutionException, InterruptedException { + public void put() throws Exception { HttpEntity requestEntity = new HttpEntity<>(helloWorld); - Future - responseEntityFuture = template.put(baseUrl + "/{method}", requestEntity, - "put"); + Future responseEntityFuture = template.put(baseUrl + "/{method}", requestEntity, "put"); responseEntityFuture.get(); } @Test - public void putCallback() - throws URISyntaxException, ExecutionException, InterruptedException { + public void putCallback() throws Exception { HttpEntity requestEntity = new HttpEntity<>(helloWorld); - ListenableFuture - responseEntityFuture = template.put(baseUrl + "/{method}", requestEntity, - "put"); + ListenableFuture responseEntityFuture = template.put(baseUrl + "/{method}", requestEntity, "put"); responseEntityFuture.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { assertNull(result); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!responseEntityFuture.isDone()) { @@ -268,26 +238,22 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void delete() - throws URISyntaxException, ExecutionException, InterruptedException { + public void delete() throws Exception { Future deletedFuture = template.delete(new URI(baseUrl + "/delete")); - deletedFuture.get(); } @Test - public void deleteCallback() - throws URISyntaxException, ExecutionException, InterruptedException { + public void deleteCallback() throws Exception { ListenableFuture deletedFuture = template.delete(new URI(baseUrl + "/delete")); deletedFuture.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { assertNull(result); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!deletedFuture.isDone()) { @@ -295,7 +261,7 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void notFound() throws ExecutionException, InterruptedException { + public void notFound() throws Exception { try { Future future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null); future.get(); @@ -309,16 +275,13 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void notFoundCallback() throws ExecutionException, InterruptedException { - ListenableFuture future = - template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, - null); + public void notFoundCallback() throws Exception { + ListenableFuture future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { fail("onSuccess not expected"); } - @Override public void onFailure(Throwable t) { assertTrue(t instanceof HttpClientErrorException); @@ -333,7 +296,7 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void serverError() throws ExecutionException, InterruptedException { + public void serverError() throws Exception { try { Future future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null); future.get(); @@ -347,21 +310,20 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void serverErrorCallback() throws ExecutionException, InterruptedException { + public void serverErrorCallback() throws Exception { ListenableFuture future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Void result) { fail("onSuccess not expected"); } - @Override - public void onFailure(Throwable t) { - assertTrue(t instanceof HttpServerErrorException); - HttpServerErrorException ex = (HttpServerErrorException) t; - assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, ex.getStatusCode()); - assertNotNull(ex.getStatusText()); - assertNotNull(ex.getResponseBodyAsString()); + public void onFailure(Throwable ex) { + assertTrue(ex instanceof HttpServerErrorException); + HttpServerErrorException hsex = (HttpServerErrorException) ex; + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode()); + assertNotNull(hsex.getStatusText()); + assertNotNull(hsex.getResponseBodyAsString()); } }); while (!future.isDone()) { @@ -369,30 +331,25 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } @Test - public void optionsForAllow() - throws URISyntaxException, ExecutionException, InterruptedException { - Future> - allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get")); + public void optionsForAllow() throws Exception { + Future> allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get")); Set allowed = allowedFuture.get(); assertEquals("Invalid response", EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE), allowed); } @Test - public void optionsForAllowCallback() - throws URISyntaxException, ExecutionException, InterruptedException { - ListenableFuture> - allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get")); + public void optionsForAllowCallback() throws Exception { + ListenableFuture> allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get")); allowedFuture.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(Set result) { - assertEquals("Invalid response", - EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE), result); + assertEquals("Invalid response", EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, + HttpMethod.HEAD, HttpMethod.TRACE), result); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!allowedFuture.isDone()) { @@ -406,8 +363,7 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa requestHeaders.set("MyHeader", "MyValue"); HttpEntity requestEntity = new HttpEntity(requestHeaders); Future> responseFuture = - template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, - String.class, "get"); + template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, String.class, "get"); ResponseEntity response = responseFuture.get(); assertEquals("Invalid content", helloWorld, response.getBody()); } @@ -419,17 +375,15 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa requestHeaders.set("MyHeader", "MyValue"); HttpEntity requestEntity = new HttpEntity(requestHeaders); ListenableFuture> responseFuture = - template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, - String.class, "get"); + template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, String.class, "get"); responseFuture.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(ResponseEntity result) { assertEquals("Invalid content", helloWorld, result.getBody()); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!responseFuture.isDone()) { @@ -442,9 +396,8 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa requestHeaders.set("MyHeader", "MyValue"); requestHeaders.setContentType(MediaType.TEXT_PLAIN); HttpEntity requestEntity = new HttpEntity(helloWorld, requestHeaders); - Future> - resultFuture = template.exchange(baseUrl + "/{method}", HttpMethod.POST, - requestEntity, Void.class, "post"); + Future> resultFuture = + template.exchange(baseUrl + "/{method}", HttpMethod.POST, requestEntity, Void.class, "post"); ResponseEntity result = resultFuture.get(); assertEquals("Invalid location", new URI(baseUrl + "/post/1"), result.getHeaders().getLocation()); @@ -457,31 +410,26 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa requestHeaders.set("MyHeader", "MyValue"); requestHeaders.setContentType(MediaType.TEXT_PLAIN); HttpEntity requestEntity = new HttpEntity(helloWorld, requestHeaders); - ListenableFuture> - resultFuture = template.exchange(baseUrl + "/{method}", HttpMethod.POST, - requestEntity, Void.class, "post"); + ListenableFuture> resultFuture = + template.exchange(baseUrl + "/{method}", HttpMethod.POST, requestEntity, Void.class, "post"); final URI expected =new URI(baseUrl + "/post/1"); resultFuture.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(ResponseEntity result) { - assertEquals("Invalid location", expected, - result.getHeaders().getLocation()); + assertEquals("Invalid location", expected, result.getHeaders().getLocation()); assertFalse(result.hasBody()); } - @Override - public void onFailure(Throwable t) { - fail(t.getMessage()); + public void onFailure(Throwable ex) { + fail(ex.getMessage()); } }); while (!resultFuture.isDone()) { } - } @Test - public void multipart() throws UnsupportedEncodingException, ExecutionException, - InterruptedException { + public void multipart() throws Exception { MultiValueMap parts = new LinkedMultiValueMap(); parts.add("name 1", "value 1"); parts.add("name 2", "value 2+1"); @@ -490,8 +438,7 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa parts.add("logo", logo); HttpEntity> requestBody = new HttpEntity<>(parts); - Future future = - template.postForLocation(baseUrl + "/multipart", requestBody); + Future future = template.postForLocation(baseUrl + "/multipart", requestBody); future.get(); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java index 85a90cb2ab9..4b4049f6d88 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +23,9 @@ import org.springframework.http.HttpHeaders; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator; -import org.springframework.web.socket.WebSocketHttpHeaders; /** * A WebSocket connection manager that is given a URI, a {@link WebSocketClient}, and a @@ -55,7 +55,7 @@ public class WebSocketConnectionManager extends ConnectionManagerSupport { super(uriTemplate, uriVariables); this.client = client; this.webSocketHandler = decorateWebSocketHandler(webSocketHandler); - this.syncClientLifecycle = ((client instanceof SmartLifecycle) && !((SmartLifecycle) client).isRunning()); + this.syncClientLifecycle = (client instanceof SmartLifecycle && !((SmartLifecycle) client).isRunning()); } @@ -132,8 +132,9 @@ public class WebSocketConnectionManager extends ConnectionManagerSupport { @Override protected void openConnection() { - - logger.info("Connecting to WebSocket at " + getUri()); + if (logger.isInfoEnabled()) { + logger.info("Connecting to WebSocket at " + getUri()); + } ListenableFuture future = this.client.doHandshake(this.webSocketHandler, this.headers, getUri()); @@ -145,8 +146,8 @@ public class WebSocketConnectionManager extends ConnectionManagerSupport { logger.info("Successfully connected"); } @Override - public void onFailure(Throwable t) { - logger.error("Failed to connect", t); + public void onFailure(Throwable ex) { + logger.error("Failed to connect", ex); } }); } @@ -158,7 +159,7 @@ public class WebSocketConnectionManager extends ConnectionManagerSupport { @Override protected boolean isConnected() { - return ((this.webSocketSession != null) && (this.webSocketSession.isOpen())); + return (this.webSocketSession != null && this.webSocketSession.isOpen()); } }