|
|
|
|
@ -60,29 +60,29 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -60,29 +60,29 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
@Override |
|
|
|
|
public void onSubscribe(Subscription s) { |
|
|
|
|
super.onSubscribe(s); |
|
|
|
|
subscription = s; |
|
|
|
|
subscription.request(1); |
|
|
|
|
this.subscription = s; |
|
|
|
|
this.subscription.request(1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onNext(ByteBuffer buffer) { |
|
|
|
|
super.onNext(buffer); |
|
|
|
|
|
|
|
|
|
if (responseChannel == null) { |
|
|
|
|
responseChannel = exchange.getResponseChannel(); |
|
|
|
|
if (this.responseChannel == null) { |
|
|
|
|
this.responseChannel = this.exchange.getResponseChannel(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
writing.incrementAndGet(); |
|
|
|
|
this.writing.incrementAndGet(); |
|
|
|
|
try { |
|
|
|
|
int c; |
|
|
|
|
do { |
|
|
|
|
c = responseChannel.write(buffer); |
|
|
|
|
c = this.responseChannel.write(buffer); |
|
|
|
|
} while (buffer.hasRemaining() && c > 0); |
|
|
|
|
if (buffer.hasRemaining()) { |
|
|
|
|
writing.incrementAndGet(); |
|
|
|
|
this.writing.incrementAndGet(); |
|
|
|
|
enqueue(buffer); |
|
|
|
|
responseChannel.getWriteSetter().set(this); |
|
|
|
|
responseChannel.resumeWrites(); |
|
|
|
|
this.responseChannel.getWriteSetter().set(this); |
|
|
|
|
this.responseChannel.resumeWrites(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
this.subscription.request(1); |
|
|
|
|
@ -93,8 +93,8 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -93,8 +93,8 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
onError(ex); |
|
|
|
|
} |
|
|
|
|
finally { |
|
|
|
|
writing.decrementAndGet(); |
|
|
|
|
if (closing.get()) { |
|
|
|
|
this.writing.decrementAndGet(); |
|
|
|
|
if (this.closing.get()) { |
|
|
|
|
closeIfDone(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -103,12 +103,12 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -103,12 +103,12 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
private void enqueue(ByteBuffer src) { |
|
|
|
|
do { |
|
|
|
|
PooledByteBuffer pooledBuffer = |
|
|
|
|
exchange.getConnection().getByteBufferPool().allocate(); |
|
|
|
|
this.exchange.getConnection().getByteBufferPool().allocate(); |
|
|
|
|
|
|
|
|
|
ByteBuffer dst = pooledBuffer.getBuffer(); |
|
|
|
|
copy(dst, src); |
|
|
|
|
dst.flip(); |
|
|
|
|
buffers.add(pooledBuffer); |
|
|
|
|
this.buffers.add(pooledBuffer); |
|
|
|
|
} while (src.remaining() > 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -124,25 +124,25 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -124,25 +124,25 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
try { |
|
|
|
|
int c; |
|
|
|
|
do { |
|
|
|
|
ByteBuffer buffer = buffers.peek().getBuffer(); |
|
|
|
|
ByteBuffer buffer = this.buffers.peek().getBuffer(); |
|
|
|
|
do { |
|
|
|
|
c = channel.write(buffer); |
|
|
|
|
} while (buffer.hasRemaining() && c > 0); |
|
|
|
|
if (!buffer.hasRemaining()) { |
|
|
|
|
safeClose(buffers.remove()); |
|
|
|
|
safeClose(this.buffers.remove()); |
|
|
|
|
} |
|
|
|
|
} while (!buffers.isEmpty() && c > 0); |
|
|
|
|
if (!buffers.isEmpty()) { |
|
|
|
|
} while (!this.buffers.isEmpty() && c > 0); |
|
|
|
|
if (!this.buffers.isEmpty()) { |
|
|
|
|
channel.resumeWrites(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
writing.decrementAndGet(); |
|
|
|
|
this.writing.decrementAndGet(); |
|
|
|
|
|
|
|
|
|
if (closing.get()) { |
|
|
|
|
if (this.closing.get()) { |
|
|
|
|
closeIfDone(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
subscription.request(1); |
|
|
|
|
this.subscription.request(1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -154,10 +154,10 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -154,10 +154,10 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
@Override |
|
|
|
|
public void onError(Throwable t) { |
|
|
|
|
super.onError(t); |
|
|
|
|
if (!exchange.isResponseStarted() && |
|
|
|
|
exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) { |
|
|
|
|
if (!this.exchange.isResponseStarted() && |
|
|
|
|
this.exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) { |
|
|
|
|
|
|
|
|
|
exchange.setStatusCode(INTERNAL_SERVER_ERROR.value()); |
|
|
|
|
this.exchange.setStatusCode(INTERNAL_SERVER_ERROR.value()); |
|
|
|
|
} |
|
|
|
|
logger.error("ResponseBodySubscriber error", t); |
|
|
|
|
} |
|
|
|
|
@ -166,15 +166,15 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -166,15 +166,15 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
public void onComplete() { |
|
|
|
|
super.onComplete(); |
|
|
|
|
|
|
|
|
|
if (responseChannel != null) { |
|
|
|
|
closing.set(true); |
|
|
|
|
if (this.responseChannel != null) { |
|
|
|
|
this.closing.set(true); |
|
|
|
|
closeIfDone(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void closeIfDone() { |
|
|
|
|
if (writing.get() == 0) { |
|
|
|
|
if (closing.compareAndSet(true, false)) { |
|
|
|
|
if (this.writing.get() == 0) { |
|
|
|
|
if (this.closing.compareAndSet(true, false)) { |
|
|
|
|
closeChannel(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -182,16 +182,16 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
@@ -182,16 +182,16 @@ class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
|
|
|
|
|
|
|
|
|
|
private void closeChannel() { |
|
|
|
|
try { |
|
|
|
|
responseChannel.shutdownWrites(); |
|
|
|
|
this.responseChannel.shutdownWrites(); |
|
|
|
|
|
|
|
|
|
if (!responseChannel.flush()) { |
|
|
|
|
responseChannel.getWriteSetter().set( |
|
|
|
|
if (!this.responseChannel.flush()) { |
|
|
|
|
this.responseChannel.getWriteSetter().set( |
|
|
|
|
flushingChannelListener( |
|
|
|
|
o -> safeClose(responseChannel), |
|
|
|
|
o -> safeClose(this.responseChannel), |
|
|
|
|
closingChannelExceptionHandler())); |
|
|
|
|
responseChannel.resumeWrites(); |
|
|
|
|
this.responseChannel.resumeWrites(); |
|
|
|
|
} |
|
|
|
|
responseChannel = null; |
|
|
|
|
this.responseChannel = null; |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
onError(ex); |
|
|
|
|
|