|
|
|
|
@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
|
|
|
|
|
/* |
|
|
|
|
* Copyright 2002-2021 the original author or authors. |
|
|
|
|
* Copyright 2002-2022 the original author or authors. |
|
|
|
|
* |
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
|
@ -26,6 +26,8 @@ import org.reactivestreams.Subscriber;
@@ -26,6 +26,8 @@ import org.reactivestreams.Subscriber;
|
|
|
|
|
import org.reactivestreams.Subscription; |
|
|
|
|
import reactor.core.publisher.Operators; |
|
|
|
|
|
|
|
|
|
import org.springframework.core.io.buffer.DataBuffer; |
|
|
|
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
|
|
|
|
import org.springframework.core.log.LogDelegateFactory; |
|
|
|
|
import org.springframework.lang.Nullable; |
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
@ -56,6 +58,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@@ -56,6 +58,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|
|
|
|
*/ |
|
|
|
|
protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class); |
|
|
|
|
|
|
|
|
|
final static DataBuffer EMPTY_BUFFER = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED); |
|
|
|
|
|
|
|
|
|
@ -180,7 +184,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@@ -180,7 +184,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Read and publish data one at a time until there is no more data, no more |
|
|
|
|
* demand, or perhaps we completed in the mean time. |
|
|
|
|
* demand, or perhaps we completed meanwhile. |
|
|
|
|
* @return {@code true} if there is more demand; {@code false} if there is |
|
|
|
|
* no more demand or we have completed. |
|
|
|
|
*/ |
|
|
|
|
@ -188,7 +192,12 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@@ -188,7 +192,12 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|
|
|
|
long r; |
|
|
|
|
while ((r = this.demand) > 0 && (this.state.get() != State.COMPLETED)) { |
|
|
|
|
T data = read(); |
|
|
|
|
if (data != null) { |
|
|
|
|
if (data == EMPTY_BUFFER) { |
|
|
|
|
if (rsReadLogger.isTraceEnabled()) { |
|
|
|
|
rsReadLogger.trace(getLogPrefix() + "0 bytes read, trying again"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (data != null) { |
|
|
|
|
if (r != Long.MAX_VALUE) { |
|
|
|
|
DEMAND_FIELD_UPDATER.addAndGet(this, -1L); |
|
|
|
|
} |
|
|
|
|
|