@ -18,20 +18,19 @@ package org.springframework.web.reactive.socket.adapter;
import java.io.IOException ;
import java.io.IOException ;
import java.net.URI ;
import java.net.URI ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicBoolean ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher ;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher ;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor ;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor ;
import org.springframework.util.Assert ;
import org.springframework.util.Assert ;
import org.springframework.web.reactive.socket.CloseStatus ;
import org.springframework.web.reactive.socket.CloseStatus ;
import org.springframework.web.reactive.socket.WebSocketMessage ;
import org.springframework.web.reactive.socket.WebSocketMessage ;
import org.springframework.web.reactive.socket.WebSocketSession ;
import org.springframework.web.reactive.socket.WebSocketMessage.Type ;
import org.springframework.web.reactive.socket.WebSocketMessage.Type ;
import org.springframework.web.reactive.socket.WebSocketSession ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
/ * *
/ * *
* Base class for Listener - based { @link WebSocketSession } adapters .
* Base class for Listener - based { @link WebSocketSession } adapters .
@ -39,7 +38,7 @@ import reactor.core.publisher.Mono;
* @author Violeta Georgieva
* @author Violeta Georgieva
* @since 5 . 0
* @since 5 . 0
* /
* /
public abstract class AbstractListenerWebSocketSessionSupport < T > extends WebSocketSessionSupport < T > {
public abstract class AbstractListenerWebSocketSession < T > extends WebSocketSessionSupport < T > {
private final AtomicBoolean sendCalled = new AtomicBoolean ( ) ;
private final AtomicBoolean sendCalled = new AtomicBoolean ( ) ;
@ -47,12 +46,12 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
private final URI uri ;
private final URI uri ;
protected final WebSocketMessagePublisher webSocketMessagePublisher =
private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher ( ) ;
new WebSocketMessagePublisher ( ) ;
protected volatile WebSocketMessageProcessor webSocketMessage Processor ;
private volatile WebSocketSendProcessor send Processor ;
public AbstractListenerWebSocketSessionSupport ( T delegate , String id , URI uri ) {
public AbstractListenerWebSocketSession ( T delegate , String id , URI uri ) {
super ( delegate ) ;
super ( delegate ) ;
Assert . notNull ( id , "'id' is required." ) ;
Assert . notNull ( id , "'id' is required." ) ;
Assert . notNull ( uri , "'uri' is required." ) ;
Assert . notNull ( uri , "'uri' is required." ) ;
@ -60,6 +59,7 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
this . uri = uri ;
this . uri = uri ;
}
}
@Override
@Override
public String getId ( ) {
public String getId ( ) {
return this . id ;
return this . id ;
@ -70,18 +70,22 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
return this . uri ;
return this . uri ;
}
}
protected WebSocketSendProcessor getSendProcessor ( ) {
return this . sendProcessor ;
}
@Override
@Override
public Flux < WebSocketMessage > receive ( ) {
public Flux < WebSocketMessage > receive ( ) {
return Flux . from ( this . webSocketMessagePublisher ) ;
return Flux . from ( this . receiv ePublisher) ;
}
}
@Override
@Override
public Mono < Void > send ( Publisher < WebSocketMessage > messages ) {
public Mono < Void > send ( Publisher < WebSocketMessage > messages ) {
if ( this . sendCalled . compareAndSet ( false , true ) ) {
if ( this . sendCalled . compareAndSet ( false , true ) ) {
this . webSocketMes sag eProcessor = new WebSocketMessage Processor ( ) ;
this . send Processor = new WebSocketSend Processor ( ) ;
return Mono . from ( subscriber - > {
return Mono . from ( subscriber - > {
messages . subscribe ( this . webSocketMes sag eProcessor) ;
messages . subscribe ( this . send Processor ) ;
this . webSocketMes sag eProcessor. subscribe ( subscriber ) ;
this . send Processor . subscribe ( subscriber ) ;
} ) ;
} ) ;
}
}
else {
else {
@ -97,32 +101,38 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
// no-op
// no-op
}
}
protected abstract boolean writeInternal ( WebSocketMessage message ) throws IOException ;
protected boolean isReadyToReceive ( ) {
return this . receivePublisher . isReadyToReceive ( ) ;
}
/** Handle a message callback from the Servlet container */
protected abstract boolean sendMessage ( WebSocketMessage message ) throws IOException ;
/** Handle a message callback from the WebSocketHandler adapter */
void handleMessage ( Type type , WebSocketMessage message ) {
void handleMessage ( Type type , WebSocketMessage message ) {
this . webSocketMessagePublisher . processWebSocketMessage ( message ) ;
this . receivePublisher . handle Message( message ) ;
}
}
/** Handle a error callback from the Servlet contain er */
/** Handle an error callback from the WebSocketHandler adapt er */
void handleError ( Throwable ex ) {
void handleError ( Throwable ex ) {
this . webSocketMessag ePublisher. onError ( ex ) ;
this . receiv ePublisher. onError ( ex ) ;
if ( this . webSocketMes sag eProcessor ! = null ) {
if ( this . send Processor ! = null ) {
this . webSocketMes sag eProcessor. cancel ( ) ;
this . send Processor . cancel ( ) ;
this . webSocketMes sag eProcessor. onError ( ex ) ;
this . send Processor . onError ( ex ) ;
}
}
}
}
/** Handle a complete callback from the Servlet contain er */
/** Handle a close callback from the WebSocketHandler adapt er */
void handleClose ( CloseStatus reason ) {
void handleClose ( CloseStatus reason ) {
this . webSocketMessag ePublisher. onAllDataRead ( ) ;
this . receiv ePublisher. onAllDataRead ( ) ;
if ( this . webSocketMes sag eProcessor ! = null ) {
if ( this . send Processor ! = null ) {
this . webSocketMes sag eProcessor. cancel ( ) ;
this . send Processor . cancel ( ) ;
this . webSocketMes sag eProcessor. onComplete ( ) ;
this . send Processor . onComplete ( ) ;
}
}
}
}
final class WebSocketMessagePublisher extends AbstractListenerReadPublisher < WebSocketMessage > {
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher < WebSocketMessage > {
private volatile WebSocketMessage webSocketMessage ;
private volatile WebSocketMessage webSocketMessage ;
@Override
@Override
@ -144,52 +154,47 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
return null ;
return null ;
}
}
void processWebSocket Message( WebSocketMessage webSocketMessage ) {
void handle Message( WebSocketMessage webSocketMessage ) {
this . webSocketMessage = webSocketMessage ;
this . webSocketMessage = webSocketMessage ;
suspendReceives ( ) ;
suspendReceives ( ) ;
onDataAvailable ( ) ;
onDataAvailable ( ) ;
}
}
boolean canAccept ( ) {
boolean isReadyToReceive ( ) {
return this . webSocketMessage = = null ;
return this . webSocketMessage = = null ;
}
}
}
}
final class WebSocketMessageProcessor extends AbstractListenerWriteProcessor < WebSocketMessage > {
protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor < WebSocketMessage > {
private volatile boolean isReady = true ;
private volatile boolean isReady = true ;
@Override
@Override
protected boolean write ( WebSocketMessage message ) throws IOException {
protected boolean write ( WebSocketMessage message ) throws IOException {
return writeInternal ( message ) ;
return sendMessage ( message ) ;
}
}
@Override
@Override
protected void releaseData ( ) {
protected void releaseData ( ) {
if ( logger . isTraceEnabled ( ) ) {
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( "releaseBuffer : " + this . currentData ) ;
logger . trace ( "releaseData : " + this . currentData ) ;
}
}
this . currentData = null ;
this . currentData = null ;
}
}
@Override
@Override
protected boolean isDataEmpty ( WebSocketMessage data ) {
protected boolean isDataEmpty ( WebSocketMessage message ) {
return data . getPayload ( ) . readableByteCount ( ) = = 0 ;
return message . getPayload ( ) . readableByteCount ( ) = = 0 ;
}
}
@Override
@Override
protected boolean isWritePossible ( ) {
protected boolean isWritePossible ( ) {
if ( this . isReady & & this . currentData ! = null ) {
return this . isReady & & this . currentData ! = null ;
return true ;
}
else {
return false ;
}
}
}
void setReady ( boolean ready ) {
public void setReady ( boolean ready ) {
this . isReady = ready ;
this . isReady = ready ;
}
}
}
}
}
}