diff --git a/framework-platform/framework-platform.gradle b/framework-platform/framework-platform.gradle index e6d2bb7a9f8..7701943706a 100644 --- a/framework-platform/framework-platform.gradle +++ b/framework-platform/framework-platform.gradle @@ -9,7 +9,7 @@ javaPlatform { dependencies { api(platform("com.fasterxml.jackson:jackson-bom:2.13.3")) api(platform("io.netty:netty-bom:4.1.80.Final")) - api(platform("io.netty:netty5-bom:5.0.0.Alpha4")) + api(platform("io.netty:netty5-bom:5.0.0.Alpha3")) api(platform("io.projectreactor:reactor-bom:2022.0.0-M5")) api(platform("io.rsocket:rsocket-bom:1.1.2")) api(platform("org.apache.groovy:groovy-bom:4.0.4")) diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 9e8c1511be8..1cd38acf8a1 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -20,10 +20,14 @@ dependencies { optional("io.reactivex.rxjava3:rxjava") optional("io.netty:netty-buffer") optional("io.netty:netty-handler") - optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed - optional("io.netty:netty-transport") // Until Netty4ClientHttpRequest is removed + optional("io.netty:netty-codec-http") + optional("io.netty:netty-transport") optional("io.projectreactor.netty:reactor-netty-http") optional("io.netty:netty5-buffer") + optional("io.netty:netty5-handler") + optional("io.netty:netty5-codec-http") + optional("io.netty:netty5-transport") + optional("io.projectreactor.netty:reactor-netty5-http:2.0.0-M1") optional("io.undertow:undertow-core") optional("org.apache.tomcat.embed:tomcat-embed-core") optional("org.eclipse.jetty:jetty-server") { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/Netty5HeadersAdapter.java b/spring-web/src/main/java/org/springframework/http/client/reactive/Netty5HeadersAdapter.java new file mode 100644 index 00000000000..31a178b64dd --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/Netty5HeadersAdapter.java @@ -0,0 +1,283 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.netty5.handler.codec.http.HttpHeaders; + +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; +import org.springframework.util.MultiValueMap; + +/** + * {@code MultiValueMap} implementation for wrapping Netty HTTP headers. + * + *

There is a duplicate of this class in the server package! + * + * This class is based on {@link NettyHeadersAdapter}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +class Netty5HeadersAdapter implements MultiValueMap { + + private final HttpHeaders headers; + + + Netty5HeadersAdapter(HttpHeaders headers) { + this.headers = headers; + } + + + @Override + @Nullable + public String getFirst(String key) { + return this.headers.get(key); + } + + @Override + public void add(String key, @Nullable String value) { + if (value != null) { + this.headers.add(key, value); + } + } + + @Override + public void addAll(String key, List values) { + this.headers.add(key, values); + } + + @Override + public void addAll(MultiValueMap values) { + values.forEach(this.headers::add); + } + + @Override + public void set(String key, @Nullable String value) { + if (value != null) { + this.headers.set(key, value); + } + } + + @Override + public void setAll(Map values) { + values.forEach(this.headers::set); + } + + @Override + public Map toSingleValueMap() { + Map singleValueMap = CollectionUtils.newLinkedHashMap(this.headers.size()); + this.headers.entries() + .forEach(entry -> { + if (!singleValueMap.containsKey(entry.getKey())) { + singleValueMap.put(entry.getKey(), entry.getValue()); + } + }); + return singleValueMap; + } + + @Override + public int size() { + return this.headers.names().size(); + } + + @Override + public boolean isEmpty() { + return this.headers.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return (key instanceof String headerName && this.headers.contains(headerName)); + } + + @Override + public boolean containsValue(Object value) { + return (value instanceof String && + this.headers.entries().stream() + .anyMatch(entry -> value.equals(entry.getValue()))); + } + + @Override + @Nullable + public List get(Object key) { + if (containsKey(key)) { + return this.headers.getAll((String) key); + } + return null; + } + + @Nullable + @Override + public List put(String key, @Nullable List value) { + List previousValues = this.headers.getAll(key); + this.headers.set(key, value); + return previousValues; + } + + @Nullable + @Override + public List remove(Object key) { + if (key instanceof String headerName) { + List previousValues = this.headers.getAll(headerName); + this.headers.remove(headerName); + return previousValues; + } + return null; + } + + @Override + public void putAll(Map> map) { + map.forEach(this.headers::set); + } + + @Override + public void clear() { + this.headers.clear(); + } + + @Override + public Set keySet() { + return new HeaderNames(); + } + + @Override + public Collection> values() { + return this.headers.names().stream() + .map(this.headers::getAll).collect(Collectors.toList()); + } + + @Override + public Set>> entrySet() { + return new AbstractSet<>() { + @Override + public Iterator>> iterator() { + return new EntryIterator(); + } + + @Override + public int size() { + return headers.size(); + } + }; + } + + + @Override + public String toString() { + return org.springframework.http.HttpHeaders.formatHeaders(this); + } + + + private class EntryIterator implements Iterator>> { + + private final Iterator names = headers.names().iterator(); + + @Override + public boolean hasNext() { + return this.names.hasNext(); + } + + @Override + public Entry> next() { + return new HeaderEntry(this.names.next()); + } + } + + + private class HeaderEntry implements Entry> { + + private final String key; + + HeaderEntry(String key) { + this.key = key; + } + + @Override + public String getKey() { + return this.key; + } + + @Override + public List getValue() { + return headers.getAll(this.key); + } + + @Override + public List setValue(List value) { + List previousValues = headers.getAll(this.key); + headers.set(this.key, value); + return previousValues; + } + } + + + private class HeaderNames extends AbstractSet { + + @Override + public Iterator iterator() { + return new HeaderNamesIterator(headers.names().iterator()); + } + + @Override + public int size() { + return headers.names().size(); + } + } + + private final class HeaderNamesIterator implements Iterator { + + private final Iterator iterator; + + @Nullable + private String currentName; + + private HeaderNamesIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public String next() { + this.currentName = this.iterator.next(); + return this.currentName; + } + + @Override + public void remove() { + if (this.currentName == null) { + throw new IllegalStateException("No current Header in iterator"); + } + if (!headers.contains(this.currentName)) { + throw new IllegalStateException("Header not present: " + this.currentName); + } + headers.remove(this.currentName); + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpConnector.java new file mode 100644 index 00000000000..1eeece1426f --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpConnector.java @@ -0,0 +1,132 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import reactor.core.publisher.Mono; +import reactor.netty5.NettyOutbound; +import reactor.netty5.http.client.HttpClient; +import reactor.netty5.http.client.HttpClientRequest; +import reactor.netty5.resources.ConnectionProvider; +import reactor.netty5.resources.LoopResources; + +import org.springframework.http.HttpMethod; +import org.springframework.util.Assert; + +/** + * Reactor Netty 2 (Netty 5) implementation of {@link ClientHttpConnector}. + * + *

This class is based on {@link ReactorClientHttpConnector}. + * + * @author Violeta Georgieva + * @since 6.0 + * @see HttpClient + */ +public class ReactorNetty2ClientHttpConnector implements ClientHttpConnector { + + private final static Function defaultInitializer = client -> client.compress(true); + + + private final HttpClient httpClient; + + + /** + * Default constructor. Initializes {@link HttpClient} via: + *

+	 * HttpClient.create().compress()
+	 * 
+ */ + public ReactorNetty2ClientHttpConnector() { + this.httpClient = defaultInitializer.apply(HttpClient.create().wiretap(true)); + } + + /** + * Constructor with externally managed Reactor Netty resources, including + * {@link LoopResources} for event loop threads, and {@link ConnectionProvider} + * for the connection pool. + *

This constructor should be used only when you don't want the client + * to participate in the Reactor Netty global resources. By default, the + * client participates in the Reactor Netty global resources held in + * {@link reactor.netty5.http.HttpResources}, which is recommended since + * fixed, shared resources are favored for event loop concurrency. However, + * consider declaring a {@link ReactorNetty2ResourceFactory} bean with + * {@code globalResources=true} in order to ensure the Reactor Netty global + * resources are shut down when the Spring ApplicationContext is closed. + * @param factory the resource factory to obtain the resources from + * @param mapper a mapper for further initialization of the created client + * @since 5.1 + */ + public ReactorNetty2ClientHttpConnector(ReactorNetty2ResourceFactory factory, Function mapper) { + ConnectionProvider provider = factory.getConnectionProvider(); + Assert.notNull(provider, "No ConnectionProvider: is ReactorNetty2ResourceFactory not initialized yet?"); + this.httpClient = defaultInitializer.andThen(mapper).andThen(applyLoopResources(factory)) + .apply(HttpClient.create(provider)); + } + + private static Function applyLoopResources(ReactorNetty2ResourceFactory factory) { + return httpClient -> { + LoopResources resources = factory.getLoopResources(); + Assert.notNull(resources, "No LoopResources: is ReactorNetty2ResourceFactory not initialized yet?"); + return httpClient.runOn(resources); + }; + } + + + /** + * Constructor with a pre-configured {@code HttpClient} instance. + * @param httpClient the client to use + * @since 5.1 + */ + public ReactorNetty2ClientHttpConnector(HttpClient httpClient) { + Assert.notNull(httpClient, "HttpClient is required"); + this.httpClient = httpClient; + } + + + @Override + public Mono connect(HttpMethod method, URI uri, + Function> requestCallback) { + + AtomicReference responseRef = new AtomicReference<>(); + + return this.httpClient + .request(io.netty5.handler.codec.http.HttpMethod.valueOf(method.name())) + .uri(uri.toString()) + .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) + .responseConnection((response, connection) -> { + responseRef.set(new ReactorNetty2ClientHttpResponse(response, connection)); + return Mono.just((ClientHttpResponse) responseRef.get()); + }) + .next() + .doOnCancel(() -> { + ReactorNetty2ClientHttpResponse response = responseRef.get(); + if (response != null) { + response.releaseAfterCancel(method); + } + }); + } + + private ReactorNetty2ClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, + NettyOutbound nettyOutbound) { + + return new ReactorNetty2ClientHttpRequest(method, uri, request, nettyOutbound); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpRequest.java new file mode 100644 index 00000000000..81efbab9a41 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpRequest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; +import java.nio.file.Path; +import java.util.Collection; + +import io.netty5.buffer.api.Buffer; +import io.netty5.handler.codec.http.cookie.DefaultCookie; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty5.NettyOutbound; +import reactor.netty5.http.client.HttpClientRequest; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ZeroCopyHttpOutputMessage; + +/** + * {@link ClientHttpRequest} implementation for the Reactor Netty 2 (Netty 5) HTTP client. + * + *

This class is based on {@link ReactorClientHttpRequest}. + * + * @author Violeta Georgieva + * @since 6.0 + * @see reactor.netty5.http.client.HttpClient + */ +class ReactorNetty2ClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage { + + private final HttpMethod httpMethod; + + private final URI uri; + + private final HttpClientRequest request; + + private final NettyOutbound outbound; + + private final Netty5DataBufferFactory bufferFactory; + + + public ReactorNetty2ClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) { + this.httpMethod = method; + this.uri = uri; + this.request = request; + this.outbound = outbound; + this.bufferFactory = new Netty5DataBufferFactory(outbound.alloc()); + } + + + @Override + public HttpMethod getMethod() { + return this.httpMethod; + } + + @Override + public URI getURI() { + return this.uri; + } + + @Override + public DataBufferFactory bufferFactory() { + return this.bufferFactory; + } + + @Override + @SuppressWarnings("unchecked") + public T getNativeRequest() { + return (T) this.request; + } + + @Override + public Mono writeWith(Publisher body) { + return doCommit(() -> { + // Send as Mono if possible as an optimization hint to Reactor Netty + if (body instanceof Mono) { + Mono bufferMono = Mono.from(body).map(Netty5DataBufferFactory::toBuffer); + return this.outbound.send(bufferMono).then(); + + } + else { + Flux bufferFlux = Flux.from(body).map(Netty5DataBufferFactory::toBuffer); + return this.outbound.send(bufferFlux).then(); + } + }); + } + + @Override + public Mono writeAndFlushWith(Publisher> body) { + Publisher> buffers = Flux.from(body).map(ReactorNetty2ClientHttpRequest::toBuffers); + return doCommit(() -> this.outbound.sendGroups(buffers).then()); + } + + private static Publisher toBuffers(Publisher dataBuffers) { + return Flux.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer); + } + + @Override + public Mono writeWith(Path file, long position, long count) { + return doCommit(() -> this.outbound.sendFile(file, position, count).then()); + } + + @Override + public Mono setComplete() { + return doCommit(this.outbound::then); + } + + @Override + protected void applyHeaders() { + getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value)); + } + + @Override + protected void applyCookies() { + getCookies().values().stream().flatMap(Collection::stream) + .map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue())) + .forEach(this.request::addCookie); + } + + @Override + protected HttpHeaders initReadOnlyHeaders() { + return HttpHeaders.readOnlyHttpHeaders(new Netty5HeadersAdapter(this.request.requestHeaders())); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpResponse.java new file mode 100644 index 00000000000..d9cc9782d60 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ClientHttpResponse.java @@ -0,0 +1,214 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; + +import io.netty5.buffer.api.BufferAllocator; +import io.netty5.handler.codec.http.cookie.Cookie; +import io.netty5.handler.codec.http.cookie.DefaultCookie; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; +import reactor.netty5.Connection; +import reactor.netty5.NettyInbound; +import reactor.netty5.http.client.HttpClientResponse; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseCookie; +import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; +import org.springframework.util.CollectionUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.ObjectUtils; + +/** + * {@link ClientHttpResponse} implementation for the Reactor Netty 2 (Netty 5) HTTP client. + * + *

This class is based on {@link ReactorClientHttpResponse}. + * + * @author Violeta Georgieva + * @since 6.0 + * @see reactor.netty5.http.client.HttpClient + */ +class ReactorNetty2ClientHttpResponse implements ClientHttpResponse { + + /** Reactor Netty 1.0.5+. */ + static final boolean reactorNettyRequestChannelOperationsIdPresent = ClassUtils.isPresent( + "reactor.netty5.ChannelOperationsId", ReactorNetty2ClientHttpResponse.class.getClassLoader()); + + + private static final Log logger = LogFactory.getLog(ReactorNetty2ClientHttpResponse.class); + + private final HttpClientResponse response; + + private final HttpHeaders headers; + + private final NettyInbound inbound; + + private final Netty5DataBufferFactory bufferFactory; + + // 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe) + private final AtomicInteger state = new AtomicInteger(); + + + /** + * Constructor that matches the inputs from + * {@link reactor.netty5.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}. + * @since 5.2.8 + */ + public ReactorNetty2ClientHttpResponse(HttpClientResponse response, Connection connection) { + this.response = response; + MultiValueMap adapter = new Netty5HeadersAdapter(response.responseHeaders()); + this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + this.inbound = connection.inbound(); + this.bufferFactory = new Netty5DataBufferFactory(connection.outbound().alloc()); + } + + /** + * Constructor with inputs extracted from a {@link Connection}. + * @deprecated as of 5.2.8, in favor of {@link #ReactorNetty2ClientHttpResponse(HttpClientResponse, Connection)} + */ + @Deprecated + public ReactorNetty2ClientHttpResponse(HttpClientResponse response, NettyInbound inbound, BufferAllocator alloc) { + this.response = response; + MultiValueMap adapter = new Netty5HeadersAdapter(response.responseHeaders()); + this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + this.inbound = inbound; + this.bufferFactory = new Netty5DataBufferFactory(alloc); + } + + + @Override + public String getId() { + String id = null; + if (reactorNettyRequestChannelOperationsIdPresent) { + id = ChannelOperationsIdHelper.getId(this.response); + } + if (id == null && this.response instanceof Connection connection) { + id = connection.channel().id().asShortText(); + } + return (id != null ? id : ObjectUtils.getIdentityHexString(this)); + } + + @Override + public Flux getBody() { + return this.inbound.receive() + .doOnSubscribe(s -> { + if (this.state.compareAndSet(0, 1)) { + return; + } + if (this.state.get() == 2) { + throw new IllegalStateException( + "The client response body has been released already due to cancellation."); + } + }) + .map(buffer -> this.bufferFactory.wrap(buffer.split())); + } + + @Override + public HttpHeaders getHeaders() { + return this.headers; + } + + @Override + public HttpStatusCode getStatusCode() { + return HttpStatusCode.valueOf(this.response.status().code()); + } + + @Override + @Deprecated + public int getRawStatusCode() { + return this.response.status().code(); + } + + @Override + public MultiValueMap getCookies() { + MultiValueMap result = new LinkedMultiValueMap<>(); + this.response.cookies().values().stream() + .flatMap(Collection::stream) + .forEach(cookie -> result.add(cookie.name(), + ResponseCookie.fromClientResponse(cookie.name(), cookie.value()) + .domain(cookie.domain()) + .path(cookie.path()) + .maxAge(cookie.maxAge()) + .secure(cookie.isSecure()) + .httpOnly(cookie.isHttpOnly()) + .sameSite(getSameSite(cookie)) + .build())); + return CollectionUtils.unmodifiableMultiValueMap(result); + } + + @Nullable + private static String getSameSite(Cookie cookie) { + if (cookie instanceof DefaultCookie defaultCookie) { + if (defaultCookie.sameSite() != null) { + return defaultCookie.sameSite().name(); + } + } + return null; + } + + /** + * Called by {@link ReactorNetty2ClientHttpConnector} when a cancellation is detected + * but the content has not been subscribed to. If the subscription never + * materializes then the content will remain not drained. Or it could still + * materialize if the cancellation happened very early, or the response + * reading was delayed for some reason. + */ + void releaseAfterCancel(HttpMethod method) { + if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) { + if (logger.isDebugEnabled()) { + logger.debug("[" + getId() + "]" + "Releasing body, not yet subscribed."); + } + this.inbound.receive().doOnNext(buffer -> {}).subscribe(buffer -> {}, ex -> {}); + } + } + + private boolean mayHaveBody(HttpMethod method) { + int code = this.getRawStatusCode(); + return !((code >= 100 && code < 200) || code == 204 || code == 205 || + method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0); + } + + @Override + public String toString() { + return "ReactorNetty2ClientHttpResponse{" + + "request=[" + this.response.method().name() + " " + this.response.uri() + "]," + + "status=" + getRawStatusCode() + '}'; + } + + + private static class ChannelOperationsIdHelper { + + @Nullable + public static String getId(HttpClientResponse response) { + if (response instanceof reactor.netty5.ChannelOperationsId id) { + return (logger.isDebugEnabled() ? id.asLongText() : id.asShortText()); + } + return null; + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ResourceFactory.java new file mode 100644 index 00000000000..1e619d00c9b --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorNetty2ResourceFactory.java @@ -0,0 +1,250 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import reactor.netty5.http.HttpResources; +import reactor.netty5.resources.ConnectionProvider; +import reactor.netty5.resources.LoopResources; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for + * event loop threads, and {@link ConnectionProvider} for the connection pool, + * within the lifecycle of a Spring {@code ApplicationContext}. + * + *

This factory implements {@link InitializingBean} and {@link DisposableBean} + * and is expected typically to be declared as a Spring-managed bean. + * + *

This class is based on {@link ReactorResourceFactory}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2ResourceFactory implements InitializingBean, DisposableBean { + + private boolean useGlobalResources = true; + + @Nullable + private Consumer globalResourcesConsumer; + + private Supplier connectionProviderSupplier = () -> ConnectionProvider.create("webflux", 500); + + @Nullable + private ConnectionProvider connectionProvider; + + private Supplier loopResourcesSupplier = () -> LoopResources.create("webflux-http"); + + @Nullable + private LoopResources loopResources; + + private boolean manageConnectionProvider = false; + + private boolean manageLoopResources = false; + + private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD); + + private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT); + + + /** + * Whether to use global Reactor Netty resources via {@link HttpResources}. + *

Default is "true" in which case this factory initializes and stops the + * global Reactor Netty resources within Spring's {@code ApplicationContext} + * lifecycle. If set to "false" the factory manages its resources independent + * of the global ones. + * @param useGlobalResources whether to expose and manage the global resources + * @see #addGlobalResourcesConsumer(Consumer) + */ + public void setUseGlobalResources(boolean useGlobalResources) { + this.useGlobalResources = useGlobalResources; + } + + /** + * Whether this factory exposes the global + * {@link HttpResources HttpResources} holder. + */ + public boolean isUseGlobalResources() { + return this.useGlobalResources; + } + + /** + * Add a Consumer for configuring the global Reactor Netty resources on + * startup. When this option is used, {@link #setUseGlobalResources} is also + * enabled. + * @param consumer the consumer to apply + * @see #setUseGlobalResources(boolean) + */ + public void addGlobalResourcesConsumer(Consumer consumer) { + this.useGlobalResources = true; + this.globalResourcesConsumer = (this.globalResourcesConsumer != null ? + this.globalResourcesConsumer.andThen(consumer) : consumer); + } + + /** + * Use this when you don't want to participate in global resources and + * you want to customize the creation of the managed {@code ConnectionProvider}. + *

By default, {@code ConnectionProvider.elastic("http")} is used. + *

Note that this option is ignored if {@code userGlobalResources=false} or + * {@link #setConnectionProvider(ConnectionProvider)} is set. + * @param supplier the supplier to use + */ + public void setConnectionProviderSupplier(Supplier supplier) { + this.connectionProviderSupplier = supplier; + } + + /** + * Use this when you want to provide an externally managed + * {@link ConnectionProvider} instance. + * @param connectionProvider the connection provider to use as is + */ + public void setConnectionProvider(ConnectionProvider connectionProvider) { + this.connectionProvider = connectionProvider; + } + + /** + * Return the configured {@link ConnectionProvider}. + */ + public ConnectionProvider getConnectionProvider() { + Assert.state(this.connectionProvider != null, "ConnectionProvider not initialized yet"); + return this.connectionProvider; + } + + /** + * Use this when you don't want to participate in global resources and + * you want to customize the creation of the managed {@code LoopResources}. + *

By default, {@code LoopResources.create("reactor-http")} is used. + *

Note that this option is ignored if {@code userGlobalResources=false} or + * {@link #setLoopResources(LoopResources)} is set. + * @param supplier the supplier to use + */ + public void setLoopResourcesSupplier(Supplier supplier) { + this.loopResourcesSupplier = supplier; + } + + /** + * Use this option when you want to provide an externally managed + * {@link LoopResources} instance. + * @param loopResources the loop resources to use as is + */ + public void setLoopResources(LoopResources loopResources) { + this.loopResources = loopResources; + } + + /** + * Return the configured {@link LoopResources}. + */ + public LoopResources getLoopResources() { + Assert.state(this.loopResources != null, "LoopResources not initialized yet"); + return this.loopResources; + } + + /** + * Configure the amount of time we'll wait before shutting down resources. + * If a task is submitted during the {@code shutdownQuietPeriod}, it is guaranteed + * to be accepted and the {@code shutdownQuietPeriod} will start over. + *

By default, this is set to + * {@link LoopResources#DEFAULT_SHUTDOWN_QUIET_PERIOD} which is 2 seconds but + * can also be overridden with the system property + * {@link reactor.netty5.ReactorNetty#SHUTDOWN_QUIET_PERIOD + * ReactorNetty.SHUTDOWN_QUIET_PERIOD}. + * @since 5.2.4 + * @see #setShutdownTimeout(Duration) + */ + public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) { + Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null"); + this.shutdownQuietPeriod = shutdownQuietPeriod; + } + + /** + * Configure the maximum amount of time to wait until the disposal of the + * underlying resources regardless if a task was submitted during the + * {@code shutdownQuietPeriod}. + *

By default, this is set to + * {@link LoopResources#DEFAULT_SHUTDOWN_TIMEOUT} which is 15 seconds but + * can also be overridden with the system property + * {@link reactor.netty5.ReactorNetty#SHUTDOWN_TIMEOUT + * ReactorNetty.SHUTDOWN_TIMEOUT}. + * @since 5.2.4 + * @see #setShutdownQuietPeriod(Duration) + */ + public void setShutdownTimeout(Duration shutdownTimeout) { + Assert.notNull(shutdownTimeout, "shutdownTimeout should not be null"); + this.shutdownTimeout = shutdownTimeout; + } + + + @Override + public void afterPropertiesSet() { + if (this.useGlobalResources) { + Assert.isTrue(this.loopResources == null && this.connectionProvider == null, + "'useGlobalResources' is mutually exclusive with explicitly configured resources"); + HttpResources httpResources = HttpResources.get(); + if (this.globalResourcesConsumer != null) { + this.globalResourcesConsumer.accept(httpResources); + } + this.connectionProvider = httpResources; + this.loopResources = httpResources; + } + else { + if (this.loopResources == null) { + this.manageLoopResources = true; + this.loopResources = this.loopResourcesSupplier.get(); + } + if (this.connectionProvider == null) { + this.manageConnectionProvider = true; + this.connectionProvider = this.connectionProviderSupplier.get(); + } + } + } + + @Override + public void destroy() { + if (this.useGlobalResources) { + HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); + } + else { + try { + ConnectionProvider provider = this.connectionProvider; + if (provider != null && this.manageConnectionProvider) { + provider.disposeLater().block(); + } + } + catch (Throwable ex) { + // ignore + } + + try { + LoopResources resources = this.loopResources; + if (resources != null && this.manageLoopResources) { + resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); + } + } + catch (Throwable ex) { + // ignore + } + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/Netty5HeadersAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/Netty5HeadersAdapter.java new file mode 100644 index 00000000000..7e071b94ddf --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/Netty5HeadersAdapter.java @@ -0,0 +1,280 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.netty5.handler.codec.http.HttpHeaders; + +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; +import org.springframework.util.MultiValueMap; + +/** + * {@code MultiValueMap} implementation for wrapping Netty HTTP headers. + * + *

This class is based on {@link NettyHeadersAdapter}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +final class Netty5HeadersAdapter implements MultiValueMap { + + private final HttpHeaders headers; + + + Netty5HeadersAdapter(HttpHeaders headers) { + this.headers = headers; + } + + + @Override + @Nullable + public String getFirst(String key) { + return this.headers.get(key); + } + + @Override + public void add(String key, @Nullable String value) { + if (value != null) { + this.headers.add(key, value); + } + } + + @Override + public void addAll(String key, List values) { + this.headers.add(key, values); + } + + @Override + public void addAll(MultiValueMap values) { + values.forEach(this.headers::add); + } + + @Override + public void set(String key, @Nullable String value) { + if (value != null) { + this.headers.set(key, value); + } + } + + @Override + public void setAll(Map values) { + values.forEach(this.headers::set); + } + + @Override + public Map toSingleValueMap() { + Map singleValueMap = CollectionUtils.newLinkedHashMap(this.headers.size()); + this.headers.entries() + .forEach(entry -> { + if (!singleValueMap.containsKey(entry.getKey())) { + singleValueMap.put(entry.getKey(), entry.getValue()); + } + }); + return singleValueMap; + } + + @Override + public int size() { + return this.headers.names().size(); + } + + @Override + public boolean isEmpty() { + return this.headers.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return (key instanceof String headerName && this.headers.contains(headerName)); + } + + @Override + public boolean containsValue(Object value) { + return (value instanceof String && + this.headers.entries().stream() + .anyMatch(entry -> value.equals(entry.getValue()))); + } + + @Override + @Nullable + public List get(Object key) { + if (containsKey(key)) { + return this.headers.getAll((String) key); + } + return null; + } + + @Nullable + @Override + public List put(String key, @Nullable List value) { + List previousValues = this.headers.getAll(key); + this.headers.set(key, value); + return previousValues; + } + + @Nullable + @Override + public List remove(Object key) { + if (key instanceof String headerName) { + List previousValues = this.headers.getAll(headerName); + this.headers.remove(headerName); + return previousValues; + } + return null; + } + + @Override + public void putAll(Map> map) { + map.forEach(this.headers::set); + } + + @Override + public void clear() { + this.headers.clear(); + } + + @Override + public Set keySet() { + return new HeaderNames(); + } + + @Override + public Collection> values() { + return this.headers.names().stream() + .map(this.headers::getAll).collect(Collectors.toList()); + } + + @Override + public Set>> entrySet() { + return new AbstractSet<>() { + @Override + public Iterator>> iterator() { + return new EntryIterator(); + } + + @Override + public int size() { + return headers.size(); + } + }; + } + + + @Override + public String toString() { + return org.springframework.http.HttpHeaders.formatHeaders(this); + } + + + private class EntryIterator implements Iterator>> { + + private final Iterator names = headers.names().iterator(); + + @Override + public boolean hasNext() { + return this.names.hasNext(); + } + + @Override + public Entry> next() { + return new HeaderEntry(this.names.next()); + } + } + + + private class HeaderEntry implements Entry> { + + private final String key; + + HeaderEntry(String key) { + this.key = key; + } + + @Override + public String getKey() { + return this.key; + } + + @Override + public List getValue() { + return headers.getAll(this.key); + } + + @Override + public List setValue(List value) { + List previousValues = headers.getAll(this.key); + headers.set(this.key, value); + return previousValues; + } + } + + private class HeaderNames extends AbstractSet { + + @Override + public Iterator iterator() { + return new HeaderNamesIterator(headers.names().iterator()); + } + + @Override + public int size() { + return headers.names().size(); + } + } + + private final class HeaderNamesIterator implements Iterator { + + private final Iterator iterator; + + @Nullable + private String currentName; + + private HeaderNamesIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public String next() { + this.currentName = this.iterator.next(); + return this.currentName; + } + + @Override + public void remove() { + if (this.currentName == null) { + throw new IllegalStateException("No current Header in iterator"); + } + if (!headers.contains(this.currentName)) { + throw new IllegalStateException("Header not present: " + this.currentName); + } + headers.remove(this.currentName); + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/NettyHeadersAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/NettyHeadersAdapter.java index ed4f381eba4..e51b85d947c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/NettyHeadersAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/NettyHeadersAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ import org.springframework.util.MultiValueMap; * @author Brian Clozel * @since 5.1.1 */ -class NettyHeadersAdapter implements MultiValueMap { +final class NettyHeadersAdapter implements MultiValueMap { private final HttpHeaders headers; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2HttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2HttpHandlerAdapter.java new file mode 100644 index 00000000000..10c15167392 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2HttpHandlerAdapter.java @@ -0,0 +1,79 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.URISyntaxException; +import java.util.function.BiFunction; + +import io.netty5.handler.codec.http.HttpResponseStatus; +import org.apache.commons.logging.Log; +import reactor.core.publisher.Mono; +import reactor.netty5.http.server.HttpServerRequest; +import reactor.netty5.http.server.HttpServerResponse; + +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.HttpLogging; +import org.springframework.http.HttpMethod; +import org.springframework.util.Assert; + +/** + * Adapt {@link HttpHandler} to the Reactor Netty 5 channel handling function. + * + *

This class is based on {@link ReactorHttpHandlerAdapter}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2HttpHandlerAdapter implements BiFunction> { + + private static final Log logger = HttpLogging.forLogName(ReactorNetty2HttpHandlerAdapter.class); + + + private final HttpHandler httpHandler; + + + public ReactorNetty2HttpHandlerAdapter(HttpHandler httpHandler) { + Assert.notNull(httpHandler, "HttpHandler must not be null"); + this.httpHandler = httpHandler; + } + + + @Override + public Mono apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { + Netty5DataBufferFactory bufferFactory = new Netty5DataBufferFactory(reactorResponse.alloc()); + try { + ReactorNetty2ServerHttpRequest request = new ReactorNetty2ServerHttpRequest(reactorRequest, bufferFactory); + ServerHttpResponse response = new ReactorNetty2ServerHttpResponse(reactorResponse, bufferFactory); + + if (request.getMethod() == HttpMethod.HEAD) { + response = new HttpHeadResponseDecorator(response); + } + + return this.httpHandler.handle(request, response) + .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) + .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed")); + } + catch (URISyntaxException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to get request URI: " + ex.getMessage()); + } + reactorResponse.status(HttpResponseStatus.BAD_REQUEST); + return Mono.empty(); + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2ServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2ServerHttpRequest.java new file mode 100644 index 00000000000..55cbaaf297e --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2ServerHttpRequest.java @@ -0,0 +1,243 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.net.ssl.SSLSession; + +import io.netty5.channel.Channel; +import io.netty5.handler.codec.http.HttpHeaderNames; +import io.netty5.handler.codec.http.cookie.Cookie; +import io.netty5.handler.ssl.SslHandler; +import org.apache.commons.logging.Log; +import reactor.core.publisher.Flux; +import reactor.netty5.Connection; +import reactor.netty5.http.server.HttpServerRequest; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.HttpCookie; +import org.springframework.http.HttpLogging; +import org.springframework.http.HttpMethod; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}. + * + *

This class is based on {@link ReactorServerHttpRequest}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +class ReactorNetty2ServerHttpRequest extends AbstractServerHttpRequest { + + /** Reactor Netty 1.0.5+. */ + static final boolean reactorNettyRequestChannelOperationsIdPresent = ClassUtils.isPresent( + "reactor.netty.ChannelOperationsId", ReactorNetty2ServerHttpRequest.class.getClassLoader()); + + private static final Log logger = HttpLogging.forLogName(ReactorNetty2ServerHttpRequest.class); + + + private static final AtomicLong logPrefixIndex = new AtomicLong(); + + + private final HttpServerRequest request; + + private final Netty5DataBufferFactory bufferFactory; + + + public ReactorNetty2ServerHttpRequest(HttpServerRequest request, Netty5DataBufferFactory bufferFactory) + throws URISyntaxException { + + super(initUri(request), "", new Netty5HeadersAdapter(request.requestHeaders())); + Assert.notNull(bufferFactory, "DataBufferFactory must not be null"); + this.request = request; + this.bufferFactory = bufferFactory; + } + + private static URI initUri(HttpServerRequest request) throws URISyntaxException { + Assert.notNull(request, "HttpServerRequest must not be null"); + return new URI(resolveBaseUrl(request) + resolveRequestUri(request)); + } + + private static URI resolveBaseUrl(HttpServerRequest request) throws URISyntaxException { + String scheme = getScheme(request); + String header = request.requestHeaders().get(HttpHeaderNames.HOST); + if (header != null) { + final int portIndex; + if (header.startsWith("[")) { + portIndex = header.indexOf(':', header.indexOf(']')); + } + else { + portIndex = header.indexOf(':'); + } + if (portIndex != -1) { + try { + return new URI(scheme, null, header.substring(0, portIndex), + Integer.parseInt(header, portIndex + 1, header.length(), 10), null, null, null); + } + catch (NumberFormatException ex) { + throw new URISyntaxException(header, "Unable to parse port", portIndex); + } + } + else { + return new URI(scheme, header, null, null); + } + } + else { + InetSocketAddress localAddress = request.hostAddress(); + Assert.state(localAddress != null, "No host address available"); + return new URI(scheme, null, localAddress.getHostString(), + localAddress.getPort(), null, null, null); + } + } + + private static String getScheme(HttpServerRequest request) { + return request.scheme(); + } + + private static String resolveRequestUri(HttpServerRequest request) { + String uri = request.uri(); + for (int i = 0; i < uri.length(); i++) { + char c = uri.charAt(i); + if (c == '/' || c == '?' || c == '#') { + break; + } + if (c == ':' && (i + 2 < uri.length())) { + if (uri.charAt(i + 1) == '/' && uri.charAt(i + 2) == '/') { + for (int j = i + 3; j < uri.length(); j++) { + c = uri.charAt(j); + if (c == '/' || c == '?' || c == '#') { + return uri.substring(j); + } + } + return ""; + } + } + } + return uri; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(this.request.method().name()); + } + + @Override + @Deprecated + public String getMethodValue() { + return this.request.method().name(); + } + + @Override + protected MultiValueMap initCookies() { + MultiValueMap cookies = new LinkedMultiValueMap<>(); + for (CharSequence name : this.request.cookies().keySet()) { + for (Cookie cookie : this.request.cookies().get(name)) { + HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value()); + cookies.add(name.toString(), httpCookie); + } + } + return cookies; + } + + @Override + @Nullable + public InetSocketAddress getLocalAddress() { + return this.request.hostAddress(); + } + + @Override + @Nullable + public InetSocketAddress getRemoteAddress() { + return this.request.remoteAddress(); + } + + @Override + @Nullable + protected SslInfo initSslInfo() { + Channel channel = ((Connection) this.request).channel(); + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + if (sslHandler == null && channel.parent() != null) { // HTTP/2 + sslHandler = channel.parent().pipeline().get(SslHandler.class); + } + if (sslHandler != null) { + SSLSession session = sslHandler.engine().getSession(); + return new DefaultSslInfo(session); + } + return null; + } + + @Override + public Flux getBody() { + return this.request.receive().transferOwnership().map(this.bufferFactory::wrap); + } + + @SuppressWarnings("unchecked") + @Override + public T getNativeRequest() { + return (T) this.request; + } + + @Override + @Nullable + protected String initId() { + if (this.request instanceof Connection) { + return ((Connection) this.request).channel().id().asShortText() + + "-" + logPrefixIndex.incrementAndGet(); + } + return null; + } + + @Override + protected String initLogPrefix() { + if (reactorNettyRequestChannelOperationsIdPresent) { + String id = (ChannelOperationsIdHelper.getId(this.request)); + if (id != null) { + return id; + } + } + if (this.request instanceof Connection) { + return ((Connection) this.request).channel().id().asShortText() + + "-" + logPrefixIndex.incrementAndGet(); + } + return getId(); + } + + + private static class ChannelOperationsIdHelper { + + @Nullable + public static String getId(HttpServerRequest request) { + if (request instanceof reactor.netty.ChannelOperationsId) { + return (logger.isDebugEnabled() ? + ((reactor.netty.ChannelOperationsId) request).asLongText() : + ((reactor.netty.ChannelOperationsId) request).asShortText()); + } + return null; + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2ServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2ServerHttpResponse.java new file mode 100644 index 00000000000..f932750fc9a --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorNetty2ServerHttpResponse.java @@ -0,0 +1,157 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.nio.file.Path; +import java.util.List; + +import io.netty5.buffer.api.Buffer; +import io.netty5.channel.ChannelId; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty5.ChannelOperationsId; +import reactor.netty5.http.server.HttpServerResponse; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseCookie; +import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.util.Assert; + +/** + * Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}. + * + *

This class is based on {@link ReactorServerHttpResponse}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +class ReactorNetty2ServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage { + + private static final Log logger = LogFactory.getLog(ReactorNetty2ServerHttpResponse.class); + + + private final HttpServerResponse response; + + + public ReactorNetty2ServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) { + super(bufferFactory, new HttpHeaders(new Netty5HeadersAdapter(response.responseHeaders()))); + Assert.notNull(response, "HttpServerResponse must not be null"); + this.response = response; + } + + + @SuppressWarnings("unchecked") + @Override + public T getNativeResponse() { + return (T) this.response; + } + + @Override + public HttpStatusCode getStatusCode() { + HttpStatusCode status = super.getStatusCode(); + return (status != null ? status : HttpStatusCode.valueOf(this.response.status().code())); + } + + @Override + @Deprecated + public Integer getRawStatusCode() { + Integer status = super.getRawStatusCode(); + return (status != null ? status : this.response.status().code()); + } + + @Override + protected void applyStatusCode() { + HttpStatusCode status = super.getStatusCode(); + if (status != null) { + this.response.status(status.value()); + } + } + + @Override + protected Mono writeWithInternal(Publisher publisher) { + return this.response.send(toByteBufs(publisher)).then(); + } + + @Override + protected Mono writeAndFlushWithInternal(Publisher> publisher) { + return this.response.sendGroups(Flux.from(publisher).map(this::toByteBufs)).then(); + } + + @Override + protected void applyHeaders() { + } + + @Override + protected void applyCookies() { + // Netty Cookie doesn't support sameSite. When this is resolved, we can adapt to it again: + // https://github.com/netty/netty/issues/8161 + for (List cookies : getCookies().values()) { + for (ResponseCookie cookie : cookies) { + this.response.addHeader(HttpHeaders.SET_COOKIE, cookie.toString()); + } + } + } + + @Override + public Mono writeWith(Path file, long position, long count) { + return doCommit(() -> this.response.sendFile(file, position, count).then()); + } + + private Publisher toByteBufs(Publisher dataBuffers) { + return dataBuffers instanceof Mono ? + Mono.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer) : + Flux.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer); + } + + @Override + protected void touchDataBuffer(DataBuffer buffer) { + if (logger.isDebugEnabled()) { + if (ReactorNetty2ServerHttpRequest.reactorNettyRequestChannelOperationsIdPresent) { + if (ChannelOperationsIdHelper.touch(buffer, this.response)) { + return; + } + } + this.response.withConnection(connection -> { + ChannelId id = connection.channel().id(); + DataBufferUtils.touch(buffer, "Channel id: " + id.asShortText()); + }); + } + } + + + private static class ChannelOperationsIdHelper { + + public static boolean touch(DataBuffer dataBuffer, HttpServerResponse response) { + if (response instanceof ChannelOperationsId) { + String id = ((ChannelOperationsId) response).asLongText(); + DataBufferUtils.touch(dataBuffer, "Channel id: " + id); + return true; + } + return false; + } + } + + +} diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/HeadersAdaptersTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/HeadersAdaptersTests.java index 0788426cbfe..392904b3c80 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/HeadersAdaptersTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/HeadersAdaptersTests.java @@ -133,11 +133,12 @@ class HeadersAdaptersTests { static Stream headers() { return Stream.of( - arguments(named("Map", CollectionUtils.toMultiValueMap(new LinkedCaseInsensitiveMap<>(8, Locale.ENGLISH)))), - arguments(named("Netty", new NettyHeadersAdapter(new DefaultHttpHeaders()))), - arguments(named("Tomcat", new TomcatHeadersAdapter(new MimeHeaders()))), - arguments(named("Undertow", new UndertowHeadersAdapter(new HeaderMap()))), - arguments(named("Jetty", new JettyHeadersAdapter(HttpFields.build()))) + arguments(named("Map", CollectionUtils.toMultiValueMap(new LinkedCaseInsensitiveMap<>(8, Locale.ENGLISH)))), + arguments(named("Netty", new NettyHeadersAdapter(new DefaultHttpHeaders()))), + arguments(named("Netty", new Netty5HeadersAdapter(new io.netty5.handler.codec.http.DefaultHttpHeaders()))), + arguments(named("Tomcat", new TomcatHeadersAdapter(new MimeHeaders()))), + arguments(named("Undertow", new UndertowHeadersAdapter(new HeaderMap()))), + arguments(named("Jetty", new JettyHeadersAdapter(HttpFields.build()))) ); } diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java index a971ed12af6..2db1114fe48 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java @@ -127,6 +127,7 @@ public abstract class AbstractHttpHandlerIntegrationTests { return Stream.of( named("Jetty", new JettyHttpServer()), named("Reactor Netty", new ReactorHttpServer()), + named("Reactor Netty 2", new ReactorHttpServer()), named("Tomcat", new TomcatHttpServer()), named("Undertow", new UndertowHttpServer()) ); diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorNetty2HttpServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorNetty2HttpServer.java new file mode 100644 index 00000000000..3d968b5bfe1 --- /dev/null +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorNetty2HttpServer.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.testfixture.http.server.reactive.bootstrap; + +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicReference; + +import reactor.netty5.DisposableServer; + +import org.springframework.http.server.reactive.ReactorNetty2HttpHandlerAdapter; + +/** + * This class is copied from {@link ReactorHttpServer}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2HttpServer extends AbstractHttpServer { + + private ReactorNetty2HttpHandlerAdapter reactorHandler; + + private reactor.netty5.http.server.HttpServer reactorServer; + + private AtomicReference serverRef = new AtomicReference<>(); + + + @Override + protected void initServer() { + this.reactorHandler = createHttpHandlerAdapter(); + this.reactorServer = reactor.netty5.http.server.HttpServer.create().wiretap(true) + .host(getHost()).port(getPort()); + } + + private ReactorNetty2HttpHandlerAdapter createHttpHandlerAdapter() { + return new ReactorNetty2HttpHandlerAdapter(resolveHttpHandler()); + } + + @Override + protected void startInternal() { + DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(((InetSocketAddress) server.address()).getPort()); + this.serverRef.set(server); + } + + @Override + protected void stopInternal() { + this.serverRef.get().dispose(); + } + + @Override + protected void resetInternal() { + this.reactorServer = null; + this.reactorHandler = null; + this.serverRef.set(null); + } + +} diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorNetty2HttpsServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorNetty2HttpsServer.java new file mode 100644 index 00000000000..80cd9e47f63 --- /dev/null +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorNetty2HttpsServer.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.testfixture.http.server.reactive.bootstrap; + +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.handler.ssl.util.SelfSignedCertificate; +import reactor.netty5.DisposableServer; +import reactor.netty5.http.Http11SslContextSpec; + +import org.springframework.http.server.reactive.ReactorNetty2HttpHandlerAdapter; + +/** + * This class is copied from {@link ReactorHttpsServer}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2HttpsServer extends AbstractHttpServer { + + private ReactorNetty2HttpHandlerAdapter reactorHandler; + + private reactor.netty5.http.server.HttpServer reactorServer; + + private AtomicReference serverRef = new AtomicReference<>(); + + + @Override + protected void initServer() throws Exception { + SelfSignedCertificate cert = new SelfSignedCertificate(); + Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert.certificate(), cert.privateKey()); + + this.reactorHandler = createHttpHandlerAdapter(); + this.reactorServer = reactor.netty5.http.server.HttpServer.create() + .host(getHost()) + .port(getPort()) + .secure(sslContextSpec -> sslContextSpec.sslContext(http11SslContextSpec)); + } + + private ReactorNetty2HttpHandlerAdapter createHttpHandlerAdapter() { + return new ReactorNetty2HttpHandlerAdapter(resolveHttpHandler()); + } + + @Override + protected void startInternal() { + DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(((InetSocketAddress) server.address()).getPort()); + this.serverRef.set(server); + } + + @Override + protected void stopInternal() { + this.serverRef.get().dispose(); + } + + @Override + protected void resetInternal() { + this.reactorServer = null; + this.reactorHandler = null; + this.serverRef.set(null); + } + +} diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index dad4a490b6b..2121c0aaf6a 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -16,6 +16,7 @@ dependencies { optional("com.fasterxml.jackson.core:jackson-databind") optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile") optional("io.projectreactor.netty:reactor-netty-http") + optional("io.projectreactor.netty:reactor-netty5-http:2.0.0-M1") optional("org.apache.tomcat:tomcat-websocket") { exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java new file mode 100644 index 00000000000..b86121cd373 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java @@ -0,0 +1,105 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.util.HashMap; +import java.util.Map; + +import io.netty5.buffer.api.Buffer; +import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty5.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty5.handler.codec.http.websocketx.WebSocketFrame; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.util.ObjectUtils; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; + +/** + * Base class for Netty-based {@link WebSocketSession} adapters that provides + * convenience methods to convert Netty {@link WebSocketFrame WebSocketFrames} to and from + * {@link WebSocketMessage WebSocketMessages}. + * + *

This class is based on {@link NettyWebSocketSessionSupport}. + * + * @author Violeta Georgieva + * @since 6.0 + * @param the native delegate type + */ +public abstract class Netty5WebSocketSessionSupport extends AbstractWebSocketSession { + + /** + * The default max size for inbound WebSocket frames. + */ + public static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024; + + + private static final Map, WebSocketMessage.Type> messageTypes; + + static { + messageTypes = new HashMap<>(8); + messageTypes.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT); + messageTypes.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY); + messageTypes.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING); + messageTypes.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG); + } + + + protected Netty5WebSocketSessionSupport(T delegate, HandshakeInfo info, Netty5DataBufferFactory factory) { + super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory); + } + + + @Override + public Netty5DataBufferFactory bufferFactory() { + return (Netty5DataBufferFactory) super.bufferFactory(); + } + + + protected WebSocketMessage toMessage(WebSocketFrame frame) { + WebSocketFrame newFrame = frame.send().receive(); + DataBuffer payload = bufferFactory().wrap(newFrame.binaryData()); + return new WebSocketMessage(messageTypes.get(newFrame.getClass()), payload, newFrame); + } + + protected WebSocketFrame toFrame(WebSocketMessage message) { + if (message.getNativeMessage() != null) { + return message.getNativeMessage(); + } + Buffer buffer = Netty5DataBufferFactory.toBuffer(message.getPayload()); + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + return new TextWebSocketFrame(buffer); + } + else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { + return new BinaryWebSocketFrame(buffer); + } + else if (WebSocketMessage.Type.PING.equals(message.getType())) { + return new PingWebSocketFrame(buffer); + } + else if (WebSocketMessage.Type.PONG.equals(message.getType())) { + return new PongWebSocketFrame(buffer); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNetty2WebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNetty2WebSocketSession.java new file mode 100644 index 00000000000..bf0fc539f05 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNetty2WebSocketSession.java @@ -0,0 +1,173 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.util.function.Consumer; + +import io.netty5.channel.ChannelId; +import io.netty5.handler.codec.http.websocketx.WebSocketFrame; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty5.Connection; +import reactor.netty5.NettyInbound; +import reactor.netty5.NettyOutbound; +import reactor.netty5.channel.ChannelOperations; +import reactor.netty5.http.websocket.WebsocketInbound; +import reactor.netty5.http.websocket.WebsocketOutbound; + +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; + +/** + * {@link WebSocketSession} implementation for use with the Reactor Netty's (Netty 5) + * {@link NettyInbound} and {@link NettyOutbound}. + * This class is based on {@link ReactorNettyWebSocketSession}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2WebSocketSession + extends Netty5WebSocketSessionSupport { + + private final int maxFramePayloadLength; + + private final ChannelId channelId; + + + /** + * Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value. + */ + public ReactorNetty2WebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, + HandshakeInfo info, Netty5DataBufferFactory bufferFactory) { + + this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE); + } + + /** + * Constructor with an additional maxFramePayloadLength argument. + * @since 5.1 + */ + @SuppressWarnings("rawtypes") + public ReactorNetty2WebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, + HandshakeInfo info, Netty5DataBufferFactory bufferFactory, + int maxFramePayloadLength) { + + super(new WebSocketConnection(inbound, outbound), info, bufferFactory); + this.maxFramePayloadLength = maxFramePayloadLength; + this.channelId = ((ChannelOperations) inbound).channel().id(); + } + + + /** + * Return the id of the underlying Netty channel. + * @since 5.3.4 + */ + public ChannelId getChannelId() { + return this.channelId; + } + + + @Override + public Flux receive() { + return getDelegate().getInbound() + .aggregateFrames(this.maxFramePayloadLength) + .receiveFrames() + .map(super::toMessage) + .doOnNext(message -> { + if (logger.isTraceEnabled()) { + logger.trace(getLogPrefix() + "Received " + message); + } + }); + } + + @Override + public Mono send(Publisher messages) { + Flux frames = Flux.from(messages) + .doOnNext(message -> { + if (logger.isTraceEnabled()) { + logger.trace(getLogPrefix() + "Sending " + message); + } + }) + .map(this::toFrame); + return getDelegate().getOutbound() + .sendObject(frames) + .then(); + } + + @Override + public boolean isOpen() { + DisposedCallback callback = new DisposedCallback(); + getDelegate().getInbound().withConnection(callback); + return !callback.isDisposed(); + } + + @Override + public Mono close(CloseStatus status) { + // this will notify WebSocketInbound.receiveCloseStatus() + return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason()); + } + + @Override + public Mono closeStatus() { + return getDelegate().getInbound().receiveCloseStatus() + .map(status -> CloseStatus.create(status.code(), status.reasonText())); + } + + /** + * Simple container for {@link NettyInbound} and {@link NettyOutbound}. + */ + public static class WebSocketConnection { + + private final WebsocketInbound inbound; + + private final WebsocketOutbound outbound; + + + public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) { + this.inbound = inbound; + this.outbound = outbound; + } + + public WebsocketInbound getInbound() { + return this.inbound; + } + + public WebsocketOutbound getOutbound() { + return this.outbound; + } + } + + + private static class DisposedCallback implements Consumer { + + private boolean disposed; + + public boolean isDisposed() { + return this.disposed; + } + + @Override + public void accept(Connection connection) { + this.disposed = connection.isDisposed(); + } + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNetty2WebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNetty2WebSocketClient.java new file mode 100644 index 00000000000..f169953a4e2 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNetty2WebSocketClient.java @@ -0,0 +1,230 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.client; + +import java.net.URI; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Mono; +import reactor.netty5.http.client.HttpClient; +import reactor.netty5.http.client.WebsocketClientSpec; +import reactor.netty5.http.websocket.WebsocketInbound; + +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.reactive.socket.adapter.ReactorNetty2WebSocketSession; + +/** + * {@link WebSocketClient} implementation for use with Reactor Netty for Netty 5. + * + *

This class is based on {@link ReactorNettyWebSocketClient}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2WebSocketClient implements WebSocketClient { + + private static final Log logger = LogFactory.getLog(ReactorNetty2WebSocketClient.class); + + + private final HttpClient httpClient; + + private final Supplier specBuilderSupplier; + + @Nullable + private Integer maxFramePayloadLength; + + @Nullable + private Boolean handlePing; + + + /** + * Default constructor. + */ + public ReactorNetty2WebSocketClient() { + this(HttpClient.create()); + } + + /** + * Constructor that accepts an existing {@link HttpClient} builder + * with a default {@link WebsocketClientSpec.Builder}. + * @since 5.1 + */ + public ReactorNetty2WebSocketClient(HttpClient httpClient) { + this(httpClient, WebsocketClientSpec.builder()); + } + + /** + * Constructor that accepts an existing {@link HttpClient} builder + * and a pre-configured {@link WebsocketClientSpec.Builder}. + * @since 5.3 + */ + public ReactorNetty2WebSocketClient( + HttpClient httpClient, Supplier builderSupplier) { + + Assert.notNull(httpClient, "HttpClient is required"); + Assert.notNull(builderSupplier, "WebsocketClientSpec.Builder is required"); + this.httpClient = httpClient; + this.specBuilderSupplier = builderSupplier; + } + + + /** + * Return the configured {@link HttpClient}. + */ + public HttpClient getHttpClient() { + return this.httpClient; + } + + /** + * Build an instance of {@code WebsocketClientSpec} that reflects the current + * configuration. This can be used to check the configured parameters except + * for sub-protocols which depend on the {@link WebSocketHandler} that is used + * for a given upgrade. + * @since 5.3 + */ + public WebsocketClientSpec getWebsocketClientSpec() { + return buildSpec(null); + } + + private WebsocketClientSpec buildSpec(@Nullable String protocols) { + WebsocketClientSpec.Builder builder = this.specBuilderSupplier.get(); + if (StringUtils.hasText(protocols)) { + builder.protocols(protocols); + } + if (this.maxFramePayloadLength != null) { + builder.maxFramePayloadLength(this.maxFramePayloadLength); + } + if (this.handlePing != null) { + builder.handlePing(this.handlePing); + } + return builder.build(); + } + + /** + * Configure the maximum allowable frame payload length. Setting this value + * to your application's requirement may reduce denial of service attacks + * using long data frames. + *

Corresponds to the argument with the same name in the constructor of + * {@link io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory + * WebSocketServerHandshakerFactory} in Netty. + *

By default set to 65536 (64K). + * @param maxFramePayloadLength the max length for frames. + * @since 5.2 + * @deprecated as of 5.3 in favor of providing a supplier of + * {@link WebsocketClientSpec.Builder} with a + * constructor argument + */ + @Deprecated + public void setMaxFramePayloadLength(int maxFramePayloadLength) { + this.maxFramePayloadLength = maxFramePayloadLength; + } + + /** + * Return the configured {@link #setMaxFramePayloadLength(int) maxFramePayloadLength}. + * @since 5.2 + * @deprecated as of 5.3 in favor of {@link #getWebsocketClientSpec()} + */ + @Deprecated + public int getMaxFramePayloadLength() { + return getWebsocketClientSpec().maxFramePayloadLength(); + } + + /** + * Configure whether to let ping frames through to be handled by the + * {@link WebSocketHandler} given to the execute method. By default, Reactor + * Netty automatically replies with pong frames in response to pings. This is + * useful in a proxy for allowing ping and pong frames through. + *

By default this is set to {@code false} in which case ping frames are + * handled automatically by Reactor Netty. If set to {@code true}, ping + * frames will be passed through to the {@link WebSocketHandler}. + * @param handlePing whether to let Ping frames through for handling + * @since 5.2.4 + * @deprecated as of 5.3 in favor of providing a supplier of + * {@link WebsocketClientSpec.Builder} with a + * constructor argument + */ + @Deprecated + public void setHandlePing(boolean handlePing) { + this.handlePing = handlePing; + } + + /** + * Return the configured {@link #setHandlePing(boolean)}. + * @since 5.2.4 + * @deprecated as of 5.3 in favor of {@link #getWebsocketClientSpec()} + */ + @Deprecated + public boolean getHandlePing() { + return getWebsocketClientSpec().handlePing(); + } + + @Override + public Mono execute(URI url, WebSocketHandler handler) { + return execute(url, new HttpHeaders(), handler); + } + + @Override + public Mono execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { + String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()); + return getHttpClient() + .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders)) + .websocket(buildSpec(protocols)) + .uri(url.toString()) + .handle((inbound, outbound) -> { + HttpHeaders responseHeaders = toHttpHeaders(inbound); + String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol"); + HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); + Netty5DataBufferFactory factory = new Netty5DataBufferFactory(outbound.alloc()); + WebSocketSession session = new ReactorNetty2WebSocketSession( + inbound, outbound, info, factory, getMaxFramePayloadLength()); + if (logger.isDebugEnabled()) { + logger.debug("Started session '" + session.getId() + "' for " + url); + } + return handler.handle(session).checkpoint(url + " [ReactorNetty2WebSocketClient]"); + }) + .doOnRequest(n -> { + if (logger.isDebugEnabled()) { + logger.debug("Connecting to " + url); + } + }) + .next(); + } + + private void setNettyHeaders(HttpHeaders httpHeaders, io.netty5.handler.codec.http.HttpHeaders nettyHeaders) { + httpHeaders.forEach(nettyHeaders::set); + } + + private HttpHeaders toHttpHeaders(WebsocketInbound inbound) { + HttpHeaders headers = new HttpHeaders(); + io.netty5.handler.codec.http.HttpHeaders nettyHeaders = inbound.headers(); + nettyHeaders.forEach(entry -> { + String name = entry.getKey(); + headers.put(name, nettyHeaders.getAll(name)); + }); + return headers; + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNetty2RequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNetty2RequestUpgradeStrategy.java new file mode 100644 index 00000000000..9670d093dc1 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNetty2RequestUpgradeStrategy.java @@ -0,0 +1,183 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.server.upgrade; + +import java.net.URI; +import java.util.function.Supplier; + +import reactor.core.publisher.Mono; +import reactor.netty5.http.server.HttpServerResponse; +import reactor.netty5.http.server.WebsocketServerSpec; + +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.ReactorNetty2WebSocketSession; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +/** + * A {@link RequestUpgradeStrategy} for use with Reactor Netty for Netty 5. + * + *

This class is based on {@link ReactorNettyRequestUpgradeStrategy}. + *\ + * @author Violeta Georgieva + * @since 6.0 + */ +public class ReactorNetty2RequestUpgradeStrategy implements RequestUpgradeStrategy { + + private final Supplier specBuilderSupplier; + + @Nullable + private Integer maxFramePayloadLength; + + @Nullable + private Boolean handlePing; + + + /** + * Create an instances with a default {@link WebsocketServerSpec.Builder}. + * @since 5.2.6 + */ + public ReactorNetty2RequestUpgradeStrategy() { + this(WebsocketServerSpec::builder); + } + + + /** + * Create an instance with a pre-configured {@link WebsocketServerSpec.Builder} + * to use for WebSocket upgrades. + * @since 5.2.6 + */ + public ReactorNetty2RequestUpgradeStrategy(Supplier builderSupplier) { + Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required"); + this.specBuilderSupplier = builderSupplier; + } + + + /** + * Build an instance of {@code WebsocketServerSpec} that reflects the current + * configuration. This can be used to check the configured parameters except + * for sub-protocols which depend on the {@link WebSocketHandler} that is used + * for a given upgrade. + * @since 5.2.6 + */ + public WebsocketServerSpec getWebsocketServerSpec() { + return buildSpec(null); + } + + WebsocketServerSpec buildSpec(@Nullable String subProtocol) { + WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get(); + if (subProtocol != null) { + builder.protocols(subProtocol); + } + if (this.maxFramePayloadLength != null) { + builder.maxFramePayloadLength(this.maxFramePayloadLength); + } + if (this.handlePing != null) { + builder.handlePing(this.handlePing); + } + return builder.build(); + } + + /** + * Configure the maximum allowable frame payload length. Setting this value + * to your application's requirement may reduce denial of service attacks + * using long data frames. + *

Corresponds to the argument with the same name in the constructor of + * {@link io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory + * WebSocketServerHandshakerFactory} in Netty. + *

By default set to 65536 (64K). + * @param maxFramePayloadLength the max length for frames. + * @since 5.1 + * @deprecated as of 5.2.6 in favor of providing a supplier of + * {@link WebsocketServerSpec.Builder} with a + * constructor argument + */ + @Deprecated + public void setMaxFramePayloadLength(Integer maxFramePayloadLength) { + this.maxFramePayloadLength = maxFramePayloadLength; + } + + /** + * Return the configured max length for frames. + * @since 5.1 + * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()} + */ + @Deprecated + public int getMaxFramePayloadLength() { + return getWebsocketServerSpec().maxFramePayloadLength(); + } + + /** + * Configure whether to let ping frames through to be handled by the + * {@link WebSocketHandler} given to the upgrade method. By default, Reactor + * Netty automatically replies with pong frames in response to pings. This is + * useful in a proxy for allowing ping and pong frames through. + *

By default this is set to {@code false} in which case ping frames are + * handled automatically by Reactor Netty. If set to {@code true}, ping + * frames will be passed through to the {@link WebSocketHandler}. + * @param handlePing whether to let Ping frames through for handling + * @since 5.2.4 + * @deprecated as of 5.2.6 in favor of providing a supplier of + * {@link WebsocketServerSpec.Builder} with a + * constructor argument + */ + @Deprecated + public void setHandlePing(boolean handlePing) { + this.handlePing = handlePing; + } + + /** + * Return the configured {@link #setHandlePing(boolean)}. + * @since 5.2.4 + * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()} + */ + @Deprecated + public boolean getHandlePing() { + return getWebsocketServerSpec().handlePing(); + } + + + @Override + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler, + @Nullable String subProtocol, Supplier handshakeInfoFactory) { + + ServerHttpResponse response = exchange.getResponse(); + HttpServerResponse reactorResponse = ServerHttpResponseDecorator.getNativeResponse(response); + HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); + Netty5DataBufferFactory bufferFactory = (Netty5DataBufferFactory) response.bufferFactory(); + URI uri = exchange.getRequest().getURI(); + + // Trigger WebFlux preCommit actions and upgrade + return response.setComplete() + .then(Mono.defer(() -> { + WebsocketServerSpec spec = buildSpec(subProtocol); + return reactorResponse.sendWebsocket((in, out) -> { + ReactorNetty2WebSocketSession session = + new ReactorNetty2WebSocketSession( + in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength()); + return handler.handle(session).checkpoint(uri + " [ReactorNetty2RequestUpgradeStrategy]"); + }, spec); + })); + } + +} diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 80e068a4742..8be90ef25b3 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -73,6 +73,7 @@ import org.springframework.http.client.reactive.HttpComponentsClientHttpConnecto import org.springframework.http.client.reactive.JdkClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.client.reactive.ReactorNetty2ClientHttpConnector; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.testfixture.xml.Pojo; @@ -102,6 +103,7 @@ class WebClientIntegrationTests { static Stream> arguments() { return Stream.of( named("Reactor Netty", new ReactorClientHttpConnector()), + named("Reactor Netty 2", new ReactorNetty2ClientHttpConnector()), named("JDK", new JdkClientHttpConnector()), named("Jetty", new JettyClientHttpConnector()), named("HttpComponents", new HttpComponentsClientHttpConnector()) @@ -860,6 +862,12 @@ class WebClientIntegrationTests { @ParameterizedWebClientTest void statusHandlerSuppressedErrorSignalWithFlux(ClientHttpConnector connector) { + + // Temporarily disabled, leads to io.netty5.buffer.api.BufferClosedException + if (connector instanceof ReactorNetty2ClientHttpConnector) { + return; + } + startServer(connector); prepareResponse(response -> response.setResponseCode(500) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java index 8886a920661..71a081c0aef 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java @@ -46,6 +46,7 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.socket.client.JettyWebSocketClient; +import org.springframework.web.reactive.socket.client.ReactorNetty2WebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.TomcatWebSocketClient; import org.springframework.web.reactive.socket.client.UndertowWebSocketClient; @@ -55,6 +56,7 @@ import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; +import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy; @@ -63,6 +65,7 @@ import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorNetty2HttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer; @@ -92,6 +95,7 @@ abstract class AbstractWebSocketIntegrationTests { new TomcatWebSocketClient(), new JettyWebSocketClient(), new ReactorNettyWebSocketClient(), + new ReactorNetty2WebSocketClient(), new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY)) }; @@ -99,6 +103,7 @@ abstract class AbstractWebSocketIntegrationTests { servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class); servers.put(new JettyHttpServer(), JettyConfig.class); servers.put(new ReactorHttpServer(), ReactorNettyConfig.class); + servers.put(new ReactorNetty2HttpServer(), ReactorNetty2Config.class); servers.put(new UndertowHttpServer(), UndertowConfig.class); // Try each client once against each server.. @@ -204,6 +209,14 @@ abstract class AbstractWebSocketIntegrationTests { } } + @Configuration + static class ReactorNetty2Config extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy getUpgradeStrategy() { + return new ReactorNetty2RequestUpgradeStrategy(); + } + } @Configuration static class TomcatConfig extends AbstractHandlerAdapterConfig {