Browse Source

Provide access to CloseStatus in WebSocketSession

Closes gh-22079
pull/25098/head
Rossen Stoyanchev 6 years ago
parent
commit
de378599d3
  1. 49
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java
  2. 9
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java
  3. 42
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  4. 4
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java
  5. 8
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  6. 4
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java
  7. 4
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java
  8. 4
      spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

49
spring-webflux/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2017 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.web.reactive.socket;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/** /**
* Representation of WebSocket "close" status codes and reasons. Status codes * Representation of WebSocket "close" status codes and reasons. Status codes
@ -184,11 +185,55 @@ public final class CloseStatus {
return new CloseStatus(this.code, reason); return new CloseStatus(this.code, reason);
} }
/**
* @deprecated as of 5.3 in favor of comparing codes directly
*/
@Deprecated
public boolean equalsCode(CloseStatus other) { public boolean equalsCode(CloseStatus other) {
return (this.code == other.code); return (this.code == other.code);
} }
/**
* Return a constant for the given code, or create a new instance if the
* code does not match or there is a reason.
* @since 5.3
*/
public static CloseStatus create(int code, @Nullable String reason) {
if (StringUtils.isEmpty(reason)) {
switch (code) {
case 1000:
return NORMAL;
case 1001:
return GOING_AWAY;
case 1002:
return PROTOCOL_ERROR;
case 1003:
return NOT_ACCEPTABLE;
case 1005:
return NO_STATUS_CODE;
case 1006:
return NO_CLOSE_FRAME;
case 1007:
return BAD_DATA;
case 1008:
return POLICY_VIOLATION;
case 1009:
return TOO_BIG_TO_PROCESS;
case 1010:
return REQUIRED_EXTENSION;
case 1011:
return SERVER_ERROR;
case 1012:
return SERVICE_RESTARTED;
case 1013:
return SERVICE_OVERLOAD;
}
}
return new CloseStatus(code, reason);
}
@Override @Override
public boolean equals(@Nullable Object other) { public boolean equals(@Nullable Object other) {
if (this == other) { if (this == other) {

9
spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -95,6 +95,13 @@ public interface WebSocketSession {
*/ */
Mono<Void> close(CloseStatus status); Mono<Void> close(CloseStatus status);
/**
* Provides access to the {@code CloseStatus} with which the session is
* closed either locally or remotely, or completes empty if the session ended
* without a status.
* @since 5.3
*/
Mono<CloseStatus> closeStatus();
// WebSocketMessage factory methods // WebSocketMessage factory methods

42
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type; import org.springframework.web.reactive.socket.WebSocketMessage.Type;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
@ -44,8 +45,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
* event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty, * event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty,
* Undertow) and Reactive Streams. * Undertow) and Reactive Streams.
* *
* <p>Also an implementation of {@code Subscriber&lt;Void&gt;} so it can be used as * <p>Also implements {@code Subscriber<Void>} so it can be used to subscribe to
* the completion subscriber for session handling * the completion of {@link WebSocketHandler#handle(WebSocketSession)}.
* *
* @author Violeta Georgieva * @author Violeta Georgieva
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
@ -63,7 +64,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Nullable @Nullable
private final MonoProcessor<Void> completionMono; private final MonoProcessor<Void> handlerCompletion;
private final WebSocketReceivePublisher receivePublisher; private final WebSocketReceivePublisher receivePublisher;
@ -72,6 +73,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
private final AtomicBoolean sendCalled = new AtomicBoolean(); private final AtomicBoolean sendCalled = new AtomicBoolean();
private final MonoProcessor<CloseStatus> closeStatusProcessor = MonoProcessor.create();
/** /**
* Base constructor. * Base constructor.
@ -87,15 +90,16 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
} }
/** /**
* Alternative constructor with completion {@code Mono&lt;Void&gt;} to propagate * Alternative constructor with completion {@code Mono<Void>} to propagate
* the session completion (success or error) (for client-side use). * session completion (success or error). This is primarily for use with the
* {@code WebSocketClient} to be able to report the end of execution.
*/ */
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) { DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> handlerCompletion) {
super(delegate, id, info, bufferFactory); super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher(); this.receivePublisher = new WebSocketReceivePublisher();
this.completionMono = completionMono; this.handlerCompletion = handlerCompletion;
} }
@ -126,6 +130,11 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
} }
} }
@Override
public Mono<CloseStatus> closeStatus() {
return this.closeStatusProcessor;
}
/** /**
* Whether the underlying WebSocket API has flow control and can suspend and * Whether the underlying WebSocket API has flow control and can suspend and
* resume the receiving of messages. * resume the receiving of messages.
@ -170,6 +179,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
/** Handle an error callback from the WebSocketHandler adapter. */ /** Handle an error callback from the WebSocketHandler adapter. */
void handleError(Throwable ex) { void handleError(Throwable ex) {
this.closeStatusProcessor.onComplete();
this.receivePublisher.onError(ex); this.receivePublisher.onError(ex);
WebSocketSendProcessor sendProcessor = this.sendProcessor; WebSocketSendProcessor sendProcessor = this.sendProcessor;
if (sendProcessor != null) { if (sendProcessor != null) {
@ -179,7 +189,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
} }
/** Handle a close callback from the WebSocketHandler adapter. */ /** Handle a close callback from the WebSocketHandler adapter. */
void handleClose(CloseStatus reason) { void handleClose(CloseStatus closeStatus) {
this.closeStatusProcessor.onNext(closeStatus);
this.receivePublisher.onAllDataRead(); this.receivePublisher.onAllDataRead();
WebSocketSendProcessor sendProcessor = this.sendProcessor; WebSocketSendProcessor sendProcessor = this.sendProcessor;
if (sendProcessor != null) { if (sendProcessor != null) {
@ -189,7 +200,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
} }
// Subscriber<Void> implementation // Subscriber<Void> implementation tracking WebSocketHandler#handle completion
@Override @Override
public void onSubscribe(Subscription subscription) { public void onSubscribe(Subscription subscription) {
@ -203,17 +214,16 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override @Override
public void onError(Throwable ex) { public void onError(Throwable ex) {
if (this.completionMono != null) { if (this.handlerCompletion != null) {
this.completionMono.onError(ex); this.handlerCompletion.onError(ex);
} }
int code = CloseStatus.SERVER_ERROR.getCode(); close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage()));
close(new CloseStatus(code, ex.getMessage()));
} }
@Override @Override
public void onComplete() { public void onComplete() {
if (this.completionMono != null) { if (this.handlerCompletion != null) {
this.completionMono.onComplete(); this.handlerCompletion.onComplete();
} }
close(); close();
} }

4
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -131,7 +131,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketClose @OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) { public void onWebSocketClose(int statusCode, String reason) {
if (this.delegateSession != null) { if (this.delegateSession != null) {
this.delegateSession.handleClose(new CloseStatus(statusCode, reason)); this.delegateSession.handleClose(CloseStatus.create(statusCode, reason));
} }
} }

8
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -95,9 +95,15 @@ public class ReactorNettyWebSocketSession
@Override @Override
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
// this will notify WebSocketInbound.receiveCloseStatus()
return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason()); return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason());
} }
@Override
public Mono<CloseStatus> closeStatus() {
return getDelegate().getInbound().receiveCloseStatus()
.map(status -> CloseStatus.create(status.code(), status.reasonText()));
}
/** /**
* Simple container for {@link NettyInbound} and {@link NettyOutbound}. * Simple container for {@link NettyInbound} and {@link NettyOutbound}.

4
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -110,7 +110,7 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
public void onClose(Session session, CloseReason reason) { public void onClose(Session session, CloseReason reason) {
if (this.delegateSession != null) { if (this.delegateSession != null) {
int code = reason.getCloseCode().getCode(); int code = reason.getCloseCode().getCode();
this.delegateSession.handleClose(new CloseStatus(code, reason.getReasonPhrase())); this.delegateSession.handleClose(CloseStatus.create(code, reason.getReasonPhrase()));
} }
} }

4
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -72,7 +72,7 @@ public class UndertowWebSocketHandlerAdapter extends AbstractReceiveListener {
@Override @Override
protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) { protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
CloseMessage closeMessage = new CloseMessage(message.getData().getResource()); CloseMessage closeMessage = new CloseMessage(message.getData().getResource());
this.session.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason())); this.session.handleClose(CloseStatus.create(closeMessage.getCode(), closeMessage.getReason()));
message.getData().free(); message.getData().free();
} }

4
spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

@ -147,9 +147,11 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
void sessionClosing(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception { void sessionClosing(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
startServer(client, server, serverConfigClass); startServer(client, server, serverConfigClass);
MonoProcessor<CloseStatus> statusProcessor = MonoProcessor.create();
this.client.execute(getUrl("/close"), this.client.execute(getUrl("/close"),
session -> { session -> {
logger.debug("Starting.."); logger.debug("Starting..");
session.closeStatus().subscribe(statusProcessor);
return session.receive() return session.receive()
.doOnNext(s -> logger.debug("inbound " + s)) .doOnNext(s -> logger.debug("inbound " + s))
.then() .then()
@ -158,6 +160,8 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
); );
}) })
.block(TIMEOUT); .block(TIMEOUT);
assertThat(statusProcessor.block()).isEqualTo(CloseStatus.GOING_AWAY);
} }
@ParameterizedWebSocketTest @ParameterizedWebSocketTest

Loading…
Cancel
Save