@ -17,6 +17,7 @@ package org.springframework.messaging.rsocket;
@@ -17,6 +17,7 @@ package org.springframework.messaging.rsocket;
import java.util.function.Function ;
import io.rsocket.AbstractRSocket ;
import io.rsocket.ConnectionSetupPayload ;
import io.rsocket.Payload ;
import io.rsocket.RSocket ;
@ -40,6 +41,8 @@ import org.springframework.messaging.support.MessageBuilder;
@@ -40,6 +41,8 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor ;
import org.springframework.util.Assert ;
import org.springframework.util.MimeType ;
import org.springframework.util.MimeTypeUtils ;
import org.springframework.util.StringUtils ;
/ * *
* Package private implementation of { @link RSocket } that is is hooked into an
@ -49,90 +52,96 @@ import org.springframework.util.MimeType;
@@ -49,90 +52,96 @@ import org.springframework.util.MimeType;
* @author Rossen Stoyanchev
* @since 5 . 2
* /
class MessagingRSocket implements RSocket {
class MessagingRSocket extends Abstract RSocket {
private final ReactiveMessageChannel messageChannel ;
private final RSocketRequester requester ;
@Nullable
private final MimeType dataMimeType ;
private MimeType dataMimeType ;
private final RSocketStrategies strategies ;
MessagingRSocket ( ReactiveMessageChannel messageChannel ,
RSocket sendingRSocket , @Nullable MimeType dataMimeType , RSocketStrategies strategies ) {
RSocket sendingRSocket , @Nullable MimeType defaultD ataMimeType , RSocketStrategies strategies ) {
Assert . notNull ( messageChannel , "'messageChannel' is required" ) ;
Assert . notNull ( sendingRSocket , "'sendingRSocket' is required" ) ;
this . messageChannel = messageChannel ;
this . requester = RSocketRequester . create ( sendingRSocket , dataMimeType , strategies ) ;
this . dataMimeType = dataMimeType ;
this . requester = RSocketRequester . create ( sendingRSocket , defaultD ataMimeType , strategies ) ;
this . dataMimeType = defaultD ataMimeType ;
this . strategies = strategies ;
}
public Mono < Void > afterConnectionEstablished ( ConnectionSetupPayload payload ) {
return execute ( payload ) . flatMap ( flux - > flux . take ( 0 ) . then ( ) ) ;
public Mono < Void > handleConnectionSetupPayload ( ConnectionSetupPayload payload ) {
if ( StringUtils . hasText ( payload . dataMimeType ( ) ) ) {
this . dataMimeType = MimeTypeUtils . parseMimeType ( payload . dataMimeType ( ) ) ;
}
return handle ( payload ) ;
}
@Override
public Mono < Void > fireAndForget ( Payload payload ) {
return execute ( payload ) . flatMap ( flux - > flux . take ( 0 ) . then ( ) ) ;
return handle ( payload ) ;
}
@Override
public Mono < Payload > requestResponse ( Payload payload ) {
return execute ( payload ) . flatMap ( Flux : : next ) ;
return handleAndReply ( payload , Flux . just ( payload ) ) . next ( ) ;
}
@Override
public Flux < Payload > requestStream ( Payload payload ) {
return execute ( payload ) . flatMapMany ( Function . identity ( ) ) ;
return handleAndReply ( payload , Flux . just ( payload ) ) ;
}
@Override
public Flux < Payload > requestChannel ( Publisher < Payload > payloads ) {
return Flux . from ( payloads )
. switchOnFirst ( ( signal , inner ) - > {
Payload first = signal . get ( ) ;
return first ! = null ? execute ( first , inner ) . flatMapMany ( Function . identity ( ) ) : inner ;
. switchOnFirst ( ( signal , innerFlux ) - > {
Payload firstPayload = signal . get ( ) ;
return firstPayload = = null ? innerFlux : handleAndReply ( firstPayload , innerFlux ) ;
} ) ;
}
@Override
public Mono < Void > metadataPush ( Payload payload ) {
return null ;
// This won't be very useful until createHeaders starting doing something more with metadata..
return handle ( payload ) ;
}
private Mono < Flux < Payload > > execute ( Payload payload ) {
return execute ( payload , Flux . just ( payload ) ) ;
}
private Mono < Flux < Payload > > execute ( Payload firstPayload , Flux < Payload > payloads ) {
private Mono < Void > handle ( Payload payload ) {
// TODO:
// Since we do retain(), we need to ensure buffers are released if not consumed,
// e.g. error before Flux subscribed to, no handler found, @MessageMapping ignores payload, etc.
Message < ? > message = MessageBuilder . createMessage (
Mono . fromCallable ( ( ) - > wrapPayloadData ( payload ) ) ,
createHeaders ( payload , null ) ) ;
Flux < DataBuffer > payloadDataBuffers = payloads
. map ( payload - > PayloadUtils . asDataBuffer ( payload , this . strategies . dataBufferFactory ( ) ) )
. doOnDiscard ( PooledDataBuffer . class , DataBufferUtils : : release ) ;
return this . messageChannel . send ( message ) . flatMap ( result - > result ?
Mono . empty ( ) : Mono . error ( new MessageDeliveryException ( "RSocket request not handled" ) ) ) ;
}
private Flux < Payload > handleAndReply ( Payload firstPayload , Flux < Payload > payloads ) {
MonoProcessor < Flux < Payload > > replyMono = MonoProcessor . create ( ) ;
MessageHeaders headers = createHeaders ( firstPayload , replyMono ) ;
Message < ? > message = MessageBuilder . createMessage ( payloadDataBuffers , headers ) ;
Message < ? > message = MessageBuilder . createMessage (
payloads . map ( this : : wrapPayloadData ) . doOnDiscard ( PooledDataBuffer . class , DataBufferUtils : : release ) ,
createHeaders ( firstPayload , replyMono ) ) ;
return this . messageChannel . send ( message ) . flatMap ( result - > result ?
replyMono . isTerminated ( ) ? replyMono : Mono . empty ( ) :
Mono . error ( new MessageDeliveryException ( "RSocket interaction not handled" ) ) ) ;
return this . messageChannel . send ( message ) . flatMapMany ( result - >
result & & re plyMono . isTerminated ( ) ? replyMono . flatMapMany ( Function . identity ( ) ) :
Mono . error ( new MessageDeliveryException ( "RSocket request not handled" ) ) ) ;
}
private MessageHeaders createHeaders ( Payload payload , MonoProcessor < ? > replyMono ) {
private MessageHeaders createHeaders ( Payload payload , @Nullable MonoProcessor < ? > replyMono ) {
// TODO:
// For now treat the metadata as a simple string with routing information.
// We'll have to get more sophisticated once the routing extension is completed.
// https://github.com/rsocket/rsocket-java/issues/568
@ -147,7 +156,10 @@ class MessagingRSocket implements RSocket {
@@ -147,7 +156,10 @@ class MessagingRSocket implements RSocket {
}
headers . setHeader ( RSocketRequesterMethodArgumentResolver . RSOCKET_REQUESTER_HEADER , this . requester ) ;
if ( replyMono ! = null ) {
headers . setHeader ( RSocketPayloadReturnValueHandler . RESPONSE_HEADER , replyMono ) ;
}
DataBufferFactory bufferFactory = this . strategies . dataBufferFactory ( ) ;
headers . setHeader ( HandlerMethodReturnValueHandler . DATA_BUFFER_FACTORY_HEADER , bufferFactory ) ;
@ -155,13 +167,8 @@ class MessagingRSocket implements RSocket {
@@ -155,13 +167,8 @@ class MessagingRSocket implements RSocket {
return headers . getMessageHeaders ( ) ;
}
@Override
public Mono < Void > onClose ( ) {
return null ;
}
@Override
public void dispose ( ) {
private DataBuffer wrapPayloadData ( Payload payload ) {
return PayloadUtils . wrapPayloadData ( payload , this . strategies . dataBufferFactory ( ) ) ;
}
}