Browse Source
Move from AsyncInputStream handling to Publisher for GridFS. UUID types require additional configuration setup to prevent errors while processing legacy (type 3) binary types. We still use type 3 as default but allow codec configuration for type 4 via Java and XML configuration. Updated migration guide. Original pull request: #823.pull/830/head
30 changed files with 579 additions and 1162 deletions
@ -0,0 +1,77 @@
@@ -0,0 +1,77 @@
|
||||
/* |
||||
* Copyright 2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.data.util.Version; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
import com.mongodb.MongoDriverInformation; |
||||
|
||||
/** |
||||
* Class that exposes the SpringData MongoDB specific information like the current {@link Version} or |
||||
* {@link MongoDriverInformation driver information}. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 3.0 |
||||
*/ |
||||
public class SpringDataMongoDB { |
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SpringDataMongoDB.class); |
||||
|
||||
private static final Version FALLBACK_VERSION = new Version(3); |
||||
private static final MongoDriverInformation DRIVER_INFORMATION = MongoDriverInformation |
||||
.builder(MongoDriverInformation.builder().build()).driverName("spring-data").build(); |
||||
|
||||
/** |
||||
* Obtain the SpringData MongoDB specific driver information. |
||||
* |
||||
* @return never {@literal null}. |
||||
*/ |
||||
public static MongoDriverInformation driverInformation() { |
||||
return DRIVER_INFORMATION; |
||||
} |
||||
|
||||
/** |
||||
* Fetches the "Implementation-Version" manifest attribute from the jar file. |
||||
* <p /> |
||||
* Note that some ClassLoaders do not expose the package metadata, hence this class might not be able to determine the |
||||
* version in all environments. In this case the current Major version is returned as a fallback. |
||||
* |
||||
* @return never {@literal null}. |
||||
*/ |
||||
public static Version version() { |
||||
|
||||
Package pkg = SpringDataMongoDB.class.getPackage(); |
||||
String versionString = (pkg != null ? pkg.getImplementationVersion() : null); |
||||
|
||||
if (!StringUtils.hasText(versionString)) { |
||||
|
||||
LOGGER.debug("Unable to find Spring Data MongoDB version."); |
||||
return FALLBACK_VERSION; |
||||
} |
||||
|
||||
try { |
||||
return Version.parse(versionString); |
||||
} catch (Exception e) { |
||||
LOGGER.debug("Cannot read Spring Data MongoDB version '{}'.", versionString); |
||||
} |
||||
|
||||
return FALLBACK_VERSION; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,45 @@
@@ -0,0 +1,45 @@
|
||||
/* |
||||
* Copyright 2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.config; |
||||
|
||||
import java.beans.PropertyEditorSupport; |
||||
|
||||
import org.bson.UuidRepresentation; |
||||
import org.springframework.lang.Nullable; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
/** |
||||
* Parse a {@link String} to a {@link UuidRepresentation}. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 3.0 |
||||
*/ |
||||
public class UUidRepresentationPropertyEditor extends PropertyEditorSupport { |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see java.beans.PropertyEditorSupport#setAsText(java.lang.String) |
||||
*/ |
||||
@Override |
||||
public void setAsText(@Nullable String value) { |
||||
|
||||
if (!StringUtils.hasText(value)) { |
||||
return; |
||||
} |
||||
|
||||
setValue(UuidRepresentation.valueOf(value)); |
||||
} |
||||
} |
||||
@ -1,361 +0,0 @@
@@ -1,361 +0,0 @@
|
||||
/* |
||||
* Copyright 2019-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.gridfs; |
||||
|
||||
import lombok.RequiredArgsConstructor; |
||||
import reactor.core.CoreSubscriber; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.FluxSink; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.core.publisher.Operators; |
||||
import reactor.util.concurrent.Queues; |
||||
import reactor.util.context.Context; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.Queue; |
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscription; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
|
||||
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; |
||||
|
||||
/** |
||||
* Adapter accepting a binary stream {@link Publisher} and emitting its through {@link AsyncInputStream}. |
||||
* <p> |
||||
* This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer) |
||||
* requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the |
||||
* {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes |
||||
* {@link Publisher}. |
||||
* <p> |
||||
* {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF. |
||||
* {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. |
||||
* |
||||
* @author Mark Paluch |
||||
* @author Christoph Strobl |
||||
* @since 2.2 |
||||
*/ |
||||
@RequiredArgsConstructor |
||||
class AsyncInputStreamAdapter implements AsyncInputStream { |
||||
|
||||
private static final AtomicLongFieldUpdater<AsyncInputStreamAdapter> DEMAND = AtomicLongFieldUpdater |
||||
.newUpdater(AsyncInputStreamAdapter.class, "demand"); |
||||
|
||||
private static final AtomicIntegerFieldUpdater<AsyncInputStreamAdapter> SUBSCRIBED = AtomicIntegerFieldUpdater |
||||
.newUpdater(AsyncInputStreamAdapter.class, "subscribed"); |
||||
|
||||
private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0; |
||||
private static final int SUBSCRIPTION_SUBSCRIBED = 1; |
||||
|
||||
private final Publisher<? extends DataBuffer> buffers; |
||||
private final Context subscriberContext; |
||||
|
||||
private volatile Subscription subscription; |
||||
private volatile boolean cancelled; |
||||
private volatile boolean allDataBuffersReceived; |
||||
private volatile Throwable error; |
||||
private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get(); |
||||
|
||||
private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get(); |
||||
|
||||
// see DEMAND
|
||||
volatile long demand; |
||||
|
||||
// see SUBSCRIBED
|
||||
volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) |
||||
*/ |
||||
@Override |
||||
public Publisher<Integer> read(ByteBuffer dst) { |
||||
|
||||
return Flux.create(sink -> { |
||||
|
||||
readRequests.offer(new ReadRequest(sink, dst)); |
||||
|
||||
sink.onCancel(this::terminatePendingReads); |
||||
sink.onDispose(this::terminatePendingReads); |
||||
sink.onRequest(this::request); |
||||
}); |
||||
} |
||||
|
||||
void onError(FluxSink<Integer> sink, Throwable e) { |
||||
|
||||
readRequests.poll(); |
||||
sink.error(e); |
||||
} |
||||
|
||||
void onComplete(FluxSink<Integer> sink, int writtenBytes) { |
||||
|
||||
readRequests.poll(); |
||||
DEMAND.decrementAndGet(this); |
||||
sink.next(writtenBytes); |
||||
sink.complete(); |
||||
} |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long) |
||||
*/ |
||||
@Override |
||||
public Publisher<Long> skip(long bytesToSkip) { |
||||
throw new UnsupportedOperationException("Skip is currently not implemented"); |
||||
} |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() |
||||
*/ |
||||
@Override |
||||
public Publisher<Void> close() { |
||||
|
||||
return Mono.create(sink -> { |
||||
|
||||
cancelled = true; |
||||
|
||||
if (error != null) { |
||||
terminatePendingReads(); |
||||
sink.error(error); |
||||
return; |
||||
} |
||||
|
||||
terminatePendingReads(); |
||||
sink.success(); |
||||
}); |
||||
} |
||||
|
||||
protected void request(long n) { |
||||
|
||||
if (allDataBuffersReceived && bufferQueue.isEmpty()) { |
||||
|
||||
terminatePendingReads(); |
||||
return; |
||||
} |
||||
|
||||
Operators.addCap(DEMAND, this, n); |
||||
|
||||
if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) { |
||||
|
||||
if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) { |
||||
buffers.subscribe(new DataBufferCoreSubscriber()); |
||||
} |
||||
|
||||
} else { |
||||
|
||||
Subscription subscription = this.subscription; |
||||
|
||||
if (subscription != null) { |
||||
requestFromSubscription(subscription); |
||||
} |
||||
} |
||||
|
||||
} |
||||
|
||||
void requestFromSubscription(Subscription subscription) { |
||||
|
||||
if (cancelled) { |
||||
subscription.cancel(); |
||||
} |
||||
|
||||
drainLoop(); |
||||
} |
||||
|
||||
void drainLoop() { |
||||
|
||||
while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) { |
||||
|
||||
DataBuffer wip = bufferQueue.peek(); |
||||
|
||||
if (wip == null) { |
||||
break; |
||||
} |
||||
|
||||
if (wip.readableByteCount() == 0) { |
||||
bufferQueue.poll(); |
||||
continue; |
||||
} |
||||
|
||||
ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek(); |
||||
if (consumer == null) { |
||||
break; |
||||
} |
||||
|
||||
consumer.transferBytes(wip, wip.readableByteCount()); |
||||
} |
||||
|
||||
if (bufferQueue.isEmpty()) { |
||||
|
||||
if (allDataBuffersReceived) { |
||||
terminatePendingReads(); |
||||
return; |
||||
} |
||||
|
||||
if (demand > 0) { |
||||
subscription.request(1); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Terminates pending reads with empty buffers. |
||||
*/ |
||||
void terminatePendingReads() { |
||||
|
||||
ReadRequest readers; |
||||
|
||||
while ((readers = readRequests.poll()) != null) { |
||||
readers.onComplete(); |
||||
} |
||||
} |
||||
|
||||
private class DataBufferCoreSubscriber implements CoreSubscriber<DataBuffer> { |
||||
|
||||
@Override |
||||
public Context currentContext() { |
||||
return AsyncInputStreamAdapter.this.subscriberContext; |
||||
} |
||||
|
||||
@Override |
||||
public void onSubscribe(Subscription s) { |
||||
|
||||
AsyncInputStreamAdapter.this.subscription = s; |
||||
s.request(1); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(DataBuffer dataBuffer) { |
||||
|
||||
if (cancelled || allDataBuffersReceived) { |
||||
DataBufferUtils.release(dataBuffer); |
||||
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); |
||||
return; |
||||
} |
||||
|
||||
ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek(); |
||||
|
||||
if (readRequest == null) { |
||||
|
||||
DataBufferUtils.release(dataBuffer); |
||||
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); |
||||
subscription.cancel(); |
||||
return; |
||||
} |
||||
|
||||
bufferQueue.offer(dataBuffer); |
||||
|
||||
drainLoop(); |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable t) { |
||||
|
||||
if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) { |
||||
Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext); |
||||
return; |
||||
} |
||||
|
||||
AsyncInputStreamAdapter.this.error = t; |
||||
AsyncInputStreamAdapter.this.allDataBuffersReceived = true; |
||||
terminatePendingReads(); |
||||
} |
||||
|
||||
@Override |
||||
public void onComplete() { |
||||
|
||||
AsyncInputStreamAdapter.this.allDataBuffersReceived = true; |
||||
if (bufferQueue.isEmpty()) { |
||||
terminatePendingReads(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Request to read bytes and transfer these to the associated {@link ByteBuffer}. |
||||
*/ |
||||
class ReadRequest { |
||||
|
||||
private final FluxSink<Integer> sink; |
||||
private final ByteBuffer dst; |
||||
|
||||
private int writtenBytes; |
||||
|
||||
ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) { |
||||
this.sink = sink; |
||||
this.dst = dst; |
||||
this.writtenBytes = -1; |
||||
} |
||||
|
||||
public void onComplete() { |
||||
|
||||
if (error != null) { |
||||
AsyncInputStreamAdapter.this.onError(sink, error); |
||||
return; |
||||
} |
||||
|
||||
AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); |
||||
} |
||||
|
||||
public void transferBytes(DataBuffer db, int bytes) { |
||||
|
||||
try { |
||||
|
||||
if (error != null) { |
||||
AsyncInputStreamAdapter.this.onError(sink, error); |
||||
return; |
||||
} |
||||
|
||||
ByteBuffer byteBuffer = db.asByteBuffer(); |
||||
int remaining = byteBuffer.remaining(); |
||||
int writeCapacity = Math.min(dst.remaining(), remaining); |
||||
int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); |
||||
int toWrite = limit - byteBuffer.position(); |
||||
|
||||
if (toWrite == 0) { |
||||
|
||||
AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); |
||||
return; |
||||
} |
||||
|
||||
int oldPosition = byteBuffer.position(); |
||||
|
||||
byteBuffer.limit(toWrite); |
||||
dst.put(byteBuffer); |
||||
byteBuffer.limit(byteBuffer.capacity()); |
||||
byteBuffer.position(oldPosition); |
||||
db.readPosition(db.readPosition() + toWrite); |
||||
|
||||
if (writtenBytes == -1) { |
||||
writtenBytes = bytes; |
||||
} else { |
||||
writtenBytes += bytes; |
||||
} |
||||
|
||||
} catch (Exception e) { |
||||
AsyncInputStreamAdapter.this.onError(sink, e); |
||||
} finally { |
||||
|
||||
if (db.readableByteCount() == 0) { |
||||
DataBufferUtils.release(db); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -1,80 +0,0 @@
@@ -1,80 +0,0 @@
|
||||
/* |
||||
* Copyright 2019-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.gridfs; |
||||
|
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
|
||||
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; |
||||
|
||||
/** |
||||
* Utility methods to create adapters from between {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}. |
||||
* |
||||
* @author Mark Paluch |
||||
* @since 2.2 |
||||
*/ |
||||
class BinaryStreamAdapters { |
||||
|
||||
/** |
||||
* Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}. |
||||
* Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}. |
||||
* <p/> |
||||
* The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory} |
||||
* settings to determine the chunk size. |
||||
* |
||||
* @param inputStream must not be {@literal null}. |
||||
* @param dataBufferFactory must not be {@literal null}. |
||||
* @param bufferSize read {@code n} bytes per iteration. |
||||
* @return {@link Flux} emitting {@link DataBuffer}s. |
||||
* @see DataBufferFactory#allocateBuffer() |
||||
*/ |
||||
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, |
||||
int bufferSize) { |
||||
|
||||
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
|
||||
.filter(it -> { |
||||
|
||||
if (it.readableByteCount() == 0) { |
||||
DataBufferUtils.release(it); |
||||
return false; |
||||
} |
||||
return true; |
||||
}); |
||||
} |
||||
|
||||
/** |
||||
* Creates a {@link Mono} emitting a {@link AsyncInputStream} to consume a {@link Publisher} emitting |
||||
* {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are |
||||
* released by the adapter during consumption. |
||||
* <p/> |
||||
* This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}. |
||||
* |
||||
* @param dataBuffers must not be {@literal null}. |
||||
* @return {@link Mono} emitting {@link AsyncInputStream}. |
||||
* @see DataBufferUtils#release(DataBuffer) |
||||
*/ |
||||
static Mono<AsyncInputStream> toAsyncInputStream(Publisher<? extends DataBuffer> dataBuffers) { |
||||
|
||||
return Mono.create(sink -> { |
||||
sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext())); |
||||
}); |
||||
} |
||||
} |
||||
@ -1,340 +0,0 @@
@@ -1,340 +0,0 @@
|
||||
/* |
||||
* Copyright 2019-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.gridfs; |
||||
|
||||
import lombok.RequiredArgsConstructor; |
||||
import reactor.core.CoreSubscriber; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.FluxSink; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.core.publisher.Operators; |
||||
import reactor.util.context.Context; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscription; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
|
||||
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; |
||||
|
||||
/** |
||||
* Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. |
||||
* |
||||
* @author Mark Paluch |
||||
* @author Christoph Strobl |
||||
* @since 2.2 |
||||
*/ |
||||
class DataBufferPublisherAdapter { |
||||
|
||||
/** |
||||
* Creates a {@link Publisher} emitting {@link DataBuffer}s by reading binary chunks from {@link AsyncInputStream}. |
||||
* Closes the {@link AsyncInputStream} once the {@link Publisher} terminates. |
||||
* |
||||
* @param inputStream must not be {@literal null}. |
||||
* @param dataBufferFactory must not be {@literal null}. |
||||
* @param bufferSize read {@code n} bytes per iteration. |
||||
* @return the resulting {@link Publisher}. |
||||
*/ |
||||
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, |
||||
int bufferSize) { |
||||
|
||||
return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)), |
||||
DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close); |
||||
} |
||||
|
||||
/** |
||||
* Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}. |
||||
* |
||||
* @param inputStream the source stream. |
||||
* @return a {@link Flux} emitting data chunks one by one. |
||||
* @since 2.2.1 |
||||
*/ |
||||
private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream inputStream) { |
||||
|
||||
AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory, |
||||
inputStream.bufferSize); |
||||
return Flux.create((sink) -> { |
||||
|
||||
sink.onDispose(streamHandler::close); |
||||
sink.onCancel(streamHandler::close); |
||||
|
||||
sink.onRequest(n -> { |
||||
streamHandler.request(sink, n); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
/** |
||||
* An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading |
||||
* from it, delegating operations on the {@link AsyncInputStream} to the reference instance. <br /> |
||||
* Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2.2.1 |
||||
*/ |
||||
private static class DelegatingAsyncInputStream implements AsyncInputStream { |
||||
|
||||
private final AsyncInputStream inputStream; |
||||
private final DataBufferFactory dataBufferFactory; |
||||
private int bufferSize; |
||||
|
||||
/** |
||||
* @param inputStream the source input stream. |
||||
* @param dataBufferFactory |
||||
* @param bufferSize |
||||
*/ |
||||
DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { |
||||
|
||||
this.inputStream = inputStream; |
||||
this.dataBufferFactory = dataBufferFactory; |
||||
this.bufferSize = bufferSize; |
||||
} |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) |
||||
*/ |
||||
@Override |
||||
public Publisher<Integer> read(ByteBuffer dst) { |
||||
return inputStream.read(dst); |
||||
} |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long) |
||||
*/ |
||||
@Override |
||||
public Publisher<Long> skip(long bytesToSkip) { |
||||
return inputStream.skip(bytesToSkip); |
||||
} |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() |
||||
*/ |
||||
@Override |
||||
public Publisher<Void> close() { |
||||
return inputStream.close(); |
||||
} |
||||
} |
||||
|
||||
@RequiredArgsConstructor |
||||
static class AsyncInputStreamHandler { |
||||
|
||||
private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater |
||||
.newUpdater(AsyncInputStreamHandler.class, "demand"); |
||||
|
||||
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater |
||||
.newUpdater(AsyncInputStreamHandler.class, "state"); |
||||
|
||||
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> DRAIN = AtomicIntegerFieldUpdater |
||||
.newUpdater(AsyncInputStreamHandler.class, "drain"); |
||||
|
||||
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater |
||||
.newUpdater(AsyncInputStreamHandler.class, "read"); |
||||
|
||||
private static final int STATE_OPEN = 0; |
||||
private static final int STATE_CLOSED = 1; |
||||
|
||||
private static final int DRAIN_NONE = 0; |
||||
private static final int DRAIN_COMPLETION = 1; |
||||
|
||||
private static final int READ_NONE = 0; |
||||
private static final int READ_IN_PROGRESS = 1; |
||||
|
||||
final AsyncInputStream inputStream; |
||||
final DataBufferFactory dataBufferFactory; |
||||
final int bufferSize; |
||||
|
||||
// see DEMAND
|
||||
volatile long demand; |
||||
|
||||
// see STATE
|
||||
volatile int state = STATE_OPEN; |
||||
|
||||
// see DRAIN
|
||||
volatile int drain = DRAIN_NONE; |
||||
|
||||
// see READ_IN_PROGRESS
|
||||
volatile int read = READ_NONE; |
||||
|
||||
void request(FluxSink<DataBuffer> sink, long n) { |
||||
|
||||
Operators.addCap(DEMAND, this, n); |
||||
drainLoop(sink); |
||||
} |
||||
|
||||
/** |
||||
* Loops while we have demand and while no read is in progress. |
||||
* |
||||
* @param sink |
||||
*/ |
||||
void drainLoop(FluxSink<DataBuffer> sink) { |
||||
while (onShouldRead()) { |
||||
emitNext(sink); |
||||
} |
||||
} |
||||
|
||||
boolean onShouldRead() { |
||||
return !isClosed() && getDemand() > 0 && onWantRead(); |
||||
} |
||||
|
||||
boolean onWantRead() { |
||||
return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); |
||||
} |
||||
|
||||
void onReadDone() { |
||||
READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); |
||||
} |
||||
|
||||
long getDemand() { |
||||
return DEMAND.get(this); |
||||
} |
||||
|
||||
void decrementDemand() { |
||||
DEMAND.decrementAndGet(this); |
||||
} |
||||
|
||||
void close() { |
||||
STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED); |
||||
} |
||||
|
||||
boolean enterDrainLoop() { |
||||
return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION); |
||||
} |
||||
|
||||
void leaveDrainLoop() { |
||||
DRAIN.set(this, DRAIN_NONE); |
||||
} |
||||
|
||||
boolean isClosed() { |
||||
return STATE.get(this) == STATE_CLOSED; |
||||
} |
||||
|
||||
/** |
||||
* Emit the next {@link DataBuffer}. |
||||
* |
||||
* @param sink |
||||
* @return |
||||
*/ |
||||
private void emitNext(FluxSink<DataBuffer> sink) { |
||||
|
||||
ByteBuffer transport = ByteBuffer.allocate(bufferSize); |
||||
BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport); |
||||
try { |
||||
inputStream.read(transport).subscribe(bufferCoreSubscriber); |
||||
} catch (Throwable e) { |
||||
sink.error(e); |
||||
} |
||||
} |
||||
|
||||
private class BufferCoreSubscriber implements CoreSubscriber<Integer> { |
||||
|
||||
private final FluxSink<DataBuffer> sink; |
||||
private final DataBufferFactory factory; |
||||
private final ByteBuffer transport; |
||||
private volatile Subscription subscription; |
||||
|
||||
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) { |
||||
|
||||
this.sink = sink; |
||||
this.factory = factory; |
||||
this.transport = transport; |
||||
} |
||||
|
||||
@Override |
||||
public Context currentContext() { |
||||
return sink.currentContext(); |
||||
} |
||||
|
||||
@Override |
||||
public void onSubscribe(Subscription s) { |
||||
|
||||
this.subscription = s; |
||||
s.request(1); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(Integer bytes) { |
||||
|
||||
if (isClosed()) { |
||||
return; |
||||
} |
||||
|
||||
if (bytes > 0) { |
||||
|
||||
DataBuffer buffer = readNextChunk(); |
||||
sink.next(buffer); |
||||
decrementDemand(); |
||||
} |
||||
|
||||
if (bytes == -1) { |
||||
sink.complete(); |
||||
return; |
||||
} |
||||
|
||||
subscription.request(1); |
||||
} |
||||
|
||||
private DataBuffer readNextChunk() { |
||||
|
||||
transport.flip(); |
||||
|
||||
DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); |
||||
dataBuffer.write(transport); |
||||
|
||||
transport.clear(); |
||||
|
||||
return dataBuffer; |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable t) { |
||||
|
||||
if (isClosed()) { |
||||
|
||||
Operators.onErrorDropped(t, sink.currentContext()); |
||||
return; |
||||
} |
||||
|
||||
close(); |
||||
sink.error(t); |
||||
} |
||||
|
||||
@Override |
||||
public void onComplete() { |
||||
|
||||
onReadDone(); |
||||
|
||||
if (!isClosed()) { |
||||
|
||||
if (enterDrainLoop()) { |
||||
try { |
||||
drainLoop(sink); |
||||
} finally { |
||||
leaveDrainLoop(); |
||||
} |
||||
} |
||||
|
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,36 @@
@@ -0,0 +1,36 @@
|
||||
/* |
||||
* Copyright 2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
*/ |
||||
class SpringDataMongoDBTests { |
||||
|
||||
@Test // DATAMONGO-2427
|
||||
void driverInformationHoldsSpringDataHint() { |
||||
assertThat(SpringDataMongoDB.driverInformation().getDriverNames()).contains("spring-data"); |
||||
} |
||||
|
||||
@Test // DATAMONGO-2427
|
||||
void versionIsDetectedFromPackage() { |
||||
assertThat(SpringDataMongoDB.version()).isNotNull(); |
||||
} |
||||
} |
||||
@ -1,103 +0,0 @@
@@ -1,103 +0,0 @@
|
||||
/* |
||||
* Copyright 2019-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.gridfs; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
|
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
|
||||
import org.junit.Test; |
||||
|
||||
import org.springframework.core.io.ClassPathResource; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
||||
import org.springframework.util.StreamUtils; |
||||
|
||||
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; |
||||
import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; |
||||
|
||||
/** |
||||
* Unit tests for {@link BinaryStreamAdapters}. |
||||
* |
||||
* @author Mark Paluch |
||||
*/ |
||||
public class BinaryStreamAdaptersUnitTests { |
||||
|
||||
@Test // DATAMONGO-1855
|
||||
public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException { |
||||
|
||||
ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); |
||||
|
||||
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); |
||||
AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); |
||||
|
||||
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256); |
||||
|
||||
DataBufferUtils.join(dataBuffers) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(actual -> { |
||||
|
||||
byte[] actualContent = new byte[actual.readableByteCount()]; |
||||
actual.read(actualContent); |
||||
assertThat(actualContent).isEqualTo(content); |
||||
}) //
|
||||
.verifyComplete(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1855
|
||||
public void shouldAdaptBinaryPublisherToAsyncInputStream() throws IOException { |
||||
|
||||
ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); |
||||
|
||||
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); |
||||
|
||||
Flux<DataBuffer> dataBuffers = DataBufferUtils.readInputStream(resource::getInputStream, |
||||
new DefaultDataBufferFactory(), 10); |
||||
|
||||
AsyncInputStream inputStream = BinaryStreamAdapters.toAsyncInputStream(dataBuffers).block(); |
||||
ByteBuffer complete = readBuffer(inputStream); |
||||
|
||||
assertThat(complete).isEqualTo(ByteBuffer.wrap(content)); |
||||
} |
||||
|
||||
static ByteBuffer readBuffer(AsyncInputStream inputStream) { |
||||
|
||||
ByteBuffer complete = ByteBuffer.allocate(1024); |
||||
|
||||
boolean hasData = true; |
||||
while (hasData) { |
||||
|
||||
ByteBuffer chunk = ByteBuffer.allocate(100); |
||||
|
||||
Integer bytesRead = Mono.from(inputStream.read(chunk)).block(); |
||||
|
||||
chunk.flip(); |
||||
complete.put(chunk); |
||||
|
||||
hasData = bytesRead > -1; |
||||
} |
||||
|
||||
complete.flip(); |
||||
|
||||
return complete; |
||||
} |
||||
} |
||||
@ -1,55 +0,0 @@
@@ -1,55 +0,0 @@
|
||||
/* |
||||
* Copyright 2019-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.gridfs; |
||||
|
||||
import static org.mockito.Mockito.*; |
||||
|
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
import org.junit.Test; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
||||
|
||||
/** |
||||
* Unit tests for {@link DataBufferPublisherAdapter}. |
||||
* |
||||
* @author Mark Paluch |
||||
*/ |
||||
public class DataBufferPublisherAdapterUnitTests { |
||||
|
||||
DataBufferFactory factory = new DefaultDataBufferFactory(); |
||||
|
||||
@Test // DATAMONGO-2230
|
||||
public void adapterShouldPropagateErrors() { |
||||
|
||||
AsyncInputStreamAdapter asyncInput = mock(AsyncInputStreamAdapter.class); |
||||
|
||||
when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException())); |
||||
when(asyncInput.close()).thenReturn(Mono.empty()); |
||||
|
||||
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256); |
||||
|
||||
StepVerifier.create(binaryStream, 0) //
|
||||
.thenRequest(1) //
|
||||
.expectNextCount(1) //
|
||||
.thenRequest(1) //
|
||||
.verifyError(IllegalStateException.class); |
||||
} |
||||
} |
||||
Loading…
Reference in new issue