From 7e7d6b9c3a3794a075c545e507af342caecc6246 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 16 Aug 2022 11:21:12 +0200 Subject: [PATCH] Propagate Context in DataBufferUtils::write(Path) This commit makes sure that the Reactor context is propagated in DataBufferUtils::write(Path). Closes gh-28933 See gh-27517 --- .../core/io/buffer/DataBufferUtils.java | 3 ++- .../core/io/buffer/DataBufferUtilsTests.java | 21 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 687afcb0d2b..a1497b459db 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -366,7 +366,8 @@ public abstract class DataBufferUtils { sink.onDispose(() -> closeChannel(channel)); write(source, channel).subscribe(DataBufferUtils::release, sink::error, - sink::success); + sink::success, + Context.of(sink.contextView())); } catch (IOException ex) { sink.error(ex); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index fb1b9c8700f..91594cb8954 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -990,6 +990,27 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { } } + @ParameterizedDataBufferAllocatingTest + void propagateContextPath(DataBufferFactory bufferFactory) throws IOException { + Path path = Paths.get(this.resource.getURI()); + Path out = Files.createTempFile("data-buffer-utils-tests", ".tmp"); + + Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .transform(f -> DataBufferUtils.write(f, out)) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .contextWrite(Context.of("key", "TEST")); + + StepVerifier.create(result) + .verifyComplete(); + } + private static class ZeroDemandSubscriber extends BaseSubscriber { @Override