|
|
|
|
@ -16,10 +16,9 @@
@@ -16,10 +16,9 @@
|
|
|
|
|
package sample.kafka; |
|
|
|
|
|
|
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
import org.junit.jupiter.api.extension.RegisterExtension; |
|
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
import org.springframework.boot.test.context.SpringBootTest; |
|
|
|
|
import org.springframework.boot.test.extension.OutputCapture; |
|
|
|
|
import org.springframework.kafka.test.context.EmbeddedKafka; |
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
|
@ -36,17 +35,18 @@ import static org.assertj.core.api.Assertions.assertThat;
@@ -36,17 +35,18 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
@EmbeddedKafka(topics = "testTopic") |
|
|
|
|
class SampleKafkaApplicationTests { |
|
|
|
|
|
|
|
|
|
@RegisterExtension |
|
|
|
|
OutputCapture output = new OutputCapture(); |
|
|
|
|
@Autowired |
|
|
|
|
private Consumer consumer; |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void testVanillaExchange() throws Exception { |
|
|
|
|
long end = System.currentTimeMillis() + 30000; |
|
|
|
|
while (!this.output.toString().contains("A simple test message") |
|
|
|
|
long end = System.currentTimeMillis() + 10000; |
|
|
|
|
while (this.consumer.getMessages().isEmpty() |
|
|
|
|
&& System.currentTimeMillis() < end) { |
|
|
|
|
Thread.sleep(250); |
|
|
|
|
} |
|
|
|
|
assertThat(this.output).contains("A simple test message"); |
|
|
|
|
assertThat(this.consumer.getMessages()).extracting("message") |
|
|
|
|
.containsOnly("A simple test message"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|