|
|
|
@ -16,6 +16,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
package org.springframework.messaging.rsocket; |
|
|
|
package org.springframework.messaging.rsocket; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.function.BiFunction; |
|
|
|
import java.util.function.Function; |
|
|
|
import java.util.function.Function; |
|
|
|
|
|
|
|
|
|
|
|
import io.rsocket.ConnectionSetupPayload; |
|
|
|
import io.rsocket.ConnectionSetupPayload; |
|
|
|
@ -26,6 +27,8 @@ import reactor.core.publisher.Mono; |
|
|
|
import org.springframework.lang.Nullable; |
|
|
|
import org.springframework.lang.Nullable; |
|
|
|
import org.springframework.messaging.Message; |
|
|
|
import org.springframework.messaging.Message; |
|
|
|
import org.springframework.util.MimeType; |
|
|
|
import org.springframework.util.MimeType; |
|
|
|
|
|
|
|
import org.springframework.util.MimeTypeUtils; |
|
|
|
|
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Extension of {@link RSocketMessageHandler} that can be plugged directly into |
|
|
|
* Extension of {@link RSocketMessageHandler} that can be plugged directly into |
|
|
|
@ -38,7 +41,7 @@ import org.springframework.util.MimeType; |
|
|
|
* @since 5.2 |
|
|
|
* @since 5.2 |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public final class MessageHandlerAcceptor extends RSocketMessageHandler |
|
|
|
public final class MessageHandlerAcceptor extends RSocketMessageHandler |
|
|
|
implements SocketAcceptor, Function<RSocket, RSocket> { |
|
|
|
implements SocketAcceptor, BiFunction<ConnectionSetupPayload, RSocket, RSocket> { |
|
|
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
@Nullable |
|
|
|
private MimeType defaultDataMimeType; |
|
|
|
private MimeType defaultDataMimeType; |
|
|
|
@ -58,7 +61,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { |
|
|
|
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { |
|
|
|
MessagingRSocket rsocket = createRSocket(sendingRSocket); |
|
|
|
MessagingRSocket rsocket = createRSocket(setupPayload, sendingRSocket); |
|
|
|
|
|
|
|
|
|
|
|
// Allow handling of the ConnectionSetupPayload via @MessageMapping methods.
|
|
|
|
// Allow handling of the ConnectionSetupPayload via @MessageMapping methods.
|
|
|
|
// However, if the handling is to make requests to the client, it's expected
|
|
|
|
// However, if the handling is to make requests to the client, it's expected
|
|
|
|
@ -67,15 +70,18 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public RSocket apply(RSocket sendingRSocket) { |
|
|
|
public RSocket apply(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { |
|
|
|
return createRSocket(sendingRSocket); |
|
|
|
return createRSocket(setupPayload, sendingRSocket); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private MessagingRSocket createRSocket(RSocket rsocket) { |
|
|
|
private MessagingRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) { |
|
|
|
|
|
|
|
MimeType dataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ? |
|
|
|
|
|
|
|
MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : |
|
|
|
|
|
|
|
this.defaultDataMimeType; |
|
|
|
return new MessagingRSocket(this::handleMessage, |
|
|
|
return new MessagingRSocket(this::handleMessage, |
|
|
|
route -> getRouteMatcher().parseRoute(route), |
|
|
|
route -> getRouteMatcher().parseRoute(route), |
|
|
|
RSocketRequester.wrap(rsocket, this.defaultDataMimeType, getRSocketStrategies()), |
|
|
|
RSocketRequester.wrap(rsocket, dataMimeType, getRSocketStrategies()), |
|
|
|
this.defaultDataMimeType, |
|
|
|
dataMimeType, |
|
|
|
getRSocketStrategies().dataBufferFactory()); |
|
|
|
getRSocketStrategies().dataBufferFactory()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|