diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java new file mode 100644 index 00000000000..850eea9e36e --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import reactor.core.publisher.Mono; + +import org.springframework.http.HttpHeaders; +import org.springframework.util.Assert; + +/** + * Base class for {@link ClientHttpRequest} implementations. + * + * @author Rossen Stoyanchev + * @author Brian Clozel + */ +public abstract class AbstractClientHttpRequest implements ClientHttpRequest { + + private final HttpHeaders headers; + + private AtomicReference state = new AtomicReference<>(State.NEW); + + private final List>> beforeCommitActions = new ArrayList<>(4); + + public AbstractClientHttpRequest(HttpHeaders httpHeaders) { + if (httpHeaders == null) { + this.headers = new HttpHeaders(); + } + else { + this.headers = httpHeaders; + } + } + + @Override + public HttpHeaders getHeaders() { + if (State.COMITTED.equals(this.state.get())) { + return HttpHeaders.readOnlyHttpHeaders(this.headers); + } + return this.headers; + } + + protected Mono applyBeforeCommit() { + Mono mono = Mono.empty(); + if (this.state.compareAndSet(State.NEW, State.COMMITTING)) { + for (Supplier> action : this.beforeCommitActions) { + mono = mono.after(() -> action.get()); + } + return mono + .otherwise(ex -> { + // Ignore errors from beforeCommit actions + return Mono.empty(); + }) + .after(() -> { + this.state.set(State.COMITTED); + //writeHeaders(); + //writeCookies(); + return Mono.empty(); + }); + } + return mono; + } + + @Override + public void beforeCommit(Supplier> action) { + Assert.notNull(action); + this.beforeCommitActions.add(action); + } + + private enum State {NEW, COMMITTING, COMITTED} +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java new file mode 100644 index 00000000000..e80a56ae5c8 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.io.buffer.Buffer; +import reactor.io.net.http.HttpClient; +import reactor.io.net.http.model.Method; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; + +/** + * {@link ClientHttpRequest} implementation for the Reactor Net HTTP client + * + * @author Brian Clozel + * @see HttpClient + */ +public class ReactorClientHttpRequest extends AbstractClientHttpRequest { + + private final DataBufferAllocator allocator; + + private final HttpMethod httpMethod; + + private final URI uri; + + private final HttpClient httpClient; + + private Flux body; + + + public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers, + DataBufferAllocator allocator) { + super(headers); + this.allocator = allocator; + this.httpMethod = httpMethod; + this.uri = uri; + this.httpClient = httpClient; + } + + @Override + public HttpMethod getMethod() { + return this.httpMethod; + } + + @Override + public URI getURI() { + return this.uri; + } + + /** + * Set the body of the message to the given {@link Publisher}. + * + *

Since the HTTP channel is not yet created when this method + * is called, the {@code Mono} return value completes immediately. + * For an event that signals that we're done writing the request, check the + * {@link #execute()} method. + * + * @return a publisher that completes immediately. + * @see #execute() + */ + @Override + public Mono setBody(Publisher body) { + + this.body = Flux.from(body).map(b -> new Buffer(b.asByteBuffer())); + return Mono.empty(); + } + + @Override + public Mono execute() { + + return this.httpClient.request(new Method(httpMethod.toString()), uri.toString(), + channel -> { + // see https://github.com/reactor/reactor-io/pull/8 + if (body == null) { + channel.headers().removeTransferEncodingChunked(); + } + return applyBeforeCommit() + .after(() -> + { + getHeaders().entrySet().stream() + .forEach(e -> channel.headers().set(e.getKey(), e.getValue())); + return Mono.empty(); + } + ) + .after(() -> { + if (body != null) { + return channel.writeBufferWith(body); + } + else { + return channel.writeHeaders(); + } + }); + }) + .map(httpChannel -> new ReactorClientHttpResponse(httpChannel, allocator)); + } + +} + diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java new file mode 100644 index 00000000000..4eaac999186 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -0,0 +1,72 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.nio.ByteBuffer; + +import reactor.core.publisher.Flux; +import reactor.io.buffer.Buffer; +import reactor.io.net.http.HttpChannel; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; + +/** + * {@link ClientHttpResponse} implementation for the Reactor Net HTTP client + * + * @author Brian Clozel + * @see reactor.io.net.http.HttpClient + */ +public class ReactorClientHttpResponse implements ClientHttpResponse { + + private final DataBufferAllocator allocator; + + private final HttpChannel channel; + + + public ReactorClientHttpResponse(HttpChannel channel, DataBufferAllocator allocator) { + this.allocator = allocator; + this.channel = channel; + } + + @Override + public Flux getBody() { + return Flux.from(channel.input()).map(b -> allocator.wrap(b.byteBuffer())); + } + + @Override + public HttpHeaders getHeaders() { + HttpHeaders headers = new HttpHeaders(); + this.channel.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue())); + return headers; + } + + @Override + public HttpStatus getStatusCode() { + return HttpStatus.valueOf(this.channel.responseStatus().getCode()); + } + + @Override + public String toString() { + return "ReactorClientHttpResponse{" + + "request=" + this.channel.method() + " " + this.channel.uri().toString() + "," + + "status=" + getStatusCode() + + '}'; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java new file mode 100644 index 00000000000..f3f83b95ae1 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java @@ -0,0 +1,63 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; + +import reactor.io.net.ReactiveNet; +import reactor.io.net.http.HttpClient; + +import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.DefaultDataBufferAllocator; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.util.Assert; + +/** + * Create a {@link ClientHttpRequest} for the Reactor Net HTTP client + * + * @author Brian Clozel + */ +public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory { + + private final DataBufferAllocator allocator; + + private final HttpClient httpClient; + + public ReactorHttpClientRequestFactory() { + this(new DefaultDataBufferAllocator()); + } + + public ReactorHttpClientRequestFactory(DataBufferAllocator allocator) { + this(allocator, ReactiveNet.httpClient()); + } + + protected ReactorHttpClientRequestFactory(DataBufferAllocator allocator, HttpClient httpClient) { + this.allocator = allocator; + this.httpClient = httpClient; + } + + @Override + public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) { + Assert.notNull(httpMethod, "HTTP method is required"); + Assert.notNull(uri, "request URI is required"); + Assert.notNull(headers, "request headers are required"); + + return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers, this.allocator); + } + +}