Browse Source

SockJsTransportFailureException provides constructor variant without session id

pull/808/head
Juergen Hoeller 11 years ago
parent
commit
e0a11f2ae7
  1. 17
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsException.java
  2. 24
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsTransportFailureException.java
  3. 4
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/DefaultTransportRequest.java
  4. 4
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java
  5. 85
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.java

17
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsException.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -30,15 +30,30 @@ public class SockJsException extends NestedRuntimeException {
private final String sessionId; private final String sessionId;
/**
* Constructor for SockJsException.
* @param message the exception message
* @param cause the root cause
*/
public SockJsException(String message, Throwable cause) { public SockJsException(String message, Throwable cause) {
this(message, null, cause); this(message, null, cause);
} }
/**
* Constructor for SockJsException.
* @param message the exception message
* @param sessionId the SockJS session id
* @param cause the root cause
*/
public SockJsException(String message, String sessionId, Throwable cause) { public SockJsException(String message, String sessionId, Throwable cause) {
super(message, cause); super(message, cause);
this.sessionId = sessionId; this.sessionId = sessionId;
} }
/**
* Return the SockJS session id.
*/
public String getSockJsSessionId() { public String getSockJsSessionId() {
return this.sessionId; return this.sessionId;
} }

24
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsTransportFailureException.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -17,9 +17,9 @@
package org.springframework.web.socket.sockjs; package org.springframework.web.socket.sockjs;
/** /**
* Indicates a serious failure that occurred in the SockJS implementation as opposed to in * Indicates a serious failure that occurred in the SockJS implementation as opposed to
* user code (e.g. IOException while writing to the response). When this exception is * in user code (e.g. IOException while writing to the response). When this exception
* raised, the SockJS session is typically closed. * is raised, the SockJS session is typically closed.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
@ -27,6 +27,22 @@ package org.springframework.web.socket.sockjs;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class SockJsTransportFailureException extends SockJsException { public class SockJsTransportFailureException extends SockJsException {
/**
* Constructor for SockJsTransportFailureException.
* @param message the exception message
* @param cause the root cause
* @since 4.1.7
*/
public SockJsTransportFailureException(String message, Throwable cause) {
super(message, cause);
}
/**
* Constructor for SockJsTransportFailureException.
* @param message the exception message
* @param sessionId the SockJS session id
* @param cause the root cause
*/
public SockJsTransportFailureException(String message, String sessionId, Throwable cause) { public SockJsTransportFailureException(String message, String sessionId, Throwable cause) {
super(message, sessionId, cause); super(message, sessionId, cause);
} }

4
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/DefaultTransportRequest.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2014 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -205,7 +205,7 @@ class DefaultTransportRequest implements TransportRequest {
if (isTimeoutFailure) { if (isTimeoutFailure) {
String message = "Connect timed out for " + DefaultTransportRequest.this; String message = "Connect timed out for " + DefaultTransportRequest.this;
logger.error(message); logger.error(message);
ex = new SockJsTransportFailureException(message, getSockJsUrlInfo().getSessionId(), null); ex = new SockJsTransportFailureException(message, getSockJsUrlInfo().getSessionId(), ex);
} }
if (fallbackRequest != null) { if (fallbackRequest != null) {
logger.error(DefaultTransportRequest.this + " failed. Falling back on next transport.", ex); logger.error(DefaultTransportRequest.this + " failed. Falling back on next transport.", ex);

4
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2014 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -126,7 +126,7 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
response = httpRequest.send(); response = httpRequest.send();
} }
catch (Exception ex) { catch (Exception ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex); throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
} }
HttpStatus status = HttpStatus.valueOf(response.getStatus()); HttpStatus status = HttpStatus.valueOf(response.getStatus());
HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders()); HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());

85
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2014 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -65,6 +65,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
/** /**
* An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}. * An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}.
* Compatible with Undertow 1.0, 1.1, 1.2.
* *
* <p>When used for testing purposes (e.g. load testing) or for specific use cases * <p>When used for testing purposes (e.g. load testing) or for specific use cases
* (like HTTPS configuration), a custom OptionMap should be provided: * (like HTTPS configuration), a custom OptionMap should be provided:
@ -88,13 +89,15 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class); private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class);
private final Pool<ByteBuffer> bufferPool;
private final UndertowClient httpClient;
private final OptionMap optionMap; private final OptionMap optionMap;
private final XnioWorker worker; private final XnioWorker worker;
private final UndertowClient httpClient; private final Pool<ByteBuffer> bufferPool;
public UndertowXhrTransport() throws IOException { public UndertowXhrTransport() throws IOException {
this(OptionMap.builder().parse(Options.WORKER_NAME, "SockJSClient").getMap()); this(OptionMap.builder().parse(Options.WORKER_NAME, "SockJSClient").getMap());
@ -102,19 +105,20 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
public UndertowXhrTransport(OptionMap optionMap) throws IOException { public UndertowXhrTransport(OptionMap optionMap) throws IOException {
Assert.notNull(optionMap, "'optionMap' is required"); Assert.notNull(optionMap, "'optionMap' is required");
this.bufferPool = new ByteBufferSlicePool(1048, 1048); this.httpClient = UndertowClient.getInstance();
this.optionMap = optionMap; this.optionMap = optionMap;
this.worker = Xnio.getInstance().createWorker(optionMap); this.worker = Xnio.getInstance().createWorker(optionMap);
this.httpClient = UndertowClient.getInstance(); this.bufferPool = new ByteBufferSlicePool(1048, 1048);
} }
private static HttpHeaders toHttpHeaders(HeaderMap headerMap) { private static HttpHeaders toHttpHeaders(HeaderMap headerMap) {
HttpHeaders responseHeaders = new HttpHeaders(); HttpHeaders responseHeaders = new HttpHeaders();
Iterator<HttpString> names = headerMap.getHeaderNames().iterator(); Iterator<HttpString> names = headerMap.getHeaderNames().iterator();
while(names.hasNext()) { while (names.hasNext()) {
HttpString name = names.next(); HttpString name = names.next();
Iterator<String> values = headerMap.get(name).iterator(); Iterator<String> values = headerMap.get(name).iterator();
while(values.hasNext()) { while (values.hasNext()) {
responseHeaders.add(name.toString(), values.next()); responseHeaders.add(name.toString(), values.next());
} }
} }
@ -130,21 +134,24 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
} }
/** /**
* Return Undertow's native HTTP client * Return Undertow's native HTTP client
*/ */
public UndertowClient getHttpClient() { public UndertowClient getHttpClient() {
return httpClient; return this.httpClient;
} }
/** /**
* Return the {@link org.xnio.XnioWorker} backing the I/O operations for Undertow's HTTP client * Return the {@link org.xnio.XnioWorker} backing the I/O operations
* for Undertow's HTTP client.
* @see org.xnio.Xnio * @see org.xnio.Xnio
*/ */
public XnioWorker getWorker() { public XnioWorker getWorker() {
return this.worker; return this.worker;
} }
@Override @Override
protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) { protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
return executeRequest(infoUrl, Methods.GET, getRequestHeaders(), null); return executeRequest(infoUrl, Methods.GET, getRequestHeaders(), null);
@ -156,23 +163,23 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
protected ResponseEntity<String> executeRequest(URI url, HttpString method, HttpHeaders headers, String body) { protected ResponseEntity<String> executeRequest(URI url, HttpString method, HttpHeaders headers, String body) {
CountDownLatch latch = new CountDownLatch(1);
List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
final CountDownLatch latch = new CountDownLatch(1);
final List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
try { try {
final ClientConnection connection = this.httpClient.connect(url, this.worker, ClientConnection connection = this.httpClient.connect(
this.bufferPool, this.optionMap).get(); url, this.worker, this.bufferPool, this.optionMap).get();
try { try {
final ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath()); ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost()); request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
if (body !=null && !body.isEmpty()) { if (body != null && !body.isEmpty()) {
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH), body.length()); request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH), body.length());
} }
addHttpHeaders(request, headers); addHttpHeaders(request, headers);
connection.sendRequest(request, createRequestCallback(body, responses, latch)); connection.sendRequest(request, createRequestCallback(body, responses, latch));
latch.await(); latch.await();
final ClientResponse response = responses.iterator().next(); ClientResponse response = responses.iterator().next();
HttpStatus status = HttpStatus.valueOf(response.getResponseCode()); HttpStatus status = HttpStatus.valueOf(response.getResponseCode());
HttpHeaders responseHeaders = toHttpHeaders(response.getResponseHeaders()); HttpHeaders responseHeaders = toHttpHeaders(response.getResponseHeaders());
String responseBody = response.getAttachment(RESPONSE_BODY); String responseBody = response.getAttachment(RESPONSE_BODY);
@ -185,10 +192,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
} }
catch (IOException ex) { catch (IOException ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex); throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
} }
catch(InterruptedException ex) { catch (InterruptedException ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex); throw new SockJsTransportFailureException("Interrupted while processing request to " + url, ex);
} }
} }
@ -203,21 +210,18 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@Override @Override
public void completed(final ClientExchange result) { public void completed(final ClientExchange result) {
responses.add(result.getResponse()); responses.add(result.getResponse());
new StringReadChannelListener(result.getConnection().getBufferPool()) { new StringReadChannelListener(result.getConnection().getBufferPool()) {
@Override @Override
protected void stringDone(String string) { protected void stringDone(String string) {
result.getResponse().putAttachment(RESPONSE_BODY, string); result.getResponse().putAttachment(RESPONSE_BODY, string);
latch.countDown(); latch.countDown();
} }
@Override @Override
protected void error(IOException ex) { protected void error(IOException ex) {
onFailure(latch, ex); onFailure(latch, ex);
} }
}.setup(result.getResponseChannel()); }.setup(result.getResponseChannel());
} }
@Override @Override
public void failed(IOException ex) { public void failed(IOException ex) {
onFailure(latch, ex); onFailure(latch, ex);
@ -238,28 +242,28 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
onFailure(latch, ex); onFailure(latch, ex);
} }
} }
@Override @Override
public void failed(IOException ex) { public void failed(IOException ex) {
onFailure(latch, ex); onFailure(latch, ex);
} }
private void onFailure(CountDownLatch latch, IOException ex) {
private void onFailure(final CountDownLatch latch, IOException ex) {
latch.countDown(); latch.countDown();
throw new SockJsTransportFailureException("Failed to execute request", null, ex); throw new SockJsTransportFailureException("Failed to execute request", ex);
} }
}; };
} }
@Override @Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl, protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session, SettableListenableFuture<WebSocketSession> connectFuture) { HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture); executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture);
} }
private void executeReceiveRequest(final URI url, final HttpHeaders headers, final XhrClientSockJsSession session, private void executeReceiveRequest(final URI url, final HttpHeaders headers, final XhrClientSockJsSession session,
final SettableListenableFuture<WebSocketSession> connectFuture) { final SettableListenableFuture<WebSocketSession> connectFuture) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + url); logger.trace("Starting XHR receive request, url=" + url);
} }
@ -273,10 +277,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
addHttpHeaders(httpRequest, headers); addHttpHeaders(httpRequest, headers);
result.sendRequest(httpRequest, createConnectCallback(url, getRequestHeaders(), session, connectFuture)); result.sendRequest(httpRequest, createConnectCallback(url, getRequestHeaders(), session, connectFuture));
} }
@Override @Override
public void failed(IOException ex) { public void failed(IOException ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex); throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
} }
}, },
url, this.worker, this.bufferPool, this.optionMap); url, this.worker, this.bufferPool, this.optionMap);
@ -289,11 +292,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
return new ClientCallback<ClientExchange>() { return new ClientCallback<ClientExchange>() {
@Override @Override
public void completed(final ClientExchange result) { public void completed(final ClientExchange result) {
result.setResponseListener(new ClientCallback<ClientExchange>() { result.setResponseListener(new ClientCallback<ClientExchange>() {
@Override @Override
public void completed(final ClientExchange result) { public void completed(ClientExchange result) {
ClientResponse response = result.getResponse(); ClientResponse response = result.getResponse();
if (response.getResponseCode() != 200) { if (response.getResponseCode() != 200) {
HttpStatus status = HttpStatus.valueOf(response.getResponseCode()); HttpStatus status = HttpStatus.valueOf(response.getResponseCode());
@ -320,9 +321,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
IoUtils.safeClose(result.getConnection()); IoUtils.safeClose(result.getConnection());
onFailure(exc); onFailure(exc);
} }
} }
@Override @Override
public void failed(IOException exc) { public void failed(IOException exc) {
IoUtils.safeClose(result.getConnection()); IoUtils.safeClose(result.getConnection());
@ -330,12 +329,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
}); });
} }
@Override @Override
public void failed(IOException exc) { public void failed(IOException exc) {
onFailure(exc); onFailure(exc);
} }
private void onFailure(Throwable failure) { private void onFailure(Throwable failure) {
if (connectFuture.setException(failure)) { if (connectFuture.setException(failure)) {
return; return;
@ -349,21 +346,26 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
} }
}; };
} }
public class SockJsResponseListener implements ChannelListener<StreamSourceChannel> { public class SockJsResponseListener implements ChannelListener<StreamSourceChannel> {
private final ClientConnection connection; private final ClientConnection connection;
private final URI url; private final URI url;
private final HttpHeaders headers; private final HttpHeaders headers;
private final XhrClientSockJsSession session; private final XhrClientSockJsSession session;
private final SettableListenableFuture<WebSocketSession> connectFuture; private final SettableListenableFuture<WebSocketSession> connectFuture;
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public SockJsResponseListener(ClientConnection connection, URI url, HttpHeaders headers, public SockJsResponseListener(ClientConnection connection, URI url, HttpHeaders headers,
XhrClientSockJsSession sockJsSession, SettableListenableFuture<WebSocketSession> connectFuture) { XhrClientSockJsSession sockJsSession, SettableListenableFuture<WebSocketSession> connectFuture) {
this.connection = connection; this.connection = connection;
this.url = url; this.url = url;
this.headers = headers; this.headers = headers;
@ -371,7 +373,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
this.connectFuture = connectFuture; this.connectFuture = connectFuture;
} }
public void setup(final StreamSourceChannel channel) { public void setup(StreamSourceChannel channel) {
channel.suspendReads(); channel.suspendReads();
channel.getReadSetter().set(this); channel.getReadSetter().set(this);
channel.resumeReads(); channel.resumeReads();
@ -388,7 +390,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
Pooled<ByteBuffer> pooled = this.connection.getBufferPool().allocate(); Pooled<ByteBuffer> pooled = this.connection.getBufferPool().allocate();
try { try {
int r; int r;
do { do {
@ -403,7 +404,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
onSuccess(); onSuccess();
} }
else { else {
while(buffer.hasRemaining()) { while (buffer.hasRemaining()) {
int b = buffer.get(); int b = buffer.get();
if (b == '\n') { if (b == '\n') {
handleFrame(); handleFrame();
@ -413,8 +414,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
} }
} }
} }
}
} while (r > 0); while (r > 0);
} }
catch (IOException exc) { catch (IOException exc) {
onFailure(exc); onFailure(exc);

Loading…
Cancel
Save