diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index be2481c38ba..88f21da1ef7 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -16,9 +16,13 @@ package org.springframework.boot.autoconfigure.kafka; +import java.io.IOException; + import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,6 +32,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; @@ -35,6 +40,7 @@ import org.springframework.kafka.support.ProducerListener; * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. * * @author Gary Russell + * @author Stephane Nicoll * @since 1.5.0 */ @Configuration @@ -81,4 +87,20 @@ public class KafkaAutoConfiguration { this.properties.buildProducerProperties()); } + @Bean + @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") + @ConditionalOnMissingBean + public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { + KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); + Jaas jaasProperties = this.properties.getJaas(); + if (jaasProperties.getControlFlag() != null) { + jaas.setControlFlag(jaasProperties.getControlFlag()); + } + if (jaasProperties.getLoginModule() != null) { + jaas.setLoginModule(jaasProperties.getLoginModule()); + } + jaas.setOptions(jaasProperties.getOptions()); + return jaas; + } + } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 239d11344ed..d362c87816a 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; /** @@ -74,6 +75,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Jaas jaas = new Jaas(); + private final Template template = new Template(); public List getBootstrapServers() { @@ -116,6 +119,10 @@ public class KafkaProperties { return this.ssl; } + public Jaas getJaas() { + return this.jaas; + } + public Template getTemplate() { return this.template; } @@ -776,4 +783,63 @@ public class KafkaProperties { } + public static class Jaas { + + /** + * Enable JAAS configuration. + */ + private boolean enabled; + + /** + * Login module. + */ + private String loginModule = "com.sun.security.auth.module.Krb5LoginModule"; + + /** + * Control flag for login configuration. + */ + private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = + KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED; + + /** + * Additional JAAS options. + */ + private final Map options = new HashMap<>(); + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getLoginModule() { + return this.loginModule; + } + + public void setLoginModule(String loginModule) { + this.loginModule = loginModule; + } + + public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() { + return this.controlFlag; + } + + public void setControlFlag(KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) { + this.controlFlag = controlFlag; + } + + public Map getOptions() { + return this.options; + } + + public void setOptions(Map options) { + if (options != null) { + this.options.putAll(options); + } + } + + } + } diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 9bb77b83705..42d7e3c3e2a 100644 --- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -325,6 +325,10 @@ "description": "Log a warning for transactions executed without a single enlisted resource.", "defaultValue": true }, + { + "name": "spring.kafka.jaas.control-flag", + "defaultValue": "required" + }, { "name": "spring.mobile.devicedelegatingviewresolver.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 5dbb753df67..104ccd2d91f 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2016 the original author or authors. + * Copyright 2012-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.io.File; import java.util.Collections; import java.util.Map; +import javax.security.auth.login.AppConfigurationEntry; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; @@ -38,8 +40,10 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; /** * Tests for {@link KafkaAutoConfiguration}. @@ -160,6 +164,8 @@ public class KafkaAutoConfigurationTests { assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerSerializer.class); + assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .isEmpty(); } @Test @@ -169,7 +175,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", - "spring.kafka.listener.poll-timeout=2000"); + "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.jaas.enabled=true", + "spring.kafka.jaas.login-module=foo", + "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true"); DefaultKafkaProducerFactory producerFactory = this.context .getBean(DefaultKafkaProducerFactory.class); DefaultKafkaConsumerFactory consumerFactory = this.context @@ -189,6 +199,16 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); + assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .hasSize(1); + KafkaJaasLoginModuleInitializer jaas = this.context.getBean( + KafkaJaasLoginModuleInitializer.class); + dfa = new DirectFieldAccessor(jaas); + assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); + assertThat(dfa.getPropertyValue("controlFlag")) + .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + assertThat(((Map) dfa.getPropertyValue("options"))) + .containsExactly(entry("useKeyTab", "true")); } private void load(String... environment) { diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f5c55b39103..b1b5ec2ad5c 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -942,6 +942,10 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.value-deserializer= # Deserializer class for values. + spring.kafka.jaas.control-flag=required # Control flag for login configuration. + spring.kafka.jaas.enabled= # Enable JAAS configuration. + spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module. + spring.kafka.jaas.options= # Additional JAAS options. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".