Spring Cloud Bus con Kafka

23 de junio de 2022

En este artículo voy a explicar cómo configurar un bus de mensajería asíncrona entre microservicios Spring Boot llamado Spring Cloud Bus.

Introducción

Spring Cloud Bus – es parte del proyecto Spring Cloud, y usualmente se utiliza para actualizar cambios de configuración en los microservicios Spring Boot enviándoles a estos mensajes de actualización de forma asíncrona mediante Spring Cloud Config y brokers como Active MQ o Kafka. Kafka es un middleware de mensajería con topología usual publicador y suscriptor de tópicos.

Para desarrolladores familiarizados con Spring Boot es conocido que las "properties" de configuración de un microservicio se pueden refrescar en runtime: basta con hacer una llamada al endpoint /actuator/refresh para que todos los beans anotados con @RefreshScope se actualicen con los nuevos valores.

(Usaremos la palabra micro para referirnos a un microservicio Spring Boot).

Para ello, los pasos son los siguientes:

Paso 1: Se cambia el valor mi_propiedad=valor_nuevo en el fichero application.properties (o .yml) o en el config server (servidor centralizado de configuraciones Spring Cloud Config)

Paso 2: Se invoca el endpoint http://localhost:8012/actuator/refresh y el micro recupera nuevamente las propiedades del config server y refresca los beans anotados con @RefreshScope

¿Qué ganamos usando Spring Cloud Bus?

Veamos la diferencia entre refrescar propiedades sin usar Spring Cloud Bus y refrescar usándolo:

Refrescar propiedades sin Spring Cloud Bus:

Refrescar propiedades con Spring Cloud Bus:

Cómo funciona Spring Cloud Bus

En este esquema puedes ver los distintos pasos en el funcionamiento de Spring Cloud Bus.

Cada paso queda explicado más abajo:

Paso 1 El usuario sube un cambio de alguna propiedad al repositorio git de Spring Cloud Config.

Paso 2 (opcional) El servidor git dispara un hook debido al cambio, invoca al url /actuator/busrefresh del config-server. Si no se configura este hook también se puede disparar esta acción de forma controlada, por ejemplo, mediante una pipeline de integración continua. Opcionalmente se puede especificar en el url el destino o grupo de microservicios que procesarán el mensaje de cambio de configuración. Ejemplo: http://localhost/busrefresh/app1:dev:*.

Paso 3 El config-server produce un mensaje de “refresh” en un tópico Kafka.

Paso 4 Todos los microservicios reciben este mensaje del tópico y ejecutan la operación refresh. Esto es, cada config-server-client vuelve a pedir los datos del config-server para este microservicio y refresca todos los beans anotados con @RefreshScope.

Paso 5 (opcional) Los microservicios pueden opcionalmente enviar el ack de confirmación de mensaje de refresco recibido. Posteriormente es posible consultar el config-server vía API REST la lista de los acks recibidos de cada micro.

Monitoreo

Se pueden ver los eventos de refresco enviados y los acks recibidos en la siguiente url /trace. Para esto activar la propiedad

spring.cloud.bus.trace.enable=true 

Preguntas y respuestas

Normalmente surgen el mismo tipo de dudas sobre este tema. Por eso recojo a continuación algunas preguntas que nos solemos hacer y sus respuestas:

¿Quiénes son los productores y consumidores de los mensajes de refresco? El mensaje se puede producir desde cualquier micro y es consumido por todos o un grupo de ellos. Por una cuestion de orden podemos lanzar las peticiones de refresco siempre desde el micro Spring Cloud Config server.

¿Cómo se define qué grupo de microservicios consume del mensaje de refresco? Para esto se usa la última parte del url que se refiere al target Ejemplo: http://localhost/busrefresh/app1:dev:*.

Cada instancia de micro tiene una propiedad única: spring.cloud.bus.id . Esta por defecto es: spring.application.name: spring.application.index: . Lo ideal es usar identificadores de grupos mayores a menores. Por ejemplo:

spring.cloud.bus.id=<centro de datos>:<tipo de micro>:<nombre de micro>:<versión de micro>: <random uuid>

¿A qué tópico y bróker se envían los mensajes de refresco? El tópico por defecto es springCloudBus. Se puede modificar. El bróker(s) se elige con la propiedad:

spring.cloud.stream.kafka.binder.brokers: <broker>:<puerto>

¿Cómo se configuran las acks? Las acks son las respuestas de los micros cuando han recibido el mensaje de refresco (no confundir con las acks de Kafka que se refieren a cuando el mensaje ha sido producido en el tópico). En este caso se refiere a cuando un micro ha recibido un mensaje de refresco que es de su interés. La siguiente propiedad debe estar configurada:

spring.cloud.bus.ack.enabled=true

Código fuente demo

A continuación el codigo fuente de la parte cliente Spring Cloud Config (un micro cualquiera my-config-client) y el micro servidor Spring Cloud Config (my-config-server):

Código fuente servidor: https://github.com/leonardotorresaltez/my-config-server

Código fuente cliente: https://github.com/leonardotorresaltez/my-config-client

Para configurar el servidor:

Dependencias para activar spring cloud bus

<dependency>
		    <groupId>org.springframework.cloud</groupId>
		    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

Propiedades en application.yml

spring.cloud.bus.enabled: true
spring.cloud.stream.kafka.binder.brokers: localhost:9092

Para configurar el cliente:

Dependencias para activar spring cloud bus

<dependency>
		    <groupId>org.springframework.cloud</groupId>
		    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

Propiedades en application.yml

spring:
  cloud:
    bus:
      enabled: true
      refresh:
        enabled: true
    
spring.cloud.stream.kafka.binder.brokers: localhost:9092

Test de la demo

A continuación os dejo los pasos de cómo probar el código fuente del apartado anterior:

1 Iniciar el servidor my-config-server, basado en spring cloud config Iniciar el cliente my-config-client. Solo tiene un controlador y endpoint /getpropertyvalue

2 Antes de cambiar la propiedad consultar valor en el config-client

GET  http ://localhost:8080/getpropertyvalue/mipropiedad
>>> valororiginal

3 Cambiar propiedad en fichero config/application.yml en el config-server

mipropiedad=nuevoValor

4 Después de cambiar la propiedad, enviar mensaje refresco al config-server

POST http ://localhost:8888/busrefresh

5 Consultar valor en el config-client

GET  http ://localhost:8080/getpropertyvalue/mipropiedad
>>> nuevoValor

Conclusión

Spring Cloud Bus es un api que funciona sobre infraestructura de brokers asíncronos que nos ayuda a mantener nuestros microservicios con la configuración actualizada, además de ser muy versátil: podemos dirigir estos mensajes de cambios de configuración u otros tipos de mensajes custom hacia todos los microservicios o un grupo de ellos. En este caso usamos Kafka como message broker.

Si te interesa cómo tunear los elementos Kafka (tópico/productores/consumidores) para este caso de uso háznoslo saber :).

Sobre el autor: Leonardo Torres Altez

Especialista en el diseño y desarrollo de software middleware y microservicios, trabaja como Software Architect y Certified Apache Kafka Trainer.

Comments
Únete a nosotros