Testing de und integración de Apache Kafka dentro de una aplicación Spring Boot y JUnit

29 de marzo de 2020

Han pasado casi dos años desde que escribí mi primer test de integración para una aplicación de Kafka Spring Boot. Me llevó mucha investigación escribir esta primera prueba de integración y finalmente terminé escribiendo un blog post sobre testing Kafka con Spring Boot.

No había demasiada información sobre este tipo de tests y al final fue muy simple hacerlo, pero sin documentar. He visto mucha interacción y feedback con respecto a mi anterior entrada al blog y el GitHub Gist. Desde entonces spring-kafka-test cambió dos veces el patrón de uso y se introdujo JUnit 5. Eso significa que el código está ahora desactualizado y con eso, también la entrada del blog. Esta es la razón por la que decidí crear una versión revisada de la anterior entrada del blog.

Set up de tu proyecto

O bien utilizas gtu proyecto existente de Spring Boot o generas uno nuevo en start.spring.io start.spring.io. Cuando seleccionas Spring for Apache Kafka en start.spring.io añade automáticamente todas las entradas necesarias en el fichero de Maven o Gradle. A estas alturas también viene con JUnit 5, así que ya está listo para empezar. Sin embargo, en caso de que tengas un proyecto más antiguo, puede que necesites añadir la dependencia spring-kafka-test:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Configuración de clases

La forma más fácil de empezar una prueba es simplemente anotar la clase con @EmbeddedKafka. Esto te permite inyectar el EmbeddedKafkaBroker en tu método de test o en un método de configuración al principio.

@EmbeddedKafka
public class SimpleKafkaTest {

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @BeforeEach
    void setUp(EmbeddedKafkaBroker embeddedKafkaBroker) {
        this.embeddedKafkaBroker = embeddedKafkaBroker;
    }

    // ...

}

Habrás notado que no hay ninguna anotación de Spring en esta clase. Sin anotarlo con @ExtendWith(SpringExtension.class) o con una extensión que lo implique (p.ej. @SpringBootTest) el test se ejecuta fuera del contexto de Spring y, por ejemplo las expresiones podrían no resolverse.

Hay un par de propiedades disponibles para influir en el comportamiento y el tamaño del nodo Kafka integrado inclueyendo las siguientes:

Configuración de Clase con Spring Context

Asumiendo que sería conveniente poder aprovechar también las ventajas de Spring Context, puede que haga falta añadir la anotación @SpringBootTest al caso de prueba anterior. Sin embargo, si no cambias Spring Context, tienes que cambiar también la manera de implementar EmbeddedKafkaBroker, de lo contrario verás el siguiente error:

org.junit.jupiter.api.extension.ParameterResolutionException: 
Failed to resolve parameter [org.springframework.kafka.test.EmbeddedKafkaBroker embeddedKafkaBroker] 
in method [void com.example.demo.SimpleKafkaTest.setUp(org.springframework.kafka.test.EmbeddedKafkaBroker)]: 
Could not find embedded broker instance

La resolución es muy simple: se trata solo de cambiar el autowiring de JUnit5 a la anotación @Autowired de Spring:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
public class SimpleKafkaTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    // ...

}

Configuración de Kafka Consumer

Ahora puedes configurar tu consumidor o productor. Empecemos con el consumidor:

Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
    configs, 
    new StringDeserializer(), 
    new StringDeserializer()
);

KafkaTestUtils.consumerProps te proporciona todo lo que necesitas para hacer la configuración. El primer parámetro es el nombre de tu grupo de consumidores, el segundo una manera para establecer un auto commit y, el último parámetro es la instancia EmbeddedKafkaBroker.

Después, puedes configurar tu consumidor con el Spring Wrapper DefaultKafkaConsumerFactory.

Podríamos ahora seguir adelante y suscribir al consumidor a un tema. Dado que es el primer consumidor que se suscribe, el coordinador lo asignará automáticamente. En la entrada anterior del blog he mostrado dos opciones cómo manejar el periodo de espera. Después de revisar esos métodos, llegué a la conclusión que no me ha convencido ninguno de los métodos teniendo en cuenta lo siguiente:

  1. Podríamos configurar nuestro consumidor para que empiece siempre desde el principio. Por lo tanto, necesitaríamos configurar la propiedad AUTO_OFFSET_RESET_CONFIG a earliest. Sin embargo, esto no funciona en caso de que quieras ignorar los mensajes anteriores.

  2. Podemos llamar a consumer.poll(0). Esta acción conllevaría una espera hasta que estemos suscritos, incluso con el timeout 0 (primer parémetro). Sin embargo, este método está marcado como obsoleto en la versión 2.0 porque podría causar un bloqueo infinito. consumer.poll(0) espera hasta que los metadatos sean actualizados sin tener en cuenta el timeout. El nuevo método que los sustituye se llama consumer.poll(Duration). Este método se supone que sólo debe esperar hasta el timeout hasta que se haya llevado a cabo la asignación. En la práctica, este método no siempre funciona como se espera, ya que a veces la actualización de los metadatos es muy rápida y se queda a la espera del primer mensaje.

Hoy en día la Documentación de prueba recomienda otro enfoque que nos permite esperar utilizando KafkaMessageListenerContainer:

KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());

Este contenedor tiene un receptor de mensajes y los escribe tan pronto como se reciben a una cola. En la prueba podemos leer los registros de los consumidores de la cola y la cola se bloqueará hasta que recibamos el primer registro. Al utilizar ContainerTestUtil.waitForAssignment esperamos la asiganción inicial, ya que lo indicamos explícitamente.

De la misma misma manera tenemos que hacer un stop() nuestro contenedor después para asegurarnos de que tenemos un contexto limpio en un escenario de múltiples pruebas. Una configuración completa podría ser así:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @BeforeEach
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }

    // our tests...
}

Configuración de Kafka Producer

Confirgurar Kafka Producer es aún más fácil que Kafka Consumer:

Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

En caso de que no tengas EmbeddedKafkaBroker podrías utilizar también KafkaTestUtils.senderProps(String brokers) para obtener propiedades actuales.

Producir y consumir mensajes

Como ahora tenemos un consumidor y un productor, podemos producir mensajes:

producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "my-test-value"));
producer.flush();

...y también consumir mensajes y hacer afirmaciones sobre ellos:

ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
assertThat(singleRecord.value()).isEqualTo("my-test-value");

Serializar y deserializar la clave y el valor

Arriba puedes configurar tus serializadores En el caso de que tengas una herencia y tengas una parent class abstracta o una interfaz, tu implementación podría estar en el caso de prueba. En este caso obtendrás la siguiente excepción:

Caused by: java.lang.IllegalArgumentException: The class 'com.example.kafkatestsample.infrastructure.kafka.TestDomainEvent' is not in the trusted packages: [java.util, java.lang, com.example.kafkatestsample.event]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Puedes resolverlo añadiendo un paquete específico o todos los paquetes a la lista de paquetes de confianza:

JsonDeserializer<DomainEvent> domainEventJsonDeserializer = new JsonDeserializer<>(DomainEvent.class);
domainEventJsonDeserializer.addTrustedPackages("*");

Mejorar el rendimiento de ejecución para pruebas múltiples

En caso de que tengamos múltiples tests, debemos iniciar y detener el Kafka Broker para cada prueba. Para mejorar este comportamiento, podemos usar una característica de JUnit 5 para decir que nos gustaría tener la misma instancia de clase. Esto se puede hacer con la anotación @TestInstance(TestInstance.Lifecycle.PER_CLASS). Podemos convertir nuestro @BeforeEach y @AfterEach a @BeforeAll y @AfterAll. Lo único que debemos asegurar es que cada prueba de la clase consuma todos los mensajes que se producen en el mismo test. El resultado será este:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    // …
}

Conclusión

Es fácil probar una integración una vez que tienes la configuración funcionando. El @EmbeddedKafka proporciona una anotación útil para empezar. Con el approach de JUnit 5 puedes llevar a cabo pruebas similares pcon el uso del contexto de Spring o sin él. Dado que JUnit 5 nos permite especificar cómo se ejecuta la clase, podemos mejorar el rendimiento de ejecución para una clase fácilmente. Una vez que se ejecuta Kafka integrado, hacen falta unos trucos como por ejemplo bootstrapping al consumidor y los addTrustedPackages. Puedes consultar el source code completo en el ejemplo de hacer test de Kafka con Spring Boot y JUnit 5 en este GitHub Gist.

Sobre el autor: Valentin Zickner

Desde el año 2016 trabaja como Software Engineer en mimacom. Valentin es experto en tecnologías Cloud, Spring, Elasticsearch y Flowable.

Comments
Únete a nosotros