Testing de und integración de Apache Kafka dentro de una aplicación Spring Boot y JUnit
Han pasado casi dos años desde que escribí mi primer test de integración para una aplicación de Kafka Spring Boot. Me llevó mucha investigación escribir esta primera prueba de integración y finalmente terminé escribiendo un blog post sobre testing Kafka con Spring Boot.
No había demasiada información sobre este tipo de tests y al final fue muy simple hacerlo, pero sin documentar.
He visto mucha interacción y feedback con respecto a mi anterior entrada al blog y el GitHub Gist.
Desde entonces spring-kafka-test
cambió dos veces el patrón de uso y se introdujo JUnit 5.
Eso significa que el código está ahora desactualizado y con eso, también la entrada del blog.
Esta es la razón por la que decidí crear una versión revisada de la anterior entrada del blog.
Set up de tu proyecto
O bien utilizas gtu proyecto existente de Spring Boot o generas uno nuevo en start.spring.io start.spring.io.
Cuando seleccionas Spring for Apache Kafka
en start.spring.io añade automáticamente todas las entradas necesarias en el fichero de Maven o Gradle.
A estas alturas también viene con JUnit 5, así que ya está listo para empezar.
Sin embargo, en caso de que tengas un proyecto más antiguo, puede que necesites añadir la dependencia spring-kafka-test
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Configuración de clases
La forma más fácil de empezar una prueba es simplemente anotar la clase con @EmbeddedKafka
.
Esto te permite inyectar el EmbeddedKafkaBroker
en tu método de test o en un método de configuración al principio.
@EmbeddedKafka
public class SimpleKafkaTest {
private EmbeddedKafkaBroker embeddedKafkaBroker;
@BeforeEach
void setUp(EmbeddedKafkaBroker embeddedKafkaBroker) {
this.embeddedKafkaBroker = embeddedKafkaBroker;
}
// ...
}
Habrás notado que no hay ninguna anotación de Spring en esta clase.
Sin anotarlo con @ExtendWith(SpringExtension.class)
o con una extensión que lo implique (p.ej. @SpringBootTest
) el test se ejecuta fuera del contexto de Spring y, por ejemplo las expresiones podrían no resolverse.
Hay un par de propiedades disponibles para influir en el comportamiento y el tamaño del nodo Kafka integrado inclueyendo las siguientes:
count
: número de broker, el valor por defecto es1
controlledShutdown
, valor por defecto esfalse
ports
, lista de puertos en caso de que quiera acceder a estos brokers de otra instanciapartitions
, el valor por defecto es2
topics
nombres de los topics que se crearán en la puesta en marchabrokerProperties
/brokerPropertiesLocation
propiedades adicionales para el broker Kafka ∫
Configuración de Clase con Spring Context
Asumiendo que sería conveniente poder aprovechar también las ventajas de Spring Context, puede que haga falta añadir la anotación @SpringBootTest
al caso de prueba anterior.
Sin embargo, si no cambias Spring Context, tienes que cambiar también la manera de implementar EmbeddedKafkaBroker
, de lo contrario verás el siguiente error:
org.junit.jupiter.api.extension.ParameterResolutionException:
Failed to resolve parameter [org.springframework.kafka.test.EmbeddedKafkaBroker embeddedKafkaBroker]
in method [void com.example.demo.SimpleKafkaTest.setUp(org.springframework.kafka.test.EmbeddedKafkaBroker)]:
Could not find embedded broker instance
La resolución es muy simple: se trata solo de cambiar el autowiring de JUnit5 a la anotación @Autowired
de Spring:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
public class SimpleKafkaTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
// ...
}
Configuración de Kafka Consumer
Ahora puedes configurar tu consumidor o productor. Empecemos con el consumidor:
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs,
new StringDeserializer(),
new StringDeserializer()
);
KafkaTestUtils.consumerProps
te proporciona todo lo que necesitas para hacer la configuración.
El primer parámetro es el nombre de tu grupo de consumidores, el segundo una manera para establecer un auto commit y, el último parámetro es la instancia EmbeddedKafkaBroker
.
Después, puedes configurar tu consumidor con el Spring Wrapper DefaultKafkaConsumerFactory
.
Podríamos ahora seguir adelante y suscribir al consumidor a un tema. Dado que es el primer consumidor que se suscribe, el coordinador lo asignará automáticamente. En la entrada anterior del blog he mostrado dos opciones cómo manejar el periodo de espera. Después de revisar esos métodos, llegué a la conclusión que no me ha convencido ninguno de los métodos teniendo en cuenta lo siguiente:
-
Podríamos configurar nuestro consumidor para que empiece siempre desde el principio. Por lo tanto, necesitaríamos configurar la propiedad
AUTO_OFFSET_RESET_CONFIG
aearliest
. Sin embargo, esto no funciona en caso de que quieras ignorar los mensajes anteriores. -
Podemos llamar a
consumer.poll(0)
. Esta acción conllevaría una espera hasta que estemos suscritos, incluso con el timeout0
(primer parémetro). Sin embargo, este método está marcado como obsoleto en la versión 2.0 porque podría causar un bloqueo infinito.consumer.poll(0)
espera hasta que los metadatos sean actualizados sin tener en cuenta el timeout. El nuevo método que los sustituye se llamaconsumer.poll(Duration)
. Este método se supone que sólo debe esperar hasta el timeout hasta que se haya llevado a cabo la asignación. En la práctica, este método no siempre funciona como se espera, ya que a veces la actualización de los metadatos es muy rápida y se queda a la espera del primer mensaje.
Hoy en día la Documentación de prueba recomienda otro enfoque que nos permite esperar utilizando KafkaMessageListenerContainer
:
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
Este contenedor tiene un receptor de mensajes y los escribe tan pronto como se reciben a una cola.
En la prueba podemos leer los registros de los consumidores de la cola y la cola se bloqueará hasta que recibamos el primer registro.
Al utilizar ContainerTestUtil.waitForAssignment
esperamos la asiganción inicial, ya que lo indicamos explícitamente.
De la misma misma manera tenemos que hacer un stop()
nuestro contenedor después para asegurarnos de que tenemos un contexto limpio en un escenario de múltiples pruebas.
Una configuración completa podría ser así:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
public class SimpleKafkaTest {
private static final String TOPIC = "domain-events";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
BlockingQueue<ConsumerRecord<String, String>> records;
KafkaMessageListenerContainer<String, String> container;
@BeforeEach
void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(TOPIC);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterEach
void tearDown() {
container.stop();
}
// our tests...
}
Configuración de Kafka Producer
Confirgurar Kafka Producer es aún más fácil que Kafka Consumer:
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
En caso de que no tengas EmbeddedKafkaBroker
podrías utilizar también KafkaTestUtils.senderProps(String brokers)
para obtener propiedades actuales.
Producir y consumir mensajes
Como ahora tenemos un consumidor y un productor, podemos producir mensajes:
producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "my-test-value"));
producer.flush();
...y también consumir mensajes y hacer afirmaciones sobre ellos:
ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
assertThat(singleRecord.value()).isEqualTo("my-test-value");
Serializar y deserializar la clave y el valor
Arriba puedes configurar tus serializadores En el caso de que tengas una herencia y tengas una parent class abstracta o una interfaz, tu implementación podría estar en el caso de prueba. En este caso obtendrás la siguiente excepción:
Caused by: java.lang.IllegalArgumentException: The class 'com.example.kafkatestsample.infrastructure.kafka.TestDomainEvent' is not in the trusted packages: [java.util, java.lang, com.example.kafkatestsample.event]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Puedes resolverlo añadiendo un paquete específico o todos los paquetes a la lista de paquetes de confianza:
JsonDeserializer<DomainEvent> domainEventJsonDeserializer = new JsonDeserializer<>(DomainEvent.class);
domainEventJsonDeserializer.addTrustedPackages("*");
Mejorar el rendimiento de ejecución para pruebas múltiples
En caso de que tengamos múltiples tests, debemos iniciar y detener el Kafka Broker para cada prueba. Para mejorar este comportamiento, podemos usar una característica de JUnit 5 para decir que nos gustaría tener la misma instancia de clase.
Esto se puede hacer con la anotación @TestInstance(TestInstance.Lifecycle.PER_CLASS)
.
Podemos convertir nuestro @BeforeEach
y @AfterEach
a @BeforeAll
y @AfterAll
.
Lo único que debemos asegurar es que cada prueba de la clase consuma todos los mensajes que se producen en el mismo test.
El resultado será este:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SimpleKafkaTest {
private static final String TOPIC = "domain-events";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
BlockingQueue<ConsumerRecord<String, String>> records;
KafkaMessageListenerContainer<String, String> container;
@BeforeAll
void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(TOPIC);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
// …
}
Conclusión
Es fácil probar una integración una vez que tienes la configuración funcionando.
El @EmbeddedKafka
proporciona una anotación útil para empezar.
Con el approach de JUnit 5 puedes llevar a cabo pruebas similares pcon el uso del contexto de Spring o sin él.
Dado que JUnit 5 nos permite especificar cómo se ejecuta la clase, podemos mejorar el rendimiento de ejecución para una clase fácilmente.
Una vez que se ejecuta Kafka integrado, hacen falta unos trucos como por ejemplo bootstrapping al consumidor y los addTrustedPackages
.
Puedes consultar el source code completo en el ejemplo de hacer test de Kafka con Spring Boot y JUnit 5 en este GitHub Gist.