Spring Cloud Bus con Kafka
En este artículo voy a explicar cómo configurar un bus de mensajería asíncrona entre microservicios Spring Boot llamado Spring Cloud Bus.
Introducción
Spring Cloud Bus – es parte del proyecto Spring Cloud, y usualmente se utiliza para actualizar cambios de configuración en los microservicios Spring Boot enviándoles a estos mensajes de actualización de forma asíncrona mediante Spring Cloud Config y brokers como Active MQ o Kafka. Kafka es un middleware de mensajería con topología usual publicador y suscriptor de tópicos.
Para desarrolladores familiarizados con Spring Boot es conocido que las "properties" de configuración de un microservicio se pueden refrescar en runtime: basta con hacer una llamada al endpoint /actuator/refresh para que todos los beans anotados con @RefreshScope se actualicen con los nuevos valores.
(Usaremos la palabra micro para referirnos a un microservicio Spring Boot).
Para ello, los pasos son los siguientes:
Paso 1: Se cambia el valor mi_propiedad=valor_nuevo en el fichero application.properties (o .yml) o en el config server (servidor centralizado de configuraciones Spring Cloud Config)
Paso 2: Se invoca el endpoint http://localhost:8012/actuator/refresh y el micro recupera nuevamente las propiedades del config server y refresca los beans anotados con @RefreshScope
¿Qué ganamos usando Spring Cloud Bus?
Veamos la diferencia entre refrescar propiedades sin usar Spring Cloud Bus y refrescar usándolo:
Refrescar propiedades sin Spring Cloud Bus:
- La propiedad es refrescada en el micro sin necesidad de reinicio. Afecta únicamente a este.
- La invocación al endpoint de refresh es síncrona.
- Es necesario saber el url de cada micro http://
/endpoint/refresh. - Siempre hablamos de propiedades.
Refrescar propiedades con Spring Cloud Bus:
- La configuración es refrescada en un micro o en un grupo de ellos.
- La invocación al endpoint de refresh es asíncrona, con posibilidad de recibir posteriormente un ACK del(los) micros refrescados.
- No es necesario saber el url de cada micro/endpoint a refrescar. Podemos usar la url del config server. Los microservicios suscritos/interesados en ese evento ejecutan el refresco.
- Se puede extender el api para enviar cualquier tipo de mensajes y no solo de configuración.
Cómo funciona Spring Cloud Bus
En este esquema puedes ver los distintos pasos en el funcionamiento de Spring Cloud Bus.
Cada paso queda explicado más abajo:
Paso 1 El usuario sube un cambio de alguna propiedad al repositorio git de Spring Cloud Config.
Paso 2 (opcional) El servidor git dispara un hook debido al cambio, invoca al url /actuator/busrefresh del config-server. Si no se configura este hook también se puede disparar esta acción de forma controlada, por ejemplo, mediante una pipeline de integración continua. Opcionalmente se puede especificar en el url el destino o grupo de microservicios que procesarán el mensaje de cambio de configuración. Ejemplo: http://localhost/busrefresh/app1:dev:*.
Paso 3 El config-server produce un mensaje de “refresh” en un tópico Kafka.
Paso 4 Todos los microservicios reciben este mensaje del tópico y ejecutan la operación refresh. Esto es, cada config-server-client vuelve a pedir los datos del config-server para este microservicio y refresca todos los beans anotados con @RefreshScope.
Paso 5 (opcional) Los microservicios pueden opcionalmente enviar el ack de confirmación de mensaje de refresco recibido. Posteriormente es posible consultar el config-server vía API REST la lista de los acks recibidos de cada micro.
Monitoreo
Se pueden ver los eventos de refresco enviados y los acks recibidos en la siguiente url /trace. Para esto activar la propiedad
spring.cloud.bus.trace.enable=true
Preguntas y respuestas
Normalmente surgen el mismo tipo de dudas sobre este tema. Por eso recojo a continuación algunas preguntas que nos solemos hacer y sus respuestas:
¿Quiénes son los productores y consumidores de los mensajes de refresco? El mensaje se puede producir desde cualquier micro y es consumido por todos o un grupo de ellos. Por una cuestion de orden podemos lanzar las peticiones de refresco siempre desde el micro Spring Cloud Config server.
¿Cómo se define qué grupo de microservicios consume del mensaje de refresco? Para esto se usa la última parte del url que se refiere al target Ejemplo: http://localhost/busrefresh/app1:dev:*.
Cada instancia de micro tiene una propiedad única: spring.cloud.bus.id . Esta por defecto es: spring.application.name: spring.application.index:
spring.cloud.bus.id=<centro de datos>:<tipo de micro>:<nombre de micro>:<versión de micro>: <random uuid>
¿A qué tópico y bróker se envían los mensajes de refresco? El tópico por defecto es springCloudBus. Se puede modificar. El bróker(s) se elige con la propiedad:
spring.cloud.stream.kafka.binder.brokers: <broker>:<puerto>
¿Cómo se configuran las acks? Las acks son las respuestas de los micros cuando han recibido el mensaje de refresco (no confundir con las acks de Kafka que se refieren a cuando el mensaje ha sido producido en el tópico). En este caso se refiere a cuando un micro ha recibido un mensaje de refresco que es de su interés. La siguiente propiedad debe estar configurada:
spring.cloud.bus.ack.enabled=true
Código fuente demo
A continuación el codigo fuente de la parte cliente Spring Cloud Config (un micro cualquiera my-config-client) y el micro servidor Spring Cloud Config (my-config-server):
Código fuente servidor: https://github.com/leonardotorresaltez/my-config-server
Código fuente cliente: https://github.com/leonardotorresaltez/my-config-client
Para configurar el servidor:
Dependencias para activar spring cloud bus
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
Propiedades en application.yml
spring.cloud.bus.enabled: true
spring.cloud.stream.kafka.binder.brokers: localhost:9092
Para configurar el cliente:
Dependencias para activar spring cloud bus
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
Propiedades en application.yml
spring:
cloud:
bus:
enabled: true
refresh:
enabled: true
spring.cloud.stream.kafka.binder.brokers: localhost:9092
Test de la demo
A continuación os dejo los pasos de cómo probar el código fuente del apartado anterior:
1 Iniciar el servidor my-config-server, basado en spring cloud config Iniciar el cliente my-config-client. Solo tiene un controlador y endpoint /getpropertyvalue
2 Antes de cambiar la propiedad consultar valor en el config-client
GET http ://localhost:8080/getpropertyvalue/mipropiedad
>>> valororiginal
3 Cambiar propiedad en fichero config/application.yml en el config-server
mipropiedad=nuevoValor
4 Después de cambiar la propiedad, enviar mensaje refresco al config-server
POST http ://localhost:8888/busrefresh
5 Consultar valor en el config-client
GET http ://localhost:8080/getpropertyvalue/mipropiedad
>>> nuevoValor
Conclusión
Spring Cloud Bus es un api que funciona sobre infraestructura de brokers asíncronos que nos ayuda a mantener nuestros microservicios con la configuración actualizada, además de ser muy versátil: podemos dirigir estos mensajes de cambios de configuración u otros tipos de mensajes custom hacia todos los microservicios o un grupo de ellos. En este caso usamos Kafka como message broker.
Si te interesa cómo tunear los elementos Kafka (tópico/productores/consumidores) para este caso de uso háznoslo saber :).