|
|
|
@ -609,6 +609,7 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
MultipartUtils.closeChannel(newState.channel); |
|
|
|
MultipartUtils.closeChannel(newState.channel); |
|
|
|
|
|
|
|
MultipartUtils.deleteFile(newState.file); |
|
|
|
this.content.forEach(DataBufferUtils::release); |
|
|
|
this.content.forEach(DataBufferUtils::release); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -640,6 +641,8 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
|
|
|
|
|
|
|
|
private volatile boolean closeOnDispose = true; |
|
|
|
private volatile boolean closeOnDispose = true; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private volatile boolean deleteOnDispose = true; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public IdleFileState(WritingFileState state) { |
|
|
|
public IdleFileState(WritingFileState state) { |
|
|
|
this.headers = state.headers; |
|
|
|
this.headers = state.headers; |
|
|
|
@ -654,16 +657,20 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
if (PartGenerator.this.maxDiskUsagePerPart == -1 || count <= PartGenerator.this.maxDiskUsagePerPart) { |
|
|
|
if (PartGenerator.this.maxDiskUsagePerPart == -1 || count <= PartGenerator.this.maxDiskUsagePerPart) { |
|
|
|
|
|
|
|
|
|
|
|
this.closeOnDispose = false; |
|
|
|
this.closeOnDispose = false; |
|
|
|
|
|
|
|
this.deleteOnDispose = false; |
|
|
|
WritingFileState newState = new WritingFileState(this); |
|
|
|
WritingFileState newState = new WritingFileState(this); |
|
|
|
if (changeState(this, newState)) { |
|
|
|
if (changeState(this, newState)) { |
|
|
|
newState.writeBuffer(dataBuffer); |
|
|
|
newState.writeBuffer(dataBuffer); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
|
|
|
|
MultipartUtils.deleteFile(this.file); |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
|
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
|
|
|
|
MultipartUtils.deleteFile(this.file); |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
emitError(new DataBufferLimitException( |
|
|
|
emitError(new DataBufferLimitException( |
|
|
|
"Part exceeded the disk usage limit of " + PartGenerator.this.maxDiskUsagePerPart + |
|
|
|
"Part exceeded the disk usage limit of " + PartGenerator.this.maxDiskUsagePerPart + |
|
|
|
@ -674,6 +681,7 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void partComplete(boolean finalPart) { |
|
|
|
public void partComplete(boolean finalPart) { |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
|
|
|
|
this.deleteOnDispose = false; |
|
|
|
emitPart(DefaultParts.part(this.headers, this.file, PartGenerator.this.blockingOperationScheduler)); |
|
|
|
emitPart(DefaultParts.part(this.headers, this.file, PartGenerator.this.blockingOperationScheduler)); |
|
|
|
if (finalPart) { |
|
|
|
if (finalPart) { |
|
|
|
emitComplete(); |
|
|
|
emitComplete(); |
|
|
|
@ -685,6 +693,9 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
if (this.closeOnDispose) { |
|
|
|
if (this.closeOnDispose) { |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (this.deleteOnDispose) { |
|
|
|
|
|
|
|
MultipartUtils.deleteFile(this.file); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -710,6 +721,8 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
|
|
|
|
|
|
|
|
private volatile boolean finalPart; |
|
|
|
private volatile boolean finalPart; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private volatile boolean disposed; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public WritingFileState(CreateFileState state, Path file, WritableByteChannel channel) { |
|
|
|
public WritingFileState(CreateFileState state, Path file, WritableByteChannel channel) { |
|
|
|
this.headers = state.headers; |
|
|
|
this.headers = state.headers; |
|
|
|
@ -761,11 +774,15 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
if (this.completed) { |
|
|
|
if (this.completed) { |
|
|
|
newState.partComplete(this.finalPart); |
|
|
|
newState.partComplete(this.finalPart); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
else if (this.disposed) { |
|
|
|
|
|
|
|
newState.dispose(); |
|
|
|
|
|
|
|
} |
|
|
|
else if (changeState(this, newState)) { |
|
|
|
else if (changeState(this, newState)) { |
|
|
|
requestToken(); |
|
|
|
requestToken(); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
|
|
|
|
MultipartUtils.deleteFile(this.file); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -779,6 +796,8 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
return Mono.empty(); |
|
|
|
return Mono.empty(); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (IOException ex) { |
|
|
|
catch (IOException ex) { |
|
|
|
|
|
|
|
MultipartUtils.closeChannel(this.channel); |
|
|
|
|
|
|
|
MultipartUtils.deleteFile(this.file); |
|
|
|
return Mono.error(ex); |
|
|
|
return Mono.error(ex); |
|
|
|
} |
|
|
|
} |
|
|
|
finally { |
|
|
|
finally { |
|
|
|
@ -786,6 +805,12 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void dispose() { |
|
|
|
|
|
|
|
this.disposed = true; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public String toString() { |
|
|
|
public String toString() { |
|
|
|
return "WRITE-FILE"; |
|
|
|
return "WRITE-FILE"; |
|
|
|
|