|
|
|
|
@ -16,45 +16,67 @@
@@ -16,45 +16,67 @@
|
|
|
|
|
|
|
|
|
|
package smoketest.kafka.ssl; |
|
|
|
|
|
|
|
|
|
import java.io.File; |
|
|
|
|
import java.time.Duration; |
|
|
|
|
|
|
|
|
|
import org.awaitility.Awaitility; |
|
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
import org.testcontainers.containers.DockerComposeContainer; |
|
|
|
|
import org.testcontainers.containers.wait.strategy.Wait; |
|
|
|
|
import org.testcontainers.containers.KafkaContainer; |
|
|
|
|
import org.testcontainers.junit.jupiter.Container; |
|
|
|
|
import org.testcontainers.junit.jupiter.Testcontainers; |
|
|
|
|
import org.testcontainers.utility.MountableFile; |
|
|
|
|
import smoketest.kafka.Consumer; |
|
|
|
|
import smoketest.kafka.Producer; |
|
|
|
|
import smoketest.kafka.SampleMessage; |
|
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
import org.springframework.boot.test.context.SpringBootTest; |
|
|
|
|
import org.springframework.boot.testsupport.testcontainers.DockerImageNames; |
|
|
|
|
import org.springframework.test.context.DynamicPropertyRegistry; |
|
|
|
|
import org.springframework.test.context.DynamicPropertySource; |
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
|
import static org.hamcrest.Matchers.empty; |
|
|
|
|
import static org.hamcrest.Matchers.not; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Smoke tests for Apache Kafka with SSL. |
|
|
|
|
* |
|
|
|
|
* @author Scott Frederick |
|
|
|
|
* @author Eddú Meléndez |
|
|
|
|
*/ |
|
|
|
|
@Testcontainers(disabledWithoutDocker = true) |
|
|
|
|
@SpringBootTest(classes = { SampleKafkaSslApplication.class, Producer.class, Consumer.class }, |
|
|
|
|
properties = { "spring.kafka.security.protocol=SSL", "spring.kafka.bootstrap-servers=localhost:9093", |
|
|
|
|
"spring.kafka.ssl.bundle=client", |
|
|
|
|
properties = { "spring.kafka.security.protocol=SSL", |
|
|
|
|
"spring.kafka.properties.ssl.endpoint.identification.algorithm=", "spring.kafka.ssl.bundle=client", |
|
|
|
|
"spring.ssl.bundle.jks.client.keystore.location=classpath:ssl/test-client.p12", |
|
|
|
|
"spring.ssl.bundle.jks.client.keystore.password=password", |
|
|
|
|
"spring.ssl.bundle.jks.client.truststore.location=classpath:ssl/test-ca.p12", |
|
|
|
|
"spring.ssl.bundle.jks.client.truststore.password=password" }) |
|
|
|
|
class SampleKafkaSslApplicationTests { |
|
|
|
|
|
|
|
|
|
private static final File KAFKA_COMPOSE_FILE = new File("src/test/resources/docker-compose.yml"); |
|
|
|
|
|
|
|
|
|
private static final String KAFKA_COMPOSE_SERVICE = "kafka"; |
|
|
|
|
|
|
|
|
|
private static final int KAFKA_SSL_PORT = 9093; |
|
|
|
|
|
|
|
|
|
@Container |
|
|
|
|
public DockerComposeContainer<?> container = new DockerComposeContainer<>(KAFKA_COMPOSE_FILE) |
|
|
|
|
.withExposedService(KAFKA_COMPOSE_SERVICE, KAFKA_SSL_PORT, Wait.forListeningPorts(KAFKA_SSL_PORT)); |
|
|
|
|
public static KafkaContainer kafka = new KafkaContainer(DockerImageNames.kafka()) |
|
|
|
|
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SSL,BROKER:PLAINTEXT") |
|
|
|
|
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true") |
|
|
|
|
.withEnv("KAFKA_SSL_CLIENT_AUTH", "required") |
|
|
|
|
.withEnv("KAFKA_SSL_KEYSTORE_LOCATION", "/etc/kafka/secrets/certs/test-server.p12") |
|
|
|
|
.withEnv("KAFKA_SSL_KEYSTORE_PASSWORD", "password") |
|
|
|
|
.withEnv("KAFKA_SSL_KEY_PASSWORD", "password") |
|
|
|
|
.withEnv("KAFKA_SSL_TRUSTSTORE_LOCATION", "/etc/kafka/secrets/certs/test-ca.p12") |
|
|
|
|
.withEnv("KAFKA_SSL_TRUSTSTORE_PASSWORD", "password") |
|
|
|
|
.withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "") |
|
|
|
|
.withCopyFileToContainer(MountableFile.forClasspathResource("ssl/test-server.p12"), |
|
|
|
|
"/etc/kafka/secrets/certs/test-server.p12") |
|
|
|
|
.withCopyFileToContainer(MountableFile.forClasspathResource("ssl/credentials"), |
|
|
|
|
"/etc/kafka/secrets/certs/credentials") |
|
|
|
|
.withCopyFileToContainer(MountableFile.forClasspathResource("ssl/test-ca.p12"), |
|
|
|
|
"/etc/kafka/secrets/certs/test-ca.p12"); |
|
|
|
|
|
|
|
|
|
@DynamicPropertySource |
|
|
|
|
static void kafkaProperties(DynamicPropertyRegistry registry) { |
|
|
|
|
registry.add("spring.kafka.bootstrap-servers", |
|
|
|
|
() -> String.format("%s:%s", kafka.getHost(), kafka.getMappedPort(9093))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
private Producer producer; |
|
|
|
|
|