51 changed files with 48 additions and 4052 deletions
@ -1,68 +0,0 @@
@@ -1,68 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.codec; |
||||
|
||||
import java.util.Map; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.buffer.DefaultBufferAllocators; |
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
import org.springframework.core.io.buffer.Netty5DataBuffer; |
||||
import org.springframework.util.MimeType; |
||||
import org.springframework.util.MimeTypeUtils; |
||||
|
||||
/** |
||||
* Decoder for {@link Buffer Buffers}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class Netty5BufferDecoder extends AbstractDataBufferDecoder<Buffer> { |
||||
|
||||
public Netty5BufferDecoder() { |
||||
super(MimeTypeUtils.ALL); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { |
||||
return (Buffer.class.isAssignableFrom(elementType.toClass()) && |
||||
super.canDecode(elementType, mimeType)); |
||||
} |
||||
|
||||
@Override |
||||
public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType, |
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
|
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes"); |
||||
} |
||||
if (dataBuffer instanceof Netty5DataBuffer netty5DataBuffer) { |
||||
return netty5DataBuffer.getNativeBuffer(); |
||||
} |
||||
byte[] bytes = new byte[dataBuffer.readableByteCount()]; |
||||
dataBuffer.read(bytes); |
||||
Buffer buffer = DefaultBufferAllocators.preferredAllocator().copyOf(bytes); |
||||
DataBufferUtils.release(dataBuffer); |
||||
return buffer; |
||||
} |
||||
|
||||
} |
||||
@ -1,77 +0,0 @@
@@ -1,77 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.codec; |
||||
|
||||
import java.util.Map; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import org.jspecify.annotations.Nullable; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.util.MimeType; |
||||
import org.springframework.util.MimeTypeUtils; |
||||
|
||||
/** |
||||
* Encoder for {@link Buffer Buffers}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class Netty5BufferEncoder extends AbstractEncoder<Buffer> { |
||||
|
||||
public Netty5BufferEncoder() { |
||||
super(MimeTypeUtils.ALL); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public boolean canEncode(ResolvableType type, @Nullable MimeType mimeType) { |
||||
Class<?> clazz = type.toClass(); |
||||
return super.canEncode(type, mimeType) && Buffer.class.isAssignableFrom(clazz); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<DataBuffer> encode(Publisher<? extends Buffer> inputStream, |
||||
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, |
||||
@Nullable Map<String, Object> hints) { |
||||
|
||||
return Flux.from(inputStream).map(byteBuffer -> |
||||
encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints)); |
||||
} |
||||
|
||||
@Override |
||||
public DataBuffer encodeValue(Buffer buffer, DataBufferFactory bufferFactory, ResolvableType valueType, |
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { |
||||
String logPrefix = Hints.getLogPrefix(hints); |
||||
logger.debug(logPrefix + "Writing " + buffer.readableBytes() + " bytes"); |
||||
} |
||||
if (bufferFactory instanceof Netty5DataBufferFactory netty5DataBufferFactory) { |
||||
return netty5DataBufferFactory.wrap(buffer); |
||||
} |
||||
byte[] bytes = new byte[buffer.readableBytes()]; |
||||
buffer.readBytes(bytes, 0, bytes.length); |
||||
buffer.close(); |
||||
return bufferFactory.wrap(bytes); |
||||
} |
||||
} |
||||
@ -1,401 +0,0 @@
@@ -1,401 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.io.buffer; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.nio.charset.Charset; |
||||
import java.util.NoSuchElementException; |
||||
import java.util.function.IntPredicate; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.buffer.BufferComponent; |
||||
import io.netty5.buffer.ComponentIterator; |
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.ObjectUtils; |
||||
|
||||
/** |
||||
* Implementation of the {@code DataBuffer} interface that wraps a Netty 5 |
||||
* {@link Buffer}. Typically constructed with {@link Netty5DataBufferFactory}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @author Arjen Poutsma |
||||
* @since 6.0 |
||||
*/ |
||||
public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDataBuffer { |
||||
|
||||
private final Buffer buffer; |
||||
|
||||
private final Netty5DataBufferFactory dataBufferFactory; |
||||
|
||||
|
||||
/** |
||||
* Create a new {@code Netty5DataBuffer} based on the given {@code Buffer}. |
||||
* @param buffer the buffer to base this buffer on |
||||
*/ |
||||
Netty5DataBuffer(Buffer buffer, Netty5DataBufferFactory dataBufferFactory) { |
||||
Assert.notNull(buffer, "Buffer must not be null"); |
||||
Assert.notNull(dataBufferFactory, "Netty5DataBufferFactory must not be null"); |
||||
this.buffer = buffer; |
||||
this.dataBufferFactory = dataBufferFactory; |
||||
} |
||||
|
||||
/** |
||||
* Directly exposes the native {@code Buffer} that this buffer is based on. |
||||
* @return the wrapped buffer |
||||
*/ |
||||
public Buffer getNativeBuffer() { |
||||
return this.buffer; |
||||
} |
||||
|
||||
@Override |
||||
public DataBufferFactory factory() { |
||||
return this.dataBufferFactory; |
||||
} |
||||
|
||||
@Override |
||||
public int indexOf(IntPredicate predicate, int fromIndex) { |
||||
Assert.notNull(predicate, "IntPredicate must not be null"); |
||||
if (fromIndex < 0) { |
||||
fromIndex = 0; |
||||
} |
||||
else if (fromIndex >= this.buffer.writerOffset()) { |
||||
return -1; |
||||
} |
||||
int length = this.buffer.writerOffset() - fromIndex; |
||||
int bytes = this.buffer.openCursor(fromIndex, length).process(predicate.negate()::test); |
||||
return bytes == -1 ? -1 : fromIndex + bytes; |
||||
} |
||||
|
||||
@Override |
||||
public int lastIndexOf(IntPredicate predicate, int fromIndex) { |
||||
Assert.notNull(predicate, "IntPredicate must not be null"); |
||||
if (fromIndex < 0) { |
||||
return -1; |
||||
} |
||||
fromIndex = Math.min(fromIndex, this.buffer.writerOffset() - 1); |
||||
return this.buffer.openCursor(0, fromIndex + 1).process(predicate.negate()::test); |
||||
} |
||||
|
||||
@Override |
||||
public int readableByteCount() { |
||||
return this.buffer.readableBytes(); |
||||
} |
||||
|
||||
@Override |
||||
public int writableByteCount() { |
||||
return this.buffer.writableBytes(); |
||||
} |
||||
|
||||
@Override |
||||
public int readPosition() { |
||||
return this.buffer.readerOffset(); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer readPosition(int readPosition) { |
||||
this.buffer.readerOffset(readPosition); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public int writePosition() { |
||||
return this.buffer.writerOffset(); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer writePosition(int writePosition) { |
||||
this.buffer.writerOffset(writePosition); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public byte getByte(int index) { |
||||
return this.buffer.getByte(index); |
||||
} |
||||
|
||||
@Override |
||||
public int capacity() { |
||||
return this.buffer.capacity(); |
||||
} |
||||
|
||||
@Override |
||||
@Deprecated |
||||
public Netty5DataBuffer capacity(int capacity) { |
||||
if (capacity <= 0) { |
||||
throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", capacity)); |
||||
} |
||||
int diff = capacity - capacity(); |
||||
if (diff > 0) { |
||||
this.buffer.ensureWritable(this.buffer.writableBytes() + diff); |
||||
} |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public DataBuffer ensureWritable(int capacity) { |
||||
Assert.isTrue(capacity >= 0, "Capacity must be >= 0"); |
||||
this.buffer.ensureWritable(capacity); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public byte read() { |
||||
return this.buffer.readByte(); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer read(byte[] destination) { |
||||
return read(destination, 0, destination.length); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer read(byte[] destination, int offset, int length) { |
||||
this.buffer.readBytes(destination, offset, length); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer write(byte b) { |
||||
this.buffer.writeByte(b); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer write(byte[] source) { |
||||
this.buffer.writeBytes(source); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer write(byte[] source, int offset, int length) { |
||||
this.buffer.writeBytes(source, offset, length); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer write(DataBuffer... dataBuffers) { |
||||
if (!ObjectUtils.isEmpty(dataBuffers)) { |
||||
if (hasNetty5DataBuffers(dataBuffers)) { |
||||
Buffer[] nativeBuffers = new Buffer[dataBuffers.length]; |
||||
for (int i = 0; i < dataBuffers.length; i++) { |
||||
nativeBuffers[i] = ((Netty5DataBuffer) dataBuffers[i]).getNativeBuffer(); |
||||
} |
||||
return write(nativeBuffers); |
||||
} |
||||
else { |
||||
ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length]; |
||||
for (int i = 0; i < dataBuffers.length; i++) { |
||||
byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount()); |
||||
dataBuffers[i].toByteBuffer(byteBuffers[i]); |
||||
} |
||||
return write(byteBuffers); |
||||
} |
||||
} |
||||
return this; |
||||
} |
||||
|
||||
private static boolean hasNetty5DataBuffers(DataBuffer[] buffers) { |
||||
for (DataBuffer buffer : buffers) { |
||||
if (!(buffer instanceof Netty5DataBuffer)) { |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer write(ByteBuffer... buffers) { |
||||
if (!ObjectUtils.isEmpty(buffers)) { |
||||
for (ByteBuffer buffer : buffers) { |
||||
this.buffer.writeBytes(buffer); |
||||
} |
||||
} |
||||
return this; |
||||
} |
||||
|
||||
/** |
||||
* Writes one or more Netty 5 {@link Buffer Buffers} to this buffer, |
||||
* starting at the current writing position. |
||||
* @param buffers the buffers to write into this buffer |
||||
* @return this buffer |
||||
*/ |
||||
public Netty5DataBuffer write(Buffer... buffers) { |
||||
if (!ObjectUtils.isEmpty(buffers)) { |
||||
for (Buffer buffer : buffers) { |
||||
this.buffer.writeBytes(buffer); |
||||
} |
||||
} |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public DataBuffer write(CharSequence charSequence, Charset charset) { |
||||
Assert.notNull(charSequence, "CharSequence must not be null"); |
||||
Assert.notNull(charset, "Charset must not be null"); |
||||
|
||||
this.buffer.writeCharSequence(charSequence, charset); |
||||
return this; |
||||
} |
||||
|
||||
/** |
||||
* {@inheritDoc} |
||||
* <p><strong>Note</strong> that due to the lack of a {@code slice} method |
||||
* in Netty 5's {@link Buffer}, this implementation returns a copy that |
||||
* does <strong>not</strong> share its contents with this buffer. |
||||
*/ |
||||
@Override |
||||
@Deprecated |
||||
public DataBuffer slice(int index, int length) { |
||||
Buffer copy = this.buffer.copy(index, length); |
||||
return new Netty5DataBuffer(copy, this.dataBufferFactory); |
||||
} |
||||
|
||||
@Override |
||||
public DataBuffer split(int index) { |
||||
Buffer split = this.buffer.split(index); |
||||
return new Netty5DataBuffer(split, this.dataBufferFactory); |
||||
} |
||||
|
||||
@Override |
||||
@Deprecated |
||||
public ByteBuffer asByteBuffer() { |
||||
return toByteBuffer(); |
||||
} |
||||
|
||||
@Override |
||||
@Deprecated |
||||
public ByteBuffer asByteBuffer(int index, int length) { |
||||
return toByteBuffer(index, length); |
||||
} |
||||
|
||||
@Override |
||||
@Deprecated |
||||
public ByteBuffer toByteBuffer(int index, int length) { |
||||
ByteBuffer copy = this.buffer.isDirect() ? |
||||
ByteBuffer.allocateDirect(length) : |
||||
ByteBuffer.allocate(length); |
||||
|
||||
this.buffer.copyInto(index, copy, 0, length); |
||||
return copy; |
||||
} |
||||
|
||||
@Override |
||||
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { |
||||
this.buffer.copyInto(srcPos, dest, destPos, length); |
||||
} |
||||
|
||||
@Override |
||||
public ByteBufferIterator readableByteBuffers() { |
||||
return new BufferComponentIterator<>(this.buffer.forEachComponent(), true); |
||||
} |
||||
|
||||
@Override |
||||
public ByteBufferIterator writableByteBuffers() { |
||||
return new BufferComponentIterator<>(this.buffer.forEachComponent(), false); |
||||
} |
||||
|
||||
@Override |
||||
public String toString(Charset charset) { |
||||
Assert.notNull(charset, "Charset must not be null"); |
||||
return this.buffer.toString(charset); |
||||
} |
||||
|
||||
@Override |
||||
public String toString(int index, int length, Charset charset) { |
||||
Assert.notNull(charset, "Charset must not be null"); |
||||
byte[] data = new byte[length]; |
||||
this.buffer.copyInto(index, data, 0, length); |
||||
return new String(data, 0, length, charset); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer touch(Object hint) { |
||||
this.buffer.touch(hint); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
this.buffer.close(); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public boolean equals(@Nullable Object other) { |
||||
return (this == other || (other instanceof Netty5DataBuffer that && this.buffer.equals(that.buffer))); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return this.buffer.hashCode(); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return this.buffer.toString(); |
||||
} |
||||
|
||||
|
||||
private static final class BufferComponentIterator<T extends BufferComponent & ComponentIterator.Next> |
||||
implements ByteBufferIterator { |
||||
|
||||
private final ComponentIterator<T> delegate; |
||||
|
||||
private final boolean readable; |
||||
|
||||
private @Nullable T next; |
||||
|
||||
public BufferComponentIterator(ComponentIterator<T> delegate, boolean readable) { |
||||
Assert.notNull(delegate, "Delegate must not be null"); |
||||
this.delegate = delegate; |
||||
this.readable = readable; |
||||
this.next = readable ? this.delegate.firstReadable() : this.delegate.firstWritable(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean hasNext() { |
||||
return this.next != null; |
||||
} |
||||
|
||||
@Override |
||||
public ByteBuffer next() { |
||||
if (this.next != null) { |
||||
ByteBuffer result; |
||||
if (this.readable) { |
||||
result = this.next.readableBuffer(); |
||||
this.next = this.next.nextReadable(); |
||||
} |
||||
else { |
||||
result = this.next.writableBuffer(); |
||||
this.next = this.next.nextWritable(); |
||||
} |
||||
return result; |
||||
} |
||||
else { |
||||
throw new NoSuchElementException(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
this.delegate.close(); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,141 +0,0 @@
@@ -1,141 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.io.buffer; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.List; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.buffer.BufferAllocator; |
||||
import io.netty5.buffer.CompositeBuffer; |
||||
import io.netty5.buffer.DefaultBufferAllocators; |
||||
|
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Implementation of the {@code DataBufferFactory} interface based on a |
||||
* Netty 5 {@link BufferAllocator}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @author Arjen Poutsma |
||||
* @since 6.0 |
||||
*/ |
||||
public class Netty5DataBufferFactory implements DataBufferFactory { |
||||
|
||||
private final BufferAllocator bufferAllocator; |
||||
|
||||
|
||||
/** |
||||
* Create a new {@code Netty5DataBufferFactory} based on the given factory. |
||||
* @param bufferAllocator the factory to use |
||||
*/ |
||||
public Netty5DataBufferFactory(BufferAllocator bufferAllocator) { |
||||
Assert.notNull(bufferAllocator, "BufferAllocator must not be null"); |
||||
this.bufferAllocator = bufferAllocator; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Return the {@code BufferAllocator} used by this factory. |
||||
*/ |
||||
public BufferAllocator getBufferAllocator() { |
||||
return this.bufferAllocator; |
||||
} |
||||
|
||||
@Override |
||||
@Deprecated |
||||
public Netty5DataBuffer allocateBuffer() { |
||||
Buffer buffer = this.bufferAllocator.allocate(256); |
||||
return new Netty5DataBuffer(buffer, this); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer allocateBuffer(int initialCapacity) { |
||||
Buffer buffer = this.bufferAllocator.allocate(initialCapacity); |
||||
return new Netty5DataBuffer(buffer, this); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer wrap(ByteBuffer byteBuffer) { |
||||
Buffer buffer = this.bufferAllocator.copyOf(byteBuffer); |
||||
return new Netty5DataBuffer(buffer, this); |
||||
} |
||||
|
||||
@Override |
||||
public Netty5DataBuffer wrap(byte[] bytes) { |
||||
Buffer buffer = this.bufferAllocator.copyOf(bytes); |
||||
return new Netty5DataBuffer(buffer, this); |
||||
} |
||||
|
||||
/** |
||||
* Wrap the given Netty {@link Buffer} in a {@code Netty5DataBuffer}. |
||||
* @param buffer the Netty buffer to wrap |
||||
* @return the wrapped buffer |
||||
*/ |
||||
public Netty5DataBuffer wrap(Buffer buffer) { |
||||
buffer.touch("Wrap buffer"); |
||||
return new Netty5DataBuffer(buffer, this); |
||||
} |
||||
|
||||
/** |
||||
* {@inheritDoc} |
||||
* <p>This implementation uses Netty's {@link CompositeBuffer}. |
||||
*/ |
||||
@Override |
||||
public DataBuffer join(List<? extends DataBuffer> dataBuffers) { |
||||
Assert.notEmpty(dataBuffers, "DataBuffer List must not be empty"); |
||||
if (dataBuffers.size() == 1) { |
||||
return dataBuffers.get(0); |
||||
} |
||||
CompositeBuffer composite = this.bufferAllocator.compose(); |
||||
for (DataBuffer dataBuffer : dataBuffers) { |
||||
Assert.isInstanceOf(Netty5DataBuffer.class, dataBuffer); |
||||
composite.extendWith(((Netty5DataBuffer) dataBuffer).getNativeBuffer().send()); |
||||
} |
||||
return new Netty5DataBuffer(composite, this); |
||||
} |
||||
|
||||
@Override |
||||
public boolean isDirect() { |
||||
return this.bufferAllocator.getAllocationType().isDirect(); |
||||
} |
||||
|
||||
/** |
||||
* Return the given Netty {@link DataBuffer} as a {@link Buffer}. |
||||
* <p>Returns the {@linkplain Netty5DataBuffer#getNativeBuffer() native buffer} |
||||
* if {@code buffer} is a {@link Netty5DataBuffer}; returns |
||||
* {@link BufferAllocator#copyOf(ByteBuffer)} otherwise. |
||||
* @param buffer the {@code DataBuffer} to return a {@code Buffer} for |
||||
* @return the netty {@code Buffer} |
||||
*/ |
||||
public static Buffer toBuffer(DataBuffer buffer) { |
||||
if (buffer instanceof Netty5DataBuffer netty5DataBuffer) { |
||||
return netty5DataBuffer.getNativeBuffer(); |
||||
} |
||||
else { |
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(buffer.readableByteCount()); |
||||
buffer.toByteBuffer(byteBuffer); |
||||
return DefaultBufferAllocators.preferredAllocator().copyOf(byteBuffer); |
||||
} |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "Netty5DataBufferFactory (" + this.bufferAllocator + ")"; |
||||
} |
||||
} |
||||
@ -1,97 +0,0 @@
@@ -1,97 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2024 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.codec; |
||||
|
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.function.Consumer; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.buffer.DefaultBufferAllocators; |
||||
import org.junit.jupiter.api.Test; |
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.testfixture.codec.AbstractDecoderTests; |
||||
import org.springframework.util.MimeTypeUtils; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* @author Arjen Poutsma |
||||
*/ |
||||
class Netty5BufferDecoderTests extends AbstractDecoderTests<Netty5BufferDecoder> { |
||||
|
||||
private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); |
||||
|
||||
private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); |
||||
|
||||
|
||||
Netty5BufferDecoderTests() { |
||||
super(new Netty5BufferDecoder()); |
||||
} |
||||
|
||||
@Override |
||||
@Test |
||||
protected void canDecode() { |
||||
assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class), |
||||
MimeTypeUtils.TEXT_PLAIN)).isTrue(); |
||||
assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class), |
||||
MimeTypeUtils.TEXT_PLAIN)).isFalse(); |
||||
assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class), |
||||
MimeTypeUtils.APPLICATION_JSON)).isTrue(); |
||||
} |
||||
|
||||
@Override |
||||
@Test |
||||
protected void decode() { |
||||
Flux<DataBuffer> input = Flux.concat( |
||||
dataBuffer(this.fooBytes), |
||||
dataBuffer(this.barBytes)); |
||||
|
||||
testDecodeAll(input, Buffer.class, step -> step |
||||
.consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.fooBytes))) |
||||
.consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.barBytes))) |
||||
.verifyComplete()); |
||||
} |
||||
|
||||
@Override |
||||
@Test |
||||
protected void decodeToMono() { |
||||
Flux<DataBuffer> input = Flux.concat( |
||||
dataBuffer(this.fooBytes), |
||||
dataBuffer(this.barBytes)); |
||||
|
||||
Buffer expected = DefaultBufferAllocators.preferredAllocator().allocate(this.fooBytes.length + this.barBytes.length) |
||||
.writeBytes(this.fooBytes) |
||||
.writeBytes(this.barBytes) |
||||
.readerOffset(0); |
||||
|
||||
testDecodeToMonoAll(input, Buffer.class, step -> step |
||||
.consumeNextWith(expectByteBuffer(expected)) |
||||
.verifyComplete()); |
||||
} |
||||
|
||||
private Consumer<Buffer> expectByteBuffer(Buffer expected) { |
||||
return actual -> { |
||||
try (actual; expected) { |
||||
assertThat(actual).isEqualTo(expected); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
} |
||||
@ -1,72 +0,0 @@
@@ -1,72 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.codec; |
||||
|
||||
import java.nio.charset.StandardCharsets; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.buffer.DefaultBufferAllocators; |
||||
import org.junit.jupiter.api.Test; |
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.testfixture.codec.AbstractEncoderTests; |
||||
import org.springframework.util.MimeTypeUtils; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* @author Arjen Poutsma |
||||
*/ |
||||
class Netty5BufferEncoderTests extends AbstractEncoderTests<Netty5BufferEncoder> { |
||||
|
||||
private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); |
||||
|
||||
private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); |
||||
|
||||
Netty5BufferEncoderTests() { |
||||
super(new Netty5BufferEncoder()); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
public void canEncode() { |
||||
assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class), |
||||
MimeTypeUtils.TEXT_PLAIN)).isTrue(); |
||||
assertThat(this.encoder.canEncode(ResolvableType.forClass(Integer.class), |
||||
MimeTypeUtils.TEXT_PLAIN)).isFalse(); |
||||
assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class), |
||||
MimeTypeUtils.APPLICATION_JSON)).isTrue(); |
||||
|
||||
// gh-20024
|
||||
assertThat(this.encoder.canEncode(ResolvableType.NONE, null)).isFalse(); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
@SuppressWarnings("resource") |
||||
public void encode() { |
||||
Flux<Buffer> input = Flux.just(this.fooBytes, this.barBytes) |
||||
.map(DefaultBufferAllocators.preferredAllocator()::copyOf); |
||||
|
||||
testEncodeAll(input, Buffer.class, step -> step |
||||
.consumeNextWith(expectBytes(this.fooBytes)) |
||||
.consumeNextWith(expectBytes(this.barBytes)) |
||||
.verifyComplete()); |
||||
} |
||||
|
||||
} |
||||
@ -1,349 +0,0 @@
@@ -1,349 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.messaging.tcp.reactor; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.time.Duration; |
||||
import java.util.List; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.function.BiFunction; |
||||
import java.util.function.Function; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.channel.ChannelHandlerContext; |
||||
import io.netty5.channel.group.ChannelGroup; |
||||
import io.netty5.channel.group.DefaultChannelGroup; |
||||
import io.netty5.handler.codec.ByteToMessageDecoder; |
||||
import io.netty5.util.concurrent.ImmediateEventExecutor; |
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.jspecify.annotations.Nullable; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.core.publisher.Sinks; |
||||
import reactor.core.scheduler.Scheduler; |
||||
import reactor.core.scheduler.Schedulers; |
||||
import reactor.netty5.Connection; |
||||
import reactor.netty5.NettyInbound; |
||||
import reactor.netty5.NettyOutbound; |
||||
import reactor.netty5.resources.ConnectionProvider; |
||||
import reactor.netty5.resources.LoopResources; |
||||
import reactor.netty5.tcp.TcpClient; |
||||
import reactor.util.retry.Retry; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.tcp.ReconnectStrategy; |
||||
import org.springframework.messaging.tcp.TcpConnection; |
||||
import org.springframework.messaging.tcp.TcpConnectionHandler; |
||||
import org.springframework.messaging.tcp.TcpOperations; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Reactor Netty based implementation of {@link TcpOperations}. |
||||
* |
||||
* <p>This class is based on {@link ReactorNettyTcpClient}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 6.0 |
||||
* @param <P> the type of payload for in and outbound messages |
||||
*/ |
||||
public class ReactorNetty2TcpClient<P> implements TcpOperations<P> { |
||||
|
||||
private static final int PUBLISH_ON_BUFFER_SIZE = 16; |
||||
|
||||
|
||||
private final TcpClient tcpClient; |
||||
|
||||
private final TcpMessageCodec<P> codec; |
||||
|
||||
private final @Nullable ChannelGroup channelGroup; |
||||
|
||||
private final @Nullable LoopResources loopResources; |
||||
|
||||
private final @Nullable ConnectionProvider poolResources; |
||||
|
||||
private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler"); |
||||
|
||||
private Log logger = LogFactory.getLog(ReactorNetty2TcpClient.class); |
||||
|
||||
private volatile boolean stopping; |
||||
|
||||
|
||||
/** |
||||
* Simple constructor with the host and port to use to connect to. |
||||
* <p>This constructor manages the lifecycle of the {@link TcpClient} and |
||||
* underlying resources such as {@link ConnectionProvider}, |
||||
* {@link LoopResources}, and {@link ChannelGroup}. |
||||
* <p>For full control over the initialization and lifecycle of the |
||||
* TcpClient, use {@link #ReactorNetty2TcpClient(TcpClient, TcpMessageCodec)}. |
||||
* @param host the host to connect to |
||||
* @param port the port to connect to |
||||
* @param codec for encoding and decoding the input/output byte streams |
||||
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec |
||||
*/ |
||||
public ReactorNetty2TcpClient(String host, int port, TcpMessageCodec<P> codec) { |
||||
Assert.notNull(host, "host is required"); |
||||
Assert.notNull(codec, "ReactorNettyCodec is required"); |
||||
|
||||
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); |
||||
this.loopResources = LoopResources.create("tcp-client-loop"); |
||||
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000); |
||||
this.codec = codec; |
||||
|
||||
this.tcpClient = TcpClient.create(this.poolResources) |
||||
.host(host).port(port) |
||||
.runOn(this.loopResources, false) |
||||
.doOnConnected(conn -> this.channelGroup.add(conn.channel())); |
||||
} |
||||
|
||||
/** |
||||
* A variant of {@link #ReactorNetty2TcpClient(String, int, TcpMessageCodec)} |
||||
* that still manages the lifecycle of the {@link TcpClient} and underlying |
||||
* resources, but allows for direct configuration of other properties of the |
||||
* client through a {@code Function<TcpClient, TcpClient>}. |
||||
* @param clientConfigurer the configurer function |
||||
* @param codec for encoding and decoding the input/output byte streams |
||||
* @since 5.1.3 |
||||
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec |
||||
*/ |
||||
public ReactorNetty2TcpClient(Function<TcpClient, TcpClient> clientConfigurer, TcpMessageCodec<P> codec) { |
||||
Assert.notNull(codec, "ReactorNettyCodec is required"); |
||||
|
||||
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); |
||||
this.loopResources = LoopResources.create("tcp-client-loop"); |
||||
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000); |
||||
this.codec = codec; |
||||
|
||||
this.tcpClient = clientConfigurer.apply(TcpClient |
||||
.create(this.poolResources) |
||||
.runOn(this.loopResources, false) |
||||
.doOnConnected(conn -> this.channelGroup.add(conn.channel()))); |
||||
} |
||||
|
||||
/** |
||||
* Constructor with an externally created {@link TcpClient} instance whose |
||||
* lifecycle is expected to be managed externally. |
||||
* @param tcpClient the TcpClient instance to use |
||||
* @param codec for encoding and decoding the input/output byte streams |
||||
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec |
||||
*/ |
||||
public ReactorNetty2TcpClient(TcpClient tcpClient, TcpMessageCodec<P> codec) { |
||||
Assert.notNull(tcpClient, "TcpClient is required"); |
||||
Assert.notNull(codec, "ReactorNettyCodec is required"); |
||||
this.tcpClient = tcpClient; |
||||
this.codec = codec; |
||||
|
||||
this.channelGroup = null; |
||||
this.loopResources = null; |
||||
this.poolResources = null; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Set an alternative logger to use than the one based on the class name. |
||||
* @param logger the logger to use |
||||
* @since 5.1 |
||||
*/ |
||||
public void setLogger(Log logger) { |
||||
this.logger = logger; |
||||
} |
||||
|
||||
/** |
||||
* Return the currently configured Logger. |
||||
* @since 5.1 |
||||
*/ |
||||
public Log getLogger() { |
||||
return logger; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler) { |
||||
Assert.notNull(handler, "TcpConnectionHandler is required"); |
||||
|
||||
if (this.stopping) { |
||||
return handleShuttingDownConnectFailure(handler); |
||||
} |
||||
|
||||
return extendTcpClient(this.tcpClient, handler) |
||||
.handle(new ReactorNettyHandler(handler)) |
||||
.connect() |
||||
.doOnError(handler::afterConnectFailure) |
||||
.then().toFuture(); |
||||
} |
||||
|
||||
/** |
||||
* Provides an opportunity to initialize the {@link TcpClient} for the given |
||||
* {@link TcpConnectionHandler} which may implement sub-interfaces such as |
||||
* {@link org.springframework.messaging.simp.stomp.StompTcpConnectionHandler} |
||||
* that expose further information. |
||||
* @param tcpClient the candidate TcpClient |
||||
* @param handler the handler for the TCP connection |
||||
* @return the same handler or an updated instance |
||||
*/ |
||||
protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler<P> handler) { |
||||
return tcpClient; |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) { |
||||
Assert.notNull(handler, "TcpConnectionHandler is required"); |
||||
Assert.notNull(strategy, "ReconnectStrategy is required"); |
||||
|
||||
if (this.stopping) { |
||||
return handleShuttingDownConnectFailure(handler); |
||||
} |
||||
|
||||
// Report first connect to the ListenableFuture
|
||||
CompletableFuture<Void> connectFuture = new CompletableFuture<>(); |
||||
|
||||
extendTcpClient(this.tcpClient, handler) |
||||
.handle(new ReactorNettyHandler(handler)) |
||||
.connect() |
||||
.doOnNext(conn -> connectFuture.complete(null)) |
||||
.doOnError(connectFuture::completeExceptionally) |
||||
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
|
||||
.flatMap(Connection::onDispose) // post-connect issues
|
||||
.retryWhen(Retry.from(signals -> signals |
||||
.map(retrySignal -> (int) retrySignal.totalRetriesInARow()) |
||||
.flatMap(attempt -> reconnect(attempt, strategy)))) |
||||
.repeatWhen(flux -> flux |
||||
.scan(1, (count, element) -> count++) |
||||
.flatMap(attempt -> reconnect(attempt, strategy))) |
||||
.subscribe(); |
||||
return connectFuture; |
||||
} |
||||
|
||||
private CompletableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) { |
||||
IllegalStateException ex = new IllegalStateException("Shutting down."); |
||||
handler.afterConnectFailure(ex); |
||||
return Mono.<Void>error(ex).toFuture(); |
||||
} |
||||
|
||||
private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) { |
||||
Long time = reconnectStrategy.getTimeToNextAttempt(attempt); |
||||
return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty()); |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<Void> shutdownAsync() { |
||||
if (this.stopping) { |
||||
return CompletableFuture.completedFuture(null); |
||||
} |
||||
|
||||
this.stopping = true; |
||||
|
||||
Mono<Void> result; |
||||
if (this.channelGroup != null) { |
||||
Sinks.Empty<Void> channnelGroupCloseSink = Sinks.empty(); |
||||
this.channelGroup.close().addListener(future -> channnelGroupCloseSink.tryEmitEmpty()); |
||||
result = channnelGroupCloseSink.asMono(); |
||||
if (this.loopResources != null) { |
||||
result = result.onErrorComplete().then(this.loopResources.disposeLater()); |
||||
} |
||||
if (this.poolResources != null) { |
||||
result = result.onErrorComplete().then(this.poolResources.disposeLater()); |
||||
} |
||||
result = result.onErrorComplete().then(stopScheduler()); |
||||
} |
||||
else { |
||||
result = stopScheduler(); |
||||
} |
||||
|
||||
return result.toFuture(); |
||||
} |
||||
|
||||
private Mono<Void> stopScheduler() { |
||||
return Mono.fromRunnable(() -> { |
||||
this.scheduler.dispose(); |
||||
for (int i = 0; i < 20; i++) { |
||||
if (this.scheduler.isDisposed()) { |
||||
break; |
||||
} |
||||
try { |
||||
Thread.sleep(100); |
||||
} |
||||
catch (Throwable ex) { |
||||
break; |
||||
} |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "ReactorNetty2TcpClient[" + this.tcpClient + "]"; |
||||
} |
||||
|
||||
|
||||
private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { |
||||
|
||||
private final TcpConnectionHandler<P> connectionHandler; |
||||
|
||||
ReactorNettyHandler(TcpConnectionHandler<P> handler) { |
||||
this.connectionHandler = handler; |
||||
} |
||||
|
||||
@Override |
||||
@SuppressWarnings("unchecked") |
||||
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) { |
||||
inbound.withConnection(conn -> { |
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Connected to " + conn.address()); |
||||
} |
||||
}); |
||||
Sinks.Empty<Void> completionSink = Sinks.empty(); |
||||
TcpConnection<P> connection = new ReactorNetty2TcpConnection<>(inbound, outbound, codec, completionSink); |
||||
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection)); |
||||
|
||||
inbound.withConnection(conn -> conn.addHandlerFirst(new StompMessageDecoder<>(codec))); |
||||
|
||||
inbound.receiveObject() |
||||
.cast(Message.class) |
||||
.publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE) |
||||
.subscribe( |
||||
this.connectionHandler::handleMessage, |
||||
this.connectionHandler::handleFailure, |
||||
this.connectionHandler::afterConnectionClosed); |
||||
|
||||
return completionSink.asMono(); |
||||
} |
||||
} |
||||
|
||||
|
||||
private static class StompMessageDecoder<P> extends ByteToMessageDecoder { |
||||
|
||||
private final TcpMessageCodec<P> codec; |
||||
|
||||
StompMessageDecoder(TcpMessageCodec<P> codec) { |
||||
this.codec = codec; |
||||
} |
||||
|
||||
@Override |
||||
protected void decode(ChannelHandlerContext ctx, Buffer buffer) throws Exception { |
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(buffer.readableBytes()); |
||||
buffer.readBytes(byteBuffer); |
||||
byteBuffer.flip(); |
||||
List<Message<P>> messages = this.codec.decode(byteBuffer); |
||||
for (Message<P> message : messages) { |
||||
ctx.fireChannelRead(message); |
||||
} |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -1,88 +0,0 @@
@@ -1,88 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.messaging.tcp.reactor; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.core.publisher.Sinks; |
||||
import reactor.netty5.NettyInbound; |
||||
import reactor.netty5.NettyOutbound; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.tcp.TcpConnection; |
||||
|
||||
/** |
||||
* Reactor Netty based implementation of {@link TcpConnection}. |
||||
* |
||||
* <p>This class is based on {@link ReactorNettyTcpConnection}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 6.0 |
||||
* @param <P> the type of payload for outbound messages |
||||
*/ |
||||
public class ReactorNetty2TcpConnection<P> implements TcpConnection<P> { |
||||
|
||||
private final NettyInbound inbound; |
||||
|
||||
private final NettyOutbound outbound; |
||||
|
||||
private final TcpMessageCodec<P> codec; |
||||
|
||||
private final Sinks.Empty<Void> completionSink; |
||||
|
||||
|
||||
public ReactorNetty2TcpConnection(NettyInbound inbound, NettyOutbound outbound, |
||||
TcpMessageCodec<P> codec, Sinks.Empty<Void> completionSink) { |
||||
|
||||
this.inbound = inbound; |
||||
this.outbound = outbound; |
||||
this.codec = codec; |
||||
this.completionSink = completionSink; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public CompletableFuture<Void> sendAsync(Message<P> message) { |
||||
ByteBuffer byteBuffer = this.codec.encode(message); |
||||
Buffer buffer = this.outbound.alloc().copyOf(byteBuffer); |
||||
return this.outbound.send(Mono.just(buffer)).then().toFuture(); |
||||
} |
||||
|
||||
@Override |
||||
public void onReadInactivity(Runnable runnable, long inactivityDuration) { |
||||
this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable)); |
||||
} |
||||
|
||||
@Override |
||||
public void onWriteInactivity(Runnable runnable, long inactivityDuration) { |
||||
this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable)); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
// Ignore result: concurrent attempts to complete are ok
|
||||
this.completionSink.tryEmitEmpty(); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "ReactorNetty2TcpConnection[inbound=" + this.inbound + "]"; |
||||
} |
||||
} |
||||
@ -1,38 +0,0 @@
@@ -1,38 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.messaging.simp.stomp; |
||||
|
||||
import org.junit.jupiter.api.Disabled; |
||||
|
||||
import org.springframework.messaging.tcp.TcpOperations; |
||||
import org.springframework.messaging.tcp.reactor.ReactorNetty2TcpClient; |
||||
|
||||
/** |
||||
* Integration tests for {@link StompBrokerRelayMessageHandler} running against |
||||
* ActiveMQ with {@link ReactorNetty2TcpClient}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
@Disabled("gh-29287 :: Disabled because they fail too frequently") |
||||
public class ReactorNetty2StompBrokerRelayIntegrationTests extends AbstractStompBrokerRelayIntegrationTests { |
||||
|
||||
@Override |
||||
protected TcpOperations<byte[]> initTcpClient(int port) { |
||||
return new ReactorNetty2TcpClient<>("127.0.0.1", port, new StompTcpMessageCodec()); |
||||
} |
||||
|
||||
} |
||||
@ -1,144 +0,0 @@
@@ -1,144 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.net.URI; |
||||
import java.util.Map; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
import java.util.function.Function; |
||||
|
||||
import io.netty5.util.AttributeKey; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.NettyOutbound; |
||||
import reactor.netty5.http.client.HttpClient; |
||||
import reactor.netty5.http.client.HttpClientRequest; |
||||
import reactor.netty5.resources.ConnectionProvider; |
||||
import reactor.netty5.resources.LoopResources; |
||||
|
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Reactor Netty 2 (Netty 5) implementation of {@link ClientHttpConnector}. |
||||
* |
||||
* <p>This class is based on {@link ReactorClientHttpConnector}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
* @see HttpClient |
||||
*/ |
||||
public class ReactorNetty2ClientHttpConnector implements ClientHttpConnector { |
||||
|
||||
/** |
||||
* Channel attribute key under which {@code WebClient} request attributes are stored as a Map. |
||||
* @since 6.2 |
||||
*/ |
||||
public static final AttributeKey<Map<String, Object>> ATTRIBUTES_KEY = |
||||
AttributeKey.valueOf(ReactorNetty2ClientHttpRequest.class.getName() + ".ATTRIBUTES"); |
||||
|
||||
private static final Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true); |
||||
|
||||
|
||||
private final HttpClient httpClient; |
||||
|
||||
|
||||
/** |
||||
* Default constructor. Initializes {@link HttpClient} via: |
||||
* <pre class="code"> |
||||
* HttpClient.create().compress() |
||||
* </pre> |
||||
*/ |
||||
public ReactorNetty2ClientHttpConnector() { |
||||
this.httpClient = defaultInitializer.apply(HttpClient.create().wiretap(true)); |
||||
} |
||||
|
||||
/** |
||||
* Constructor with externally managed Reactor Netty resources, including |
||||
* {@link LoopResources} for event loop threads, and {@link ConnectionProvider} |
||||
* for the connection pool. |
||||
* <p>This constructor should be used only when you don't want the client |
||||
* to participate in the Reactor Netty global resources. By default, the |
||||
* client participates in the Reactor Netty global resources held in |
||||
* {@link reactor.netty5.http.HttpResources}, which is recommended since |
||||
* fixed, shared resources are favored for event loop concurrency. However, |
||||
* consider declaring a {@link ReactorNetty2ResourceFactory} bean with |
||||
* {@code globalResources=true} in order to ensure the Reactor Netty global |
||||
* resources are shut down when the Spring ApplicationContext is closed. |
||||
* @param factory the resource factory to obtain the resources from |
||||
* @param mapper a mapper for further initialization of the created client |
||||
* @since 5.1 |
||||
*/ |
||||
public ReactorNetty2ClientHttpConnector(ReactorNetty2ResourceFactory factory, Function<HttpClient, HttpClient> mapper) { |
||||
ConnectionProvider provider = factory.getConnectionProvider(); |
||||
Assert.notNull(provider, "No ConnectionProvider: is ReactorNetty2ResourceFactory not initialized yet?"); |
||||
this.httpClient = defaultInitializer.andThen(mapper).andThen(applyLoopResources(factory)) |
||||
.apply(HttpClient.create(provider)); |
||||
} |
||||
|
||||
private static Function<HttpClient, HttpClient> applyLoopResources(ReactorNetty2ResourceFactory factory) { |
||||
return httpClient -> { |
||||
LoopResources resources = factory.getLoopResources(); |
||||
Assert.notNull(resources, "No LoopResources: is ReactorNetty2ResourceFactory not initialized yet?"); |
||||
return httpClient.runOn(resources); |
||||
}; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Constructor with a pre-configured {@code HttpClient} instance. |
||||
* @param httpClient the client to use |
||||
* @since 5.1 |
||||
*/ |
||||
public ReactorNetty2ClientHttpConnector(HttpClient httpClient) { |
||||
Assert.notNull(httpClient, "HttpClient is required"); |
||||
this.httpClient = httpClient; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, |
||||
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { |
||||
|
||||
AtomicReference<ReactorNetty2ClientHttpResponse> responseRef = new AtomicReference<>(); |
||||
|
||||
HttpClient.RequestSender requestSender = this.httpClient |
||||
.request(io.netty5.handler.codec.http.HttpMethod.valueOf(method.name())); |
||||
|
||||
requestSender = (uri.isAbsolute() ? requestSender.uri(uri) : requestSender.uri(uri.toString())); |
||||
|
||||
return requestSender |
||||
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) |
||||
.responseConnection((response, connection) -> { |
||||
responseRef.set(new ReactorNetty2ClientHttpResponse(response, connection)); |
||||
return Mono.just((ClientHttpResponse) responseRef.get()); |
||||
}) |
||||
.next() |
||||
.doOnCancel(() -> { |
||||
ReactorNetty2ClientHttpResponse response = responseRef.get(); |
||||
if (response != null) { |
||||
response.releaseAfterCancel(method); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private ReactorNetty2ClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, |
||||
NettyOutbound nettyOutbound) { |
||||
|
||||
return new ReactorNetty2ClientHttpRequest(method, uri, request, nettyOutbound); |
||||
} |
||||
|
||||
} |
||||
@ -1,162 +0,0 @@
@@ -1,162 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2024 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.net.URI; |
||||
import java.nio.file.Path; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.handler.codec.http.headers.DefaultHttpCookiePair; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.NettyOutbound; |
||||
import reactor.netty5.channel.ChannelOperations; |
||||
import reactor.netty5.http.client.HttpClientRequest; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.ZeroCopyHttpOutputMessage; |
||||
import org.springframework.http.support.Netty5HeadersAdapter; |
||||
|
||||
/** |
||||
* {@link ClientHttpRequest} implementation for the Reactor Netty 2 (Netty 5) HTTP client. |
||||
* |
||||
* <p>This class is based on {@link ReactorClientHttpRequest}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
* @see reactor.netty5.http.client.HttpClient |
||||
*/ |
||||
class ReactorNetty2ClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage { |
||||
|
||||
private final HttpMethod httpMethod; |
||||
|
||||
private final URI uri; |
||||
|
||||
private final HttpClientRequest request; |
||||
|
||||
private final NettyOutbound outbound; |
||||
|
||||
private final Netty5DataBufferFactory bufferFactory; |
||||
|
||||
|
||||
public ReactorNetty2ClientHttpRequest( |
||||
HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) { |
||||
|
||||
this.httpMethod = method; |
||||
this.uri = uri; |
||||
this.request = request; |
||||
this.outbound = outbound; |
||||
this.bufferFactory = new Netty5DataBufferFactory(outbound.alloc()); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpMethod getMethod() { |
||||
return this.httpMethod; |
||||
} |
||||
|
||||
@Override |
||||
public URI getURI() { |
||||
return this.uri; |
||||
} |
||||
|
||||
@Override |
||||
public DataBufferFactory bufferFactory() { |
||||
return this.bufferFactory; |
||||
} |
||||
|
||||
@Override |
||||
@SuppressWarnings("unchecked") |
||||
public <T> T getNativeRequest() { |
||||
return (T) this.request; |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { |
||||
return doCommit(() -> { |
||||
// Send as Mono if possible as an optimization hint to Reactor Netty
|
||||
if (body instanceof Mono) { |
||||
Mono<Buffer> bufferMono = Mono.from(body).map(Netty5DataBufferFactory::toBuffer); |
||||
return this.outbound.send(bufferMono).then(); |
||||
|
||||
} |
||||
else { |
||||
Flux<Buffer> bufferFlux = Flux.from(body).map(Netty5DataBufferFactory::toBuffer); |
||||
return this.outbound.send(bufferFlux).then(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { |
||||
Publisher<Publisher<Buffer>> buffers = Flux.from(body).map(ReactorNetty2ClientHttpRequest::toBuffers); |
||||
return doCommit(() -> this.outbound.sendGroups(buffers).then()); |
||||
} |
||||
|
||||
private static Publisher<Buffer> toBuffers(Publisher<? extends DataBuffer> dataBuffers) { |
||||
return Flux.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> writeWith(Path file, long position, long count) { |
||||
return doCommit(() -> this.outbound.sendFile(file, position, count).then()); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> setComplete() { |
||||
// NettyOutbound#then() expects a body
|
||||
// Use null as the write action for a more optimal send
|
||||
return doCommit(null); |
||||
} |
||||
|
||||
@Override |
||||
protected void applyHeaders() { |
||||
getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value)); |
||||
} |
||||
|
||||
@Override |
||||
protected void applyCookies() { |
||||
getCookies().values().forEach(values -> values.forEach(value -> { |
||||
DefaultHttpCookiePair cookie = new DefaultHttpCookiePair(value.getName(), value.getValue()); |
||||
this.request.addCookie(cookie); |
||||
})); |
||||
} |
||||
|
||||
/** |
||||
* Saves the {@link #getAttributes() request attributes} to the |
||||
* {@link reactor.netty.channel.ChannelOperations#channel() channel} as a single map |
||||
* attribute under the key {@link ReactorNetty2ClientHttpConnector#ATTRIBUTES_KEY}. |
||||
*/ |
||||
@Override |
||||
protected void applyAttributes() { |
||||
if (!getAttributes().isEmpty()) { |
||||
((ChannelOperations<?, ?>) this.request).channel() |
||||
.attr(ReactorNetty2ClientHttpConnector.ATTRIBUTES_KEY).set(getAttributes()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected HttpHeaders initReadOnlyHeaders() { |
||||
return HttpHeaders.readOnlyHttpHeaders(new Netty5HeadersAdapter(this.request.requestHeaders())); |
||||
} |
||||
|
||||
} |
||||
@ -1,183 +0,0 @@
@@ -1,183 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2024 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
import java.util.function.BiFunction; |
||||
|
||||
import io.netty5.handler.codec.http.headers.DefaultHttpSetCookie; |
||||
import io.netty5.handler.codec.http.headers.HttpSetCookie; |
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.jspecify.annotations.Nullable; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.netty5.ChannelOperationsId; |
||||
import reactor.netty5.Connection; |
||||
import reactor.netty5.NettyInbound; |
||||
import reactor.netty5.http.client.HttpClientResponse; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.HttpStatusCode; |
||||
import org.springframework.http.ResponseCookie; |
||||
import org.springframework.http.support.Netty5HeadersAdapter; |
||||
import org.springframework.util.CollectionUtils; |
||||
import org.springframework.util.LinkedMultiValueMap; |
||||
import org.springframework.util.MultiValueMap; |
||||
import org.springframework.util.ObjectUtils; |
||||
|
||||
/** |
||||
* {@link ClientHttpResponse} implementation for the Reactor Netty 2 (Netty 5) HTTP client. |
||||
* |
||||
* <p>This class is based on {@link ReactorClientHttpResponse}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
* @see reactor.netty5.http.client.HttpClient |
||||
*/ |
||||
class ReactorNetty2ClientHttpResponse implements ClientHttpResponse { |
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactorNetty2ClientHttpResponse.class); |
||||
|
||||
private final HttpClientResponse response; |
||||
|
||||
private final HttpHeaders headers; |
||||
|
||||
private final NettyInbound inbound; |
||||
|
||||
private final Netty5DataBufferFactory bufferFactory; |
||||
|
||||
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
|
||||
private final AtomicInteger state = new AtomicInteger(); |
||||
|
||||
|
||||
/** |
||||
* Constructor that matches the inputs from |
||||
* {@link reactor.netty5.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}. |
||||
* @since 5.2.8 |
||||
*/ |
||||
public ReactorNetty2ClientHttpResponse(HttpClientResponse response, Connection connection) { |
||||
this.response = response; |
||||
MultiValueMap<String, String> adapter = new Netty5HeadersAdapter(response.responseHeaders()); |
||||
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); |
||||
this.inbound = connection.inbound(); |
||||
this.bufferFactory = new Netty5DataBufferFactory(connection.outbound().alloc()); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public String getId() { |
||||
String id = null; |
||||
if (this.response instanceof ChannelOperationsId operationsId) { |
||||
id = (logger.isDebugEnabled() ? operationsId.asLongText() : operationsId.asShortText()); |
||||
} |
||||
if (id == null && this.response instanceof Connection connection) { |
||||
id = connection.channel().id().asShortText(); |
||||
} |
||||
return (id != null ? id : ObjectUtils.getIdentityHexString(this)); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<DataBuffer> getBody() { |
||||
return this.inbound.receive() |
||||
.doOnSubscribe(s -> { |
||||
if (this.state.compareAndSet(0, 1)) { |
||||
return; |
||||
} |
||||
if (this.state.get() == 2) { |
||||
throw new IllegalStateException( |
||||
"The client response body has been released already due to cancellation."); |
||||
} |
||||
}) |
||||
.map(buffer -> this.bufferFactory.wrap(buffer.split())); |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
return this.headers; |
||||
} |
||||
|
||||
@Override |
||||
public HttpStatusCode getStatusCode() { |
||||
return HttpStatusCode.valueOf(this.response.status().code()); |
||||
} |
||||
|
||||
@Override |
||||
public MultiValueMap<String, ResponseCookie> getCookies() { |
||||
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>(); |
||||
this.response.cookies().values().stream() |
||||
.flatMap(Collection::stream) |
||||
.forEach(cookie -> result.add(cookie.name().toString(), |
||||
ResponseCookie.fromClientResponse(cookie.name().toString(), cookie.value().toString()) |
||||
.domain(toString(cookie.domain())) |
||||
.path(toString(cookie.path())) |
||||
.maxAge(toLong(cookie.maxAge())) |
||||
.secure(cookie.isSecure()) |
||||
.httpOnly(cookie.isHttpOnly()) |
||||
.sameSite(getSameSite(cookie)) |
||||
.build())); |
||||
return CollectionUtils.unmodifiableMultiValueMap(result); |
||||
} |
||||
|
||||
private static @Nullable String toString(@Nullable CharSequence value) { |
||||
return (value != null ? value.toString() : null); |
||||
} |
||||
|
||||
private static long toLong(@Nullable Long value) { |
||||
return (value != null ? value : -1); |
||||
} |
||||
|
||||
private static @Nullable String getSameSite(HttpSetCookie cookie) { |
||||
if (cookie instanceof DefaultHttpSetCookie defaultCookie && defaultCookie.sameSite() != null) { |
||||
return defaultCookie.sameSite().name(); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* Called by {@link ReactorNetty2ClientHttpConnector} when a cancellation is detected |
||||
* but the content has not been subscribed to. If the subscription never |
||||
* materializes then the content will remain not drained. Or it could still |
||||
* materialize if the cancellation happened very early, or the response |
||||
* reading was delayed for some reason. |
||||
*/ |
||||
void releaseAfterCancel(HttpMethod method) { |
||||
if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) { |
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("[" + getId() + "]" + "Releasing body, not yet subscribed."); |
||||
} |
||||
this.inbound.receive().doOnNext(buffer -> {}).subscribe(buffer -> {}, ex -> {}); |
||||
} |
||||
} |
||||
|
||||
private boolean mayHaveBody(HttpMethod method) { |
||||
int code = getStatusCode().value(); |
||||
return !((code >= 100 && code < 200) || code == 204 || code == 205 || |
||||
method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "ReactorNetty2ClientHttpResponse{" + |
||||
"request=[" + this.response.method().name() + " " + this.response.uri() + "]," + |
||||
"status=" + getStatusCode() + '}'; |
||||
} |
||||
|
||||
} |
||||
@ -1,248 +0,0 @@
@@ -1,248 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2024 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.function.Consumer; |
||||
import java.util.function.Supplier; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
import reactor.netty5.http.HttpResources; |
||||
import reactor.netty5.resources.ConnectionProvider; |
||||
import reactor.netty5.resources.LoopResources; |
||||
|
||||
import org.springframework.beans.factory.DisposableBean; |
||||
import org.springframework.beans.factory.InitializingBean; |
||||
import org.springframework.http.client.ReactorResourceFactory; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for |
||||
* event loop threads, and {@link ConnectionProvider} for the connection pool, |
||||
* within the lifecycle of a Spring {@code ApplicationContext}. |
||||
* |
||||
* <p>This factory implements {@link InitializingBean} and {@link DisposableBean} |
||||
* and is expected typically to be declared as a Spring-managed bean. |
||||
* |
||||
* <p>This class is based on {@link ReactorResourceFactory}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2ResourceFactory implements InitializingBean, DisposableBean { |
||||
|
||||
private boolean useGlobalResources = true; |
||||
|
||||
private @Nullable Consumer<HttpResources> globalResourcesConsumer; |
||||
|
||||
private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.create("webflux", 500); |
||||
|
||||
private @Nullable ConnectionProvider connectionProvider; |
||||
|
||||
private Supplier<LoopResources> loopResourcesSupplier = () -> LoopResources.create("webflux-http"); |
||||
|
||||
private @Nullable LoopResources loopResources; |
||||
|
||||
private boolean manageConnectionProvider = false; |
||||
|
||||
private boolean manageLoopResources = false; |
||||
|
||||
private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD); |
||||
|
||||
private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT); |
||||
|
||||
|
||||
/** |
||||
* Whether to use global Reactor Netty resources via {@link HttpResources}. |
||||
* <p>Default is "true" in which case this factory initializes and stops the |
||||
* global Reactor Netty resources within Spring's {@code ApplicationContext} |
||||
* lifecycle. If set to "false" the factory manages its resources independent |
||||
* of the global ones. |
||||
* @param useGlobalResources whether to expose and manage the global resources |
||||
* @see #addGlobalResourcesConsumer(Consumer) |
||||
*/ |
||||
public void setUseGlobalResources(boolean useGlobalResources) { |
||||
this.useGlobalResources = useGlobalResources; |
||||
} |
||||
|
||||
/** |
||||
* Whether this factory exposes the global |
||||
* {@link HttpResources HttpResources} holder. |
||||
*/ |
||||
public boolean isUseGlobalResources() { |
||||
return this.useGlobalResources; |
||||
} |
||||
|
||||
/** |
||||
* Add a Consumer for configuring the global Reactor Netty resources on |
||||
* startup. When this option is used, {@link #setUseGlobalResources} is also |
||||
* enabled. |
||||
* @param consumer the consumer to apply |
||||
* @see #setUseGlobalResources(boolean) |
||||
*/ |
||||
public void addGlobalResourcesConsumer(Consumer<HttpResources> consumer) { |
||||
this.useGlobalResources = true; |
||||
this.globalResourcesConsumer = (this.globalResourcesConsumer != null ? |
||||
this.globalResourcesConsumer.andThen(consumer) : consumer); |
||||
} |
||||
|
||||
/** |
||||
* Use this when you don't want to participate in global resources and |
||||
* you want to customize the creation of the managed {@code ConnectionProvider}. |
||||
* <p>By default, {@code ConnectionProvider.elastic("http")} is used. |
||||
* <p>Note that this supplier is ignored if {@link #isUseGlobalResources()} |
||||
* is {@code true} or once the {@link #setConnectionProvider(ConnectionProvider) ConnectionProvider} |
||||
* is set. |
||||
* @param supplier the supplier to use |
||||
*/ |
||||
public void setConnectionProviderSupplier(Supplier<ConnectionProvider> supplier) { |
||||
this.connectionProviderSupplier = supplier; |
||||
} |
||||
|
||||
/** |
||||
* Use this when you want to provide an externally managed |
||||
* {@link ConnectionProvider} instance. |
||||
* @param connectionProvider the connection provider to use as is |
||||
*/ |
||||
public void setConnectionProvider(ConnectionProvider connectionProvider) { |
||||
this.connectionProvider = connectionProvider; |
||||
} |
||||
|
||||
/** |
||||
* Return the configured {@link ConnectionProvider}. |
||||
*/ |
||||
public ConnectionProvider getConnectionProvider() { |
||||
Assert.state(this.connectionProvider != null, "ConnectionProvider not initialized yet"); |
||||
return this.connectionProvider; |
||||
} |
||||
|
||||
/** |
||||
* Use this when you don't want to participate in global resources and |
||||
* you want to customize the creation of the managed {@code LoopResources}. |
||||
* <p>By default, {@code LoopResources.create("webflux-http")} is used. |
||||
* <p>Note that this supplier is ignored if {@link #isUseGlobalResources()} |
||||
* is {@code true} or once the {@link #setLoopResources(LoopResources) LoopResources} |
||||
* is set. |
||||
* @param supplier the supplier to use |
||||
*/ |
||||
public void setLoopResourcesSupplier(Supplier<LoopResources> supplier) { |
||||
this.loopResourcesSupplier = supplier; |
||||
} |
||||
|
||||
/** |
||||
* Use this option when you want to provide an externally managed |
||||
* {@link LoopResources} instance. |
||||
* @param loopResources the loop resources to use as is |
||||
*/ |
||||
public void setLoopResources(LoopResources loopResources) { |
||||
this.loopResources = loopResources; |
||||
} |
||||
|
||||
/** |
||||
* Return the configured {@link LoopResources}. |
||||
*/ |
||||
public LoopResources getLoopResources() { |
||||
Assert.state(this.loopResources != null, "LoopResources not initialized yet"); |
||||
return this.loopResources; |
||||
} |
||||
|
||||
/** |
||||
* Configure the amount of time we'll wait before shutting down resources. |
||||
* If a task is submitted during the {@code shutdownQuietPeriod}, it is guaranteed |
||||
* to be accepted and the {@code shutdownQuietPeriod} will start over. |
||||
* <p>By default, this is set to |
||||
* {@link LoopResources#DEFAULT_SHUTDOWN_QUIET_PERIOD} which is 2 seconds but |
||||
* can also be overridden with the system property |
||||
* {@link reactor.netty5.ReactorNetty#SHUTDOWN_QUIET_PERIOD |
||||
* ReactorNetty.SHUTDOWN_QUIET_PERIOD}. |
||||
* @see #setShutdownTimeout(Duration) |
||||
*/ |
||||
public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) { |
||||
Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null"); |
||||
this.shutdownQuietPeriod = shutdownQuietPeriod; |
||||
} |
||||
|
||||
/** |
||||
* Configure the maximum amount of time to wait until the disposal of the |
||||
* underlying resources regardless if a task was submitted during the |
||||
* {@code shutdownQuietPeriod}. |
||||
* <p>By default, this is set to |
||||
* {@link LoopResources#DEFAULT_SHUTDOWN_TIMEOUT} which is 15 seconds but |
||||
* can also be overridden with the system property |
||||
* {@link reactor.netty5.ReactorNetty#SHUTDOWN_TIMEOUT |
||||
* ReactorNetty.SHUTDOWN_TIMEOUT}. |
||||
* @see #setShutdownQuietPeriod(Duration) |
||||
*/ |
||||
public void setShutdownTimeout(Duration shutdownTimeout) { |
||||
Assert.notNull(shutdownTimeout, "shutdownTimeout should not be null"); |
||||
this.shutdownTimeout = shutdownTimeout; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void afterPropertiesSet() { |
||||
if (this.useGlobalResources) { |
||||
Assert.isTrue(this.loopResources == null && this.connectionProvider == null, |
||||
"'useGlobalResources' is mutually exclusive with explicitly configured resources"); |
||||
HttpResources httpResources = HttpResources.get(); |
||||
if (this.globalResourcesConsumer != null) { |
||||
this.globalResourcesConsumer.accept(httpResources); |
||||
} |
||||
this.connectionProvider = httpResources; |
||||
this.loopResources = httpResources; |
||||
} |
||||
else { |
||||
if (this.loopResources == null) { |
||||
this.manageLoopResources = true; |
||||
this.loopResources = this.loopResourcesSupplier.get(); |
||||
} |
||||
if (this.connectionProvider == null) { |
||||
this.manageConnectionProvider = true; |
||||
this.connectionProvider = this.connectionProviderSupplier.get(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void destroy() { |
||||
if (this.useGlobalResources) { |
||||
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); |
||||
} |
||||
else { |
||||
try { |
||||
ConnectionProvider provider = this.connectionProvider; |
||||
if (provider != null && this.manageConnectionProvider) { |
||||
provider.disposeLater().block(); |
||||
} |
||||
} |
||||
catch (Throwable ex) { |
||||
// ignore
|
||||
} |
||||
|
||||
try { |
||||
LoopResources resources = this.loopResources; |
||||
if (resources != null && this.manageLoopResources) { |
||||
resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); |
||||
} |
||||
} |
||||
catch (Throwable ex) { |
||||
// ignore
|
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,79 +0,0 @@
@@ -1,79 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.reactive; |
||||
|
||||
import java.net.URISyntaxException; |
||||
import java.util.function.BiFunction; |
||||
|
||||
import io.netty5.handler.codec.http.HttpResponseStatus; |
||||
import org.apache.commons.logging.Log; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.http.server.HttpServerRequest; |
||||
import reactor.netty5.http.server.HttpServerResponse; |
||||
|
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.HttpLogging; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Adapt {@link HttpHandler} to the Reactor Netty 5 channel handling function. |
||||
* |
||||
* <p>This class is based on {@link ReactorHttpHandlerAdapter}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2HttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> { |
||||
|
||||
private static final Log logger = HttpLogging.forLogName(ReactorNetty2HttpHandlerAdapter.class); |
||||
|
||||
|
||||
private final HttpHandler httpHandler; |
||||
|
||||
|
||||
public ReactorNetty2HttpHandlerAdapter(HttpHandler httpHandler) { |
||||
Assert.notNull(httpHandler, "HttpHandler must not be null"); |
||||
this.httpHandler = httpHandler; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { |
||||
Netty5DataBufferFactory bufferFactory = new Netty5DataBufferFactory(reactorResponse.alloc()); |
||||
try { |
||||
ReactorNetty2ServerHttpRequest request = new ReactorNetty2ServerHttpRequest(reactorRequest, bufferFactory); |
||||
ServerHttpResponse response = new ReactorNetty2ServerHttpResponse(reactorResponse, bufferFactory); |
||||
|
||||
if (request.getMethod() == HttpMethod.HEAD) { |
||||
response = new HttpHeadResponseDecorator(response); |
||||
} |
||||
|
||||
return this.httpHandler.handle(request, response) |
||||
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) |
||||
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed")); |
||||
} |
||||
catch (URISyntaxException ex) { |
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Failed to get request URI: " + ex.getMessage()); |
||||
} |
||||
reactorResponse.status(HttpResponseStatus.BAD_REQUEST); |
||||
return Mono.empty(); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,219 +0,0 @@
@@ -1,219 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.reactive; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.net.URI; |
||||
import java.net.URISyntaxException; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
import javax.net.ssl.SSLSession; |
||||
|
||||
import io.netty5.channel.Channel; |
||||
import io.netty5.handler.codec.http.HttpHeaderNames; |
||||
import io.netty5.handler.codec.http.headers.HttpCookiePair; |
||||
import io.netty5.handler.ssl.SslHandler; |
||||
import org.apache.commons.logging.Log; |
||||
import org.jspecify.annotations.Nullable; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.netty5.ChannelOperationsId; |
||||
import reactor.netty5.Connection; |
||||
import reactor.netty5.http.server.HttpServerRequest; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.HttpCookie; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpLogging; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.support.Netty5HeadersAdapter; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.LinkedMultiValueMap; |
||||
import org.springframework.util.MultiValueMap; |
||||
|
||||
/** |
||||
* Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}. |
||||
* |
||||
* <p>This class is based on {@link ReactorServerHttpRequest}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @author Sebastien Deleuze |
||||
* @since 6.0 |
||||
*/ |
||||
class ReactorNetty2ServerHttpRequest extends AbstractServerHttpRequest { |
||||
|
||||
private static final Log logger = HttpLogging.forLogName(ReactorNetty2ServerHttpRequest.class); |
||||
|
||||
|
||||
private static final AtomicLong logPrefixIndex = new AtomicLong(); |
||||
|
||||
|
||||
private final HttpServerRequest request; |
||||
|
||||
private final Netty5DataBufferFactory bufferFactory; |
||||
|
||||
|
||||
public ReactorNetty2ServerHttpRequest(HttpServerRequest request, Netty5DataBufferFactory bufferFactory) |
||||
throws URISyntaxException { |
||||
|
||||
super(HttpMethod.valueOf(request.method().name()), initUri(request), "", |
||||
new HttpHeaders(new Netty5HeadersAdapter(request.requestHeaders()))); |
||||
Assert.notNull(bufferFactory, "DataBufferFactory must not be null"); |
||||
this.request = request; |
||||
this.bufferFactory = bufferFactory; |
||||
} |
||||
|
||||
private static URI initUri(HttpServerRequest request) throws URISyntaxException { |
||||
Assert.notNull(request, "HttpServerRequest must not be null"); |
||||
return new URI(resolveBaseUrl(request) + resolveRequestUri(request)); |
||||
} |
||||
|
||||
private static URI resolveBaseUrl(HttpServerRequest request) throws URISyntaxException { |
||||
String scheme = getScheme(request); |
||||
|
||||
InetSocketAddress hostAddress = request.hostAddress(); |
||||
if (hostAddress != null) { |
||||
return new URI(scheme, null, hostAddress.getHostString(), hostAddress.getPort(), null, null, null); |
||||
} |
||||
|
||||
CharSequence charSequence = request.requestHeaders().get(HttpHeaderNames.HOST); |
||||
if (charSequence != null) { |
||||
String header = charSequence.toString(); |
||||
final int portIndex; |
||||
if (header.startsWith("[")) { |
||||
portIndex = header.indexOf(':', header.indexOf(']')); |
||||
} |
||||
else { |
||||
portIndex = header.indexOf(':'); |
||||
} |
||||
if (portIndex != -1) { |
||||
try { |
||||
return new URI(scheme, null, header.substring(0, portIndex), |
||||
Integer.parseInt(header, portIndex + 1, header.length(), 10), null, null, null); |
||||
} |
||||
catch (NumberFormatException ex) { |
||||
throw new URISyntaxException(header, "Unable to parse port", portIndex); |
||||
} |
||||
} |
||||
else { |
||||
return new URI(scheme, header, null, null); |
||||
} |
||||
} |
||||
|
||||
throw new IllegalStateException("Neither local hostAddress nor HOST header available"); |
||||
} |
||||
|
||||
private static String getScheme(HttpServerRequest request) { |
||||
return request.scheme(); |
||||
} |
||||
|
||||
private static String resolveRequestUri(HttpServerRequest request) { |
||||
String uri = request.uri(); |
||||
for (int i = 0; i < uri.length(); i++) { |
||||
char c = uri.charAt(i); |
||||
if (c == '/' || c == '?' || c == '#') { |
||||
break; |
||||
} |
||||
if (c == ':' && (i + 2 < uri.length())) { |
||||
if (uri.charAt(i + 1) == '/' && uri.charAt(i + 2) == '/') { |
||||
for (int j = i + 3; j < uri.length(); j++) { |
||||
c = uri.charAt(j); |
||||
if (c == '/' || c == '?' || c == '#') { |
||||
return uri.substring(j); |
||||
} |
||||
} |
||||
return ""; |
||||
} |
||||
} |
||||
} |
||||
return uri; |
||||
} |
||||
|
||||
@Override |
||||
protected MultiValueMap<String, HttpCookie> initCookies() { |
||||
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>(); |
||||
for (CharSequence name : this.request.allCookies().keySet()) { |
||||
for (HttpCookiePair cookie : this.request.allCookies().get(name)) { |
||||
CharSequence cookieValue = cookie.value(); |
||||
HttpCookie httpCookie = new HttpCookie(name.toString(), cookieValue != null ? cookieValue.toString() : null); |
||||
cookies.add(name.toString(), httpCookie); |
||||
} |
||||
} |
||||
return cookies; |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable InetSocketAddress getLocalAddress() { |
||||
return this.request.hostAddress(); |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable InetSocketAddress getRemoteAddress() { |
||||
return this.request.remoteAddress(); |
||||
} |
||||
|
||||
@Override |
||||
protected @Nullable SslInfo initSslInfo() { |
||||
Channel channel = ((Connection) this.request).channel(); |
||||
SslHandler sslHandler = channel.pipeline().get(SslHandler.class); |
||||
if (sslHandler == null && channel.parent() != null) { // HTTP/2
|
||||
sslHandler = channel.parent().pipeline().get(SslHandler.class); |
||||
} |
||||
if (sslHandler != null) { |
||||
SSLSession session = sslHandler.engine().getSession(); |
||||
return new DefaultSslInfo(session); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@Override |
||||
public Flux<DataBuffer> getBody() { |
||||
return this.request.receive().transferOwnership().map(this.bufferFactory::wrap); |
||||
} |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
@Override |
||||
public <T> T getNativeRequest() { |
||||
return (T) this.request; |
||||
} |
||||
|
||||
@Override |
||||
protected @Nullable String initId() { |
||||
if (this.request instanceof Connection connection) { |
||||
return connection.channel().id().asShortText() + |
||||
"-" + logPrefixIndex.incrementAndGet(); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@Override |
||||
protected String initLogPrefix() { |
||||
String id = null; |
||||
if (this.request instanceof ChannelOperationsId operationsId) { |
||||
id = (logger.isDebugEnabled() ? operationsId.asLongText() : operationsId.asShortText()); |
||||
} |
||||
if (id != null) { |
||||
return id; |
||||
} |
||||
if (this.request instanceof Connection connection) { |
||||
return connection.channel().id().asShortText() + |
||||
"-" + logPrefixIndex.incrementAndGet(); |
||||
} |
||||
return getId(); |
||||
} |
||||
|
||||
} |
||||
@ -1,141 +0,0 @@
@@ -1,141 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2024 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.reactive; |
||||
|
||||
import java.nio.file.Path; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.channel.ChannelId; |
||||
import io.netty5.handler.codec.http.headers.DefaultHttpSetCookie; |
||||
import io.netty5.handler.codec.http.headers.HttpSetCookie; |
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.ChannelOperationsId; |
||||
import reactor.netty5.http.server.HttpServerResponse; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpStatusCode; |
||||
import org.springframework.http.ResponseCookie; |
||||
import org.springframework.http.ZeroCopyHttpOutputMessage; |
||||
import org.springframework.http.support.Netty5HeadersAdapter; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}. |
||||
* |
||||
* <p>This class is based on {@link ReactorServerHttpResponse}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
class ReactorNetty2ServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage { |
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactorNetty2ServerHttpResponse.class); |
||||
|
||||
|
||||
private final HttpServerResponse response; |
||||
|
||||
|
||||
public ReactorNetty2ServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) { |
||||
super(bufferFactory, new HttpHeaders(new Netty5HeadersAdapter(response.responseHeaders()))); |
||||
Assert.notNull(response, "HttpServerResponse must not be null"); |
||||
this.response = response; |
||||
} |
||||
|
||||
|
||||
@SuppressWarnings("unchecked") |
||||
@Override |
||||
public <T> T getNativeResponse() { |
||||
return (T) this.response; |
||||
} |
||||
|
||||
@Override |
||||
public HttpStatusCode getStatusCode() { |
||||
HttpStatusCode status = super.getStatusCode(); |
||||
return (status != null ? status : HttpStatusCode.valueOf(this.response.status().code())); |
||||
} |
||||
|
||||
@Override |
||||
protected void applyStatusCode() { |
||||
HttpStatusCode status = super.getStatusCode(); |
||||
if (status != null) { |
||||
this.response.status(status.value()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> publisher) { |
||||
return this.response.send(toByteBufs(publisher)).then(); |
||||
} |
||||
|
||||
@Override |
||||
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> publisher) { |
||||
return this.response.sendGroups(Flux.from(publisher).map(this::toByteBufs)).then(); |
||||
} |
||||
|
||||
@Override |
||||
protected void applyHeaders() { |
||||
} |
||||
|
||||
@Override |
||||
protected void applyCookies() { |
||||
for (String name : getCookies().keySet()) { |
||||
for (ResponseCookie httpCookie : getCookies().get(name)) { |
||||
Long maxAge = (!httpCookie.getMaxAge().isNegative()) ? httpCookie.getMaxAge().getSeconds() : null; |
||||
HttpSetCookie.SameSite sameSite = (httpCookie.getSameSite() != null) ? HttpSetCookie.SameSite.valueOf(httpCookie.getSameSite()) : null; |
||||
// TODO: support Partitioned attribute when available in Netty 5 API
|
||||
DefaultHttpSetCookie cookie = new DefaultHttpSetCookie(name, httpCookie.getValue(), httpCookie.getPath(), |
||||
httpCookie.getDomain(), null, maxAge, sameSite, false, httpCookie.isSecure(), httpCookie.isHttpOnly()); |
||||
this.response.addCookie(cookie); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> writeWith(Path file, long position, long count) { |
||||
return doCommit(() -> this.response.sendFile(file, position, count).then()); |
||||
} |
||||
|
||||
private Publisher<Buffer> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) { |
||||
return dataBuffers instanceof Mono ? |
||||
Mono.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer) : |
||||
Flux.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer); |
||||
} |
||||
|
||||
@Override |
||||
protected void touchDataBuffer(DataBuffer buffer) { |
||||
if (logger.isDebugEnabled()) { |
||||
if (this.response instanceof ChannelOperationsId operationsId) { |
||||
DataBufferUtils.touch(buffer, "Channel id: " + operationsId.asLongText()); |
||||
} |
||||
else { |
||||
this.response.withConnection(connection -> { |
||||
ChannelId id = connection.channel().id(); |
||||
DataBufferUtils.touch(buffer, "Channel id: " + id.asShortText()); |
||||
}); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,286 +0,0 @@
@@ -1,286 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2024 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.support; |
||||
|
||||
import java.util.AbstractSet; |
||||
import java.util.ArrayList; |
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.Iterator; |
||||
import java.util.List; |
||||
import java.util.Locale; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.stream.StreamSupport; |
||||
|
||||
import io.netty5.handler.codec.http.headers.HttpHeaders; |
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.LinkedCaseInsensitiveMap; |
||||
import org.springframework.util.MultiValueMap; |
||||
|
||||
/** |
||||
* {@code MultiValueMap} implementation for wrapping Netty HTTP headers. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @author Simon Baslé |
||||
* @since 6.1 |
||||
*/ |
||||
public final class Netty5HeadersAdapter implements MultiValueMap<String, String> { |
||||
|
||||
private final HttpHeaders headers; |
||||
|
||||
|
||||
/** |
||||
* Create a new {@code Netty5HeadersAdapter} based on the given |
||||
* {@code HttpHeaders}. |
||||
*/ |
||||
public Netty5HeadersAdapter(HttpHeaders headers) { |
||||
Assert.notNull(headers, "Headers must not be null"); |
||||
this.headers = headers; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public @Nullable String getFirst(String key) { |
||||
CharSequence value = this.headers.get(key); |
||||
return (value != null ? value.toString() : null); |
||||
} |
||||
|
||||
@Override |
||||
public void add(String key, @Nullable String value) { |
||||
if (value != null) { |
||||
this.headers.add(key, value); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void addAll(String key, List<? extends String> values) { |
||||
this.headers.add(key, values); |
||||
} |
||||
|
||||
@Override |
||||
public void addAll(MultiValueMap<String, String> values) { |
||||
values.forEach(this.headers::add); |
||||
} |
||||
|
||||
@Override |
||||
public void set(String key, @Nullable String value) { |
||||
if (value != null) { |
||||
this.headers.set(key, value); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void setAll(Map<String, String> values) { |
||||
values.forEach(this.headers::set); |
||||
} |
||||
|
||||
@Override |
||||
public Map<String, String> toSingleValueMap() { |
||||
Map<String, String> singleValueMap = new LinkedCaseInsensitiveMap<>( |
||||
this.headers.size(), Locale.ROOT); |
||||
this.headers.forEach(entry -> singleValueMap.putIfAbsent( |
||||
entry.getKey().toString(), entry.getValue().toString())); |
||||
return singleValueMap; |
||||
} |
||||
|
||||
@Override |
||||
public int size() { |
||||
return this.headers.names().size(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean isEmpty() { |
||||
return this.headers.isEmpty(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean containsKey(Object key) { |
||||
return (key instanceof String headerName && this.headers.contains(headerName)); |
||||
} |
||||
|
||||
@Override |
||||
public boolean containsValue(Object value) { |
||||
return (value instanceof String && |
||||
StreamSupport.stream(this.headers.spliterator(), false) |
||||
.anyMatch(entry -> value.equals(entry.getValue()))); |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable List<String> get(Object key) { |
||||
Iterator<CharSequence> iterator = this.headers.valuesIterator((CharSequence) key); |
||||
if (iterator.hasNext()) { |
||||
List<String> result = new ArrayList<>(); |
||||
iterator.forEachRemaining(value -> result.add(value.toString())); |
||||
return result; |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable List<String> put(String key, @Nullable List<String> value) { |
||||
List<String> previousValues = get(key); |
||||
this.headers.set(key, value); |
||||
return previousValues; |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable List<String> remove(Object key) { |
||||
if (key instanceof String headerName) { |
||||
List<String> previousValues = get(headerName); |
||||
this.headers.remove(headerName); |
||||
return previousValues; |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@Override |
||||
public void putAll(Map<? extends String, ? extends List<String>> map) { |
||||
map.forEach(this.headers::set); |
||||
} |
||||
|
||||
@Override |
||||
public void clear() { |
||||
this.headers.clear(); |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> keySet() { |
||||
return new HeaderNames(); |
||||
} |
||||
|
||||
@Override |
||||
public Collection<List<String>> values() { |
||||
List<List<String>> result = new ArrayList<>(this.headers.size()); |
||||
forEach((key, value) -> result.add(value)); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public Set<Entry<String, List<String>>> entrySet() { |
||||
return new AbstractSet<>() { |
||||
@Override |
||||
public Iterator<Entry<String, List<String>>> iterator() { |
||||
return new EntryIterator(); |
||||
} |
||||
|
||||
@Override |
||||
public int size() { |
||||
return headers.size(); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public String toString() { |
||||
return org.springframework.http.HttpHeaders.formatHeaders(this); |
||||
} |
||||
|
||||
|
||||
private class EntryIterator implements Iterator<Entry<String, List<String>>> { |
||||
|
||||
private final Iterator<CharSequence> names = headers.names().iterator(); |
||||
|
||||
@Override |
||||
public boolean hasNext() { |
||||
return this.names.hasNext(); |
||||
} |
||||
|
||||
@Override |
||||
public Entry<String, List<String>> next() { |
||||
return new HeaderEntry(this.names.next()); |
||||
} |
||||
} |
||||
|
||||
|
||||
private class HeaderEntry implements Entry<String, List<String>> { |
||||
|
||||
private final CharSequence key; |
||||
|
||||
HeaderEntry(CharSequence key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
@Override |
||||
public String getKey() { |
||||
return this.key.toString(); |
||||
} |
||||
|
||||
@Override |
||||
public List<String> getValue() { |
||||
List<String> values = get(this.key); |
||||
return (values != null ? values : Collections.emptyList()); |
||||
} |
||||
|
||||
@Override |
||||
public List<String> setValue(List<String> value) { |
||||
List<String> previousValues = getValue(); |
||||
headers.set(this.key, value); |
||||
return previousValues; |
||||
} |
||||
} |
||||
|
||||
private class HeaderNames extends AbstractSet<String> { |
||||
|
||||
@Override |
||||
public Iterator<String> iterator() { |
||||
return new HeaderNamesIterator(headers.names().iterator()); |
||||
} |
||||
|
||||
@Override |
||||
public int size() { |
||||
return headers.names().size(); |
||||
} |
||||
} |
||||
|
||||
private final class HeaderNamesIterator implements Iterator<String> { |
||||
|
||||
private final Iterator<CharSequence> iterator; |
||||
|
||||
private @Nullable CharSequence currentName; |
||||
|
||||
private HeaderNamesIterator(Iterator<CharSequence> iterator) { |
||||
this.iterator = iterator; |
||||
} |
||||
|
||||
@Override |
||||
public boolean hasNext() { |
||||
return this.iterator.hasNext(); |
||||
} |
||||
|
||||
@Override |
||||
public String next() { |
||||
this.currentName = this.iterator.next(); |
||||
return this.currentName.toString(); |
||||
} |
||||
|
||||
@Override |
||||
public void remove() { |
||||
if (this.currentName == null) { |
||||
throw new IllegalStateException("No current Header in iterator"); |
||||
} |
||||
if (!headers.contains(this.currentName)) { |
||||
throw new IllegalStateException("Header not present: " + this.currentName); |
||||
} |
||||
headers.remove(this.currentName); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,71 +0,0 @@
@@ -1,71 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.testfixture.http.server.reactive.bootstrap; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
|
||||
import reactor.netty5.DisposableServer; |
||||
|
||||
import org.springframework.http.server.reactive.ReactorNetty2HttpHandlerAdapter; |
||||
|
||||
/** |
||||
* This class is copied from {@link ReactorHttpServer}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2HttpServer extends AbstractHttpServer { |
||||
|
||||
private ReactorNetty2HttpHandlerAdapter reactorHandler; |
||||
|
||||
private reactor.netty5.http.server.HttpServer reactorServer; |
||||
|
||||
private AtomicReference<DisposableServer> serverRef = new AtomicReference<>(); |
||||
|
||||
|
||||
@Override |
||||
protected void initServer() { |
||||
this.reactorHandler = createHttpHandlerAdapter(); |
||||
this.reactorServer = reactor.netty5.http.server.HttpServer.create().wiretap(true) |
||||
.host(getHost()).port(getPort()); |
||||
} |
||||
|
||||
private ReactorNetty2HttpHandlerAdapter createHttpHandlerAdapter() { |
||||
return new ReactorNetty2HttpHandlerAdapter(resolveHttpHandler()); |
||||
} |
||||
|
||||
@Override |
||||
protected void startInternal() { |
||||
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); |
||||
setPort(((InetSocketAddress) server.address()).getPort()); |
||||
this.serverRef.set(server); |
||||
} |
||||
|
||||
@Override |
||||
protected void stopInternal() { |
||||
this.serverRef.get().dispose(); |
||||
} |
||||
|
||||
@Override |
||||
protected void resetInternal() { |
||||
this.reactorServer = null; |
||||
this.reactorHandler = null; |
||||
this.serverRef.set(null); |
||||
} |
||||
|
||||
} |
||||
@ -1,78 +0,0 @@
@@ -1,78 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.testfixture.http.server.reactive.bootstrap; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
|
||||
import io.netty.handler.ssl.util.SelfSignedCertificate; |
||||
import reactor.netty5.DisposableServer; |
||||
import reactor.netty5.http.Http11SslContextSpec; |
||||
|
||||
import org.springframework.http.server.reactive.ReactorNetty2HttpHandlerAdapter; |
||||
|
||||
/** |
||||
* This class is copied from {@link ReactorHttpsServer}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2HttpsServer extends AbstractHttpServer { |
||||
|
||||
private ReactorNetty2HttpHandlerAdapter reactorHandler; |
||||
|
||||
private reactor.netty5.http.server.HttpServer reactorServer; |
||||
|
||||
private AtomicReference<DisposableServer> serverRef = new AtomicReference<>(); |
||||
|
||||
|
||||
@Override |
||||
protected void initServer() throws Exception { |
||||
SelfSignedCertificate cert = new SelfSignedCertificate(); |
||||
Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert.certificate(), cert.privateKey()); |
||||
|
||||
this.reactorHandler = createHttpHandlerAdapter(); |
||||
this.reactorServer = reactor.netty5.http.server.HttpServer.create() |
||||
.host(getHost()) |
||||
.port(getPort()) |
||||
.secure(sslContextSpec -> sslContextSpec.sslContext(http11SslContextSpec)); |
||||
} |
||||
|
||||
private ReactorNetty2HttpHandlerAdapter createHttpHandlerAdapter() { |
||||
return new ReactorNetty2HttpHandlerAdapter(resolveHttpHandler()); |
||||
} |
||||
|
||||
@Override |
||||
protected void startInternal() { |
||||
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); |
||||
setPort(((InetSocketAddress) server.address()).getPort()); |
||||
this.serverRef.set(server); |
||||
} |
||||
|
||||
@Override |
||||
protected void stopInternal() { |
||||
this.serverRef.get().dispose(); |
||||
} |
||||
|
||||
@Override |
||||
protected void resetInternal() { |
||||
this.reactorServer = null; |
||||
this.reactorHandler = null; |
||||
this.serverRef.set(null); |
||||
} |
||||
|
||||
} |
||||
@ -1,107 +0,0 @@
@@ -1,107 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.reactive.socket.adapter; |
||||
|
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import io.netty5.buffer.Buffer; |
||||
import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame; |
||||
import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame; |
||||
import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame; |
||||
import io.netty5.handler.codec.http.websocketx.TextWebSocketFrame; |
||||
import io.netty5.handler.codec.http.websocketx.WebSocketFrame; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.ObjectUtils; |
||||
import org.springframework.web.reactive.socket.HandshakeInfo; |
||||
import org.springframework.web.reactive.socket.WebSocketMessage; |
||||
import org.springframework.web.reactive.socket.WebSocketSession; |
||||
|
||||
/** |
||||
* Base class for Netty-based {@link WebSocketSession} adapters that provides |
||||
* convenience methods to convert Netty {@link WebSocketFrame WebSocketFrames} to and from |
||||
* {@link WebSocketMessage WebSocketMessages}. |
||||
* |
||||
* <p>This class is based on {@link NettyWebSocketSessionSupport}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
* @param <T> the native delegate type |
||||
*/ |
||||
public abstract class Netty5WebSocketSessionSupport<T> extends AbstractWebSocketSession<T> { |
||||
|
||||
/** |
||||
* The default max size for inbound WebSocket frames. |
||||
*/ |
||||
public static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024; |
||||
|
||||
|
||||
private static final Map<Class<?>, WebSocketMessage.Type> messageTypes; |
||||
|
||||
static { |
||||
messageTypes = new HashMap<>(8); |
||||
messageTypes.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT); |
||||
messageTypes.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY); |
||||
messageTypes.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING); |
||||
messageTypes.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG); |
||||
} |
||||
|
||||
|
||||
protected Netty5WebSocketSessionSupport(T delegate, HandshakeInfo info, Netty5DataBufferFactory factory) { |
||||
super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Netty5DataBufferFactory bufferFactory() { |
||||
return (Netty5DataBufferFactory) super.bufferFactory(); |
||||
} |
||||
|
||||
|
||||
protected WebSocketMessage toMessage(WebSocketFrame frame) { |
||||
DataBuffer payload = bufferFactory().wrap(frame.binaryData()); |
||||
WebSocketMessage.Type messageType = messageTypes.get(frame.getClass()); |
||||
Assert.state(messageType != null, "Unexpected message type"); |
||||
return new WebSocketMessage(messageType, payload, frame); |
||||
} |
||||
|
||||
protected WebSocketFrame toFrame(WebSocketMessage message) { |
||||
if (message.getNativeMessage() != null) { |
||||
return message.getNativeMessage(); |
||||
} |
||||
Buffer buffer = Netty5DataBufferFactory.toBuffer(message.getPayload()); |
||||
if (WebSocketMessage.Type.TEXT.equals(message.getType())) { |
||||
return new TextWebSocketFrame(buffer); |
||||
} |
||||
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { |
||||
return new BinaryWebSocketFrame(buffer); |
||||
} |
||||
else if (WebSocketMessage.Type.PING.equals(message.getType())) { |
||||
return new PingWebSocketFrame(buffer); |
||||
} |
||||
else if (WebSocketMessage.Type.PONG.equals(message.getType())) { |
||||
return new PongWebSocketFrame(buffer); |
||||
} |
||||
else { |
||||
throw new IllegalArgumentException("Unexpected message type: " + message.getType()); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,173 +0,0 @@
@@ -1,173 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.reactive.socket.adapter; |
||||
|
||||
import java.util.function.Consumer; |
||||
|
||||
import io.netty5.channel.ChannelId; |
||||
import io.netty5.handler.codec.http.websocketx.WebSocketFrame; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.Connection; |
||||
import reactor.netty5.NettyInbound; |
||||
import reactor.netty5.NettyOutbound; |
||||
import reactor.netty5.channel.ChannelOperations; |
||||
import reactor.netty5.http.websocket.WebsocketInbound; |
||||
import reactor.netty5.http.websocket.WebsocketOutbound; |
||||
|
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.web.reactive.socket.CloseStatus; |
||||
import org.springframework.web.reactive.socket.HandshakeInfo; |
||||
import org.springframework.web.reactive.socket.WebSocketMessage; |
||||
import org.springframework.web.reactive.socket.WebSocketSession; |
||||
|
||||
/** |
||||
* {@link WebSocketSession} implementation for use with the Reactor Netty's (Netty 5) |
||||
* {@link NettyInbound} and {@link NettyOutbound}. |
||||
* This class is based on {@link ReactorNettyWebSocketSession}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2WebSocketSession |
||||
extends Netty5WebSocketSessionSupport<ReactorNetty2WebSocketSession.WebSocketConnection> { |
||||
|
||||
private final int maxFramePayloadLength; |
||||
|
||||
private final ChannelId channelId; |
||||
|
||||
|
||||
/** |
||||
* Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value. |
||||
*/ |
||||
public ReactorNetty2WebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, |
||||
HandshakeInfo info, Netty5DataBufferFactory bufferFactory) { |
||||
|
||||
this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE); |
||||
} |
||||
|
||||
/** |
||||
* Constructor with an additional maxFramePayloadLength argument. |
||||
* @since 5.1 |
||||
*/ |
||||
@SuppressWarnings("rawtypes") |
||||
public ReactorNetty2WebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, |
||||
HandshakeInfo info, Netty5DataBufferFactory bufferFactory, |
||||
int maxFramePayloadLength) { |
||||
|
||||
super(new WebSocketConnection(inbound, outbound), info, bufferFactory); |
||||
this.maxFramePayloadLength = maxFramePayloadLength; |
||||
this.channelId = ((ChannelOperations) inbound).channel().id(); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Return the id of the underlying Netty channel. |
||||
* @since 5.3.4 |
||||
*/ |
||||
public ChannelId getChannelId() { |
||||
return this.channelId; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Flux<WebSocketMessage> receive() { |
||||
return getDelegate().getInbound() |
||||
.aggregateFrames(this.maxFramePayloadLength) |
||||
.receiveFrames() |
||||
.map(super::toMessage) |
||||
.doOnNext(message -> { |
||||
if (logger.isTraceEnabled()) { |
||||
logger.trace(getLogPrefix() + "Received " + message); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> send(Publisher<WebSocketMessage> messages) { |
||||
Flux<WebSocketFrame> frames = Flux.from(messages) |
||||
.doOnNext(message -> { |
||||
if (logger.isTraceEnabled()) { |
||||
logger.trace(getLogPrefix() + "Sending " + message); |
||||
} |
||||
}) |
||||
.map(this::toFrame); |
||||
return getDelegate().getOutbound() |
||||
.sendObject(frames) |
||||
.then(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean isOpen() { |
||||
DisposedCallback callback = new DisposedCallback(); |
||||
getDelegate().getInbound().withConnection(callback); |
||||
return !callback.isDisposed(); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> close(CloseStatus status) { |
||||
// this will notify WebSocketInbound.receiveCloseStatus()
|
||||
return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason()); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<CloseStatus> closeStatus() { |
||||
return getDelegate().getInbound().receiveCloseStatus() |
||||
.map(status -> CloseStatus.create(status.code(), status.reasonText())); |
||||
} |
||||
|
||||
/** |
||||
* Simple container for {@link NettyInbound} and {@link NettyOutbound}. |
||||
*/ |
||||
public static class WebSocketConnection { |
||||
|
||||
private final WebsocketInbound inbound; |
||||
|
||||
private final WebsocketOutbound outbound; |
||||
|
||||
|
||||
public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) { |
||||
this.inbound = inbound; |
||||
this.outbound = outbound; |
||||
} |
||||
|
||||
public WebsocketInbound getInbound() { |
||||
return this.inbound; |
||||
} |
||||
|
||||
public WebsocketOutbound getOutbound() { |
||||
return this.outbound; |
||||
} |
||||
} |
||||
|
||||
|
||||
private static class DisposedCallback implements Consumer<Connection> { |
||||
|
||||
private boolean disposed; |
||||
|
||||
public boolean isDisposed() { |
||||
return this.disposed; |
||||
} |
||||
|
||||
@Override |
||||
public void accept(Connection connection) { |
||||
this.disposed = connection.isDisposed(); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,159 +0,0 @@
@@ -1,159 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.reactive.socket.client; |
||||
|
||||
import java.net.URI; |
||||
import java.util.function.Supplier; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.jspecify.annotations.Nullable; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.http.client.HttpClient; |
||||
import reactor.netty5.http.client.WebsocketClientSpec; |
||||
import reactor.netty5.http.websocket.WebsocketInbound; |
||||
|
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.StringUtils; |
||||
import org.springframework.web.reactive.socket.HandshakeInfo; |
||||
import org.springframework.web.reactive.socket.WebSocketHandler; |
||||
import org.springframework.web.reactive.socket.WebSocketSession; |
||||
import org.springframework.web.reactive.socket.adapter.ReactorNetty2WebSocketSession; |
||||
|
||||
/** |
||||
* {@link WebSocketClient} implementation for use with Reactor Netty for Netty 5. |
||||
* |
||||
* <p>This class is based on {@link ReactorNettyWebSocketClient}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2WebSocketClient implements WebSocketClient { |
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactorNetty2WebSocketClient.class); |
||||
|
||||
|
||||
private final HttpClient httpClient; |
||||
|
||||
private final Supplier<WebsocketClientSpec.Builder> specBuilderSupplier; |
||||
|
||||
private @Nullable Boolean handlePing; |
||||
|
||||
|
||||
/** |
||||
* Default constructor. |
||||
*/ |
||||
public ReactorNetty2WebSocketClient() { |
||||
this(HttpClient.create()); |
||||
} |
||||
|
||||
/** |
||||
* Constructor that accepts an existing {@link HttpClient} builder |
||||
* with a default {@link WebsocketClientSpec.Builder}. |
||||
* @since 5.1 |
||||
*/ |
||||
public ReactorNetty2WebSocketClient(HttpClient httpClient) { |
||||
this(httpClient, WebsocketClientSpec.builder()); |
||||
} |
||||
|
||||
/** |
||||
* Constructor that accepts an existing {@link HttpClient} builder |
||||
* and a pre-configured {@link WebsocketClientSpec.Builder}. |
||||
*/ |
||||
public ReactorNetty2WebSocketClient( |
||||
HttpClient httpClient, Supplier<WebsocketClientSpec.Builder> builderSupplier) { |
||||
|
||||
Assert.notNull(httpClient, "HttpClient is required"); |
||||
Assert.notNull(builderSupplier, "WebsocketClientSpec.Builder is required"); |
||||
this.httpClient = httpClient; |
||||
this.specBuilderSupplier = builderSupplier; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Return the configured {@link HttpClient}. |
||||
*/ |
||||
public HttpClient getHttpClient() { |
||||
return this.httpClient; |
||||
} |
||||
|
||||
/** |
||||
* Build an instance of {@code WebsocketClientSpec} that reflects the current |
||||
* configuration. This can be used to check the configured parameters except |
||||
* for sub-protocols which depend on the {@link WebSocketHandler} that is used |
||||
* for a given upgrade. |
||||
*/ |
||||
public WebsocketClientSpec getWebsocketClientSpec() { |
||||
return buildSpec(null); |
||||
} |
||||
|
||||
private WebsocketClientSpec buildSpec(@Nullable String protocols) { |
||||
WebsocketClientSpec.Builder builder = this.specBuilderSupplier.get(); |
||||
if (StringUtils.hasText(protocols)) { |
||||
builder.protocols(protocols); |
||||
} |
||||
return builder.build(); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Mono<Void> execute(URI url, WebSocketHandler handler) { |
||||
return execute(url, new HttpHeaders(), handler); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { |
||||
String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()); |
||||
WebsocketClientSpec clientSpec = buildSpec(protocols); |
||||
return getHttpClient() |
||||
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders)) |
||||
.websocket(clientSpec) |
||||
.uri(url.toString()) |
||||
.handle((inbound, outbound) -> { |
||||
HttpHeaders responseHeaders = toHttpHeaders(inbound); |
||||
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol"); |
||||
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); |
||||
Netty5DataBufferFactory factory = new Netty5DataBufferFactory(outbound.alloc()); |
||||
WebSocketSession session = new ReactorNetty2WebSocketSession( |
||||
inbound, outbound, info, factory, clientSpec.maxFramePayloadLength()); |
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Started session '" + session.getId() + "' for " + url); |
||||
} |
||||
return handler.handle(session).checkpoint(url + " [ReactorNetty2WebSocketClient]"); |
||||
}) |
||||
.doOnRequest(n -> { |
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Connecting to " + url); |
||||
} |
||||
}) |
||||
.next(); |
||||
} |
||||
|
||||
private void setNettyHeaders(HttpHeaders httpHeaders, io.netty5.handler.codec.http.headers.HttpHeaders nettyHeaders) { |
||||
httpHeaders.forEach(nettyHeaders::set); |
||||
} |
||||
|
||||
private HttpHeaders toHttpHeaders(WebsocketInbound inbound) { |
||||
HttpHeaders headers = new HttpHeaders(); |
||||
inbound.headers().iterator().forEachRemaining(entry -> |
||||
headers.add(entry.getKey().toString(), entry.getValue().toString())); |
||||
return headers; |
||||
} |
||||
|
||||
} |
||||
@ -1,113 +0,0 @@
@@ -1,113 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.reactive.socket.server.upgrade; |
||||
|
||||
import java.net.URI; |
||||
import java.util.function.Supplier; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.netty5.http.server.HttpServerResponse; |
||||
import reactor.netty5.http.server.WebsocketServerSpec; |
||||
|
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory; |
||||
import org.springframework.http.server.reactive.ServerHttpResponse; |
||||
import org.springframework.http.server.reactive.ServerHttpResponseDecorator; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.reactive.socket.HandshakeInfo; |
||||
import org.springframework.web.reactive.socket.WebSocketHandler; |
||||
import org.springframework.web.reactive.socket.adapter.ReactorNetty2WebSocketSession; |
||||
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; |
||||
import org.springframework.web.server.ServerWebExchange; |
||||
|
||||
/** |
||||
* A WebSocket {@code RequestUpgradeStrategy} for Reactor Netty for Netty 5. |
||||
* |
||||
* <p>This class is based on {@link ReactorNettyRequestUpgradeStrategy}. |
||||
* |
||||
* @author Violeta Georgieva |
||||
* @since 6.0 |
||||
*/ |
||||
public class ReactorNetty2RequestUpgradeStrategy implements RequestUpgradeStrategy { |
||||
|
||||
private final Supplier<WebsocketServerSpec.Builder> specBuilderSupplier; |
||||
|
||||
|
||||
/** |
||||
* Create an instances with a default {@link WebsocketServerSpec.Builder}. |
||||
* @since 5.2.6 |
||||
*/ |
||||
public ReactorNetty2RequestUpgradeStrategy() { |
||||
this(WebsocketServerSpec::builder); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Create an instance with a pre-configured {@link WebsocketServerSpec.Builder} |
||||
* to use for WebSocket upgrades. |
||||
* @since 5.2.6 |
||||
*/ |
||||
public ReactorNetty2RequestUpgradeStrategy(Supplier<WebsocketServerSpec.Builder> builderSupplier) { |
||||
Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required"); |
||||
this.specBuilderSupplier = builderSupplier; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Build an instance of {@code WebsocketServerSpec} that reflects the current |
||||
* configuration. This can be used to check the configured parameters except |
||||
* for sub-protocols which depend on the {@link WebSocketHandler} that is used |
||||
* for a given upgrade. |
||||
* @since 5.2.6 |
||||
*/ |
||||
public WebsocketServerSpec getWebsocketServerSpec() { |
||||
return buildSpec(null); |
||||
} |
||||
|
||||
WebsocketServerSpec buildSpec(@Nullable String subProtocol) { |
||||
WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get(); |
||||
if (subProtocol != null) { |
||||
builder.protocols(subProtocol); |
||||
} |
||||
return builder.build(); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, |
||||
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) { |
||||
|
||||
ServerHttpResponse response = exchange.getResponse(); |
||||
HttpServerResponse reactorResponse = ServerHttpResponseDecorator.getNativeResponse(response); |
||||
HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); |
||||
Netty5DataBufferFactory bufferFactory = (Netty5DataBufferFactory) response.bufferFactory(); |
||||
URI uri = exchange.getRequest().getURI(); |
||||
|
||||
// Trigger WebFlux preCommit actions and upgrade
|
||||
return response.setComplete() |
||||
.then(Mono.defer(() -> { |
||||
WebsocketServerSpec spec = buildSpec(subProtocol); |
||||
return reactorResponse.sendWebsocket((in, out) -> { |
||||
ReactorNetty2WebSocketSession session = |
||||
new ReactorNetty2WebSocketSession( |
||||
in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength()); |
||||
return handler.handle(session).checkpoint(uri + " [ReactorNetty2RequestUpgradeStrategy]"); |
||||
}, spec); |
||||
})); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue