Apache Flink con Kafka
En este artículo te voy a explicar cómo configurar y usar Apache Flink utilizando Kafka como backend.
Introducción
Apache Flink es un sistema de procesamiento de datos distribuido de código abierto que permite la ejecución de flujos de trabajo en tiempo real y por lotes. Fue desarrollado por la Apache Software Foundation y se basa en un modelo de procesamiento de datos llamado streaming first, lo que significa que está diseñado para manejar grandes volúmenes de datos en tiempo real.
Flink proporciona dos APIs: Table API para operaciones en lenguaje sql y DataStream API para operaciones de filtrado, mapeo, agregación, ventana y unión. También ofrece una amplia gama de conectores de origen y destino, lo que lo hace compatible con una variedad de sistemas de almacenamiento y procesamiento de datos.
Flink es altamente escalable e incluye en cada release un operador Kubernetes para crear un cluster de alta disponibilidad.
Apache Flink y Kafka Streams
Flink fue concebido dede el inicio para trabajar tanto en modo streaming como en modo batch, tiene una forma de despliegue para cada modo. Además, tiene una interfaz ANSI SQL para consultar los streams y utiliza su mecanismo de watermarks y checkpoints para anotar el trabajo que va realizando.
Flink tiene muchas similitudes con Kafka Streams/KSQLDB: ambos están desarrollados en Scala, usan "Rocks DB" como base de datos embebida y operaciones de transformación DSL similares.
Aplicación de ejemplo de Apache Flink con Kafka
A continuación mostramos el procedimiento de cómo instalar un cluster Flink en local y desplegar una pipeline sencilla, que consume cadenas de texto del tópico Kafka flink_input y las devuelve transformadas en el tópico flink_output.
Paso 1: Instalación
Kafka
Para tener un entorno local he usado el docker-compose de Confluent Community:
https://github.com/confluentinc/cp-all-in-one/tree/7.3.0-post/cp-all-in-one
clonar el repo e iniciar kafka:
$ docker-compose up -d
Flink
Prerrequisitos:
Instalar jdk 11:
#en ubuntu:
$ apt-get install jdk11
Descargar la última versión de https://flink.apache.org/downloads/:
$ wget https://dlcdn.apache.org/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
$ tar zxvf flink-1.17.0-bin-scala_2.12.tgz
A partir de ahora, todas las rutas son relativas a la carpeta donde fue expandido el fichero tgz.
Paso 2: Configuración
Para iniciar, editar el fichero ./conf/flink.conf.yaml cambiar las líneas:
rest.port: 8089
rest.bind-address: 0.0.0.0
Esto es necesario para tener disponible la interfaz web de control de tareas/pipelines Flink.
Paso 3: Iniciar el cluster
Para iniciar o parar el cluster usar:
$ ./bin/start-cluster.sh
$ ./bin/stop-cluster.sh
La interfaz de control estará disponible en http://localhost:8089
Paso 4: Descargar ejemplo
Clonar el repo https://github.com/leonardotorresaltez/flink-kafka-pipeline
git clone https://github.com/leonardotorresaltez/flink-kafka-pipeline.git
Paso a explicar la clase principal que describe y ejecuta la pipeline FlinkPipeline.java
public class FlinkPipeline {
. . .
//paso 1: obtener contexto de ejecucion Flink runtime
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//paso 2: obtener origenes(sources) y destinos(sink o productor)
KafkaSource<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
KafkaSink<String> flinkKafkaProducer= createStringProducerForTopic(outputTopic, address);
//paso 3: crear un stream a partir del topico origen
DataStream<String> stringInputStream =environment.fromSource(flinkKafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkasource");
//paso 4: declarar la pipeline Flink, usar operacion map
stringInputStream.map(new WordsCapitalizer()).sinkTo(flinkKafkaProducer);
//paso 5: ejecutar la pipeline
environment.execute();
. . .
}
Paso 5: Empaquetar
Para empaquetar la aplicación de ejemplo, situarse en la misma carpeta del fichero pom.xml
$ mvn package
En este empaquetado, el maven-shade-plugin configurado en el pom.xml genera el fichero desplegable FlinkDataPipeline.jar
¿Qué hace el maven-shade-plugin?
Este se encarga de hacer un 'fat jar' esto es, incluye las librerías dependecias que configuremos en el pom.xml. Esto es necesario para que cuando despleguemos todo se encuentre en el classpath. Otra opción es poner los jar de dependencias en la carpeta ./lib de Flink.
Aquí se muestra la parte del pom.xml, donde se configura maven-shade-plugin:
. . .
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>FlinkDataPipeline</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>FlinkDataPipeline</finalName>
<artifactSet>
<includes>
<artifact>org.projectlombok:lombok</artifact>
<artifact>commons-configuration:commons-configuration</artifact>
<artifact>commons-lang:commons-lang</artifact>
<artifact>commons-logging:commons-logging</artifact>
</includes>
<artifactSet>
</configuration>
</execution>
</executions>
. . .
Como vemos incluyo dependencias como loombok, commons-lang,etc
También he colocado otras en la carpeta lib de Flink, entre ellas, el conector Kafka:
$ ls ./lib
...
-rwxrwxrwx 1 leonardo leonardo 402930 Apr 21 16:00 flink-connector-kafka-1.17.0.jar
-rwxrwxrwx 1 leonardo leonardo 1815811 Apr 21 16:00 flink-core-1.17.0.jar
También está configurada la clase que tiene el método main en el fichero MANIFEST dentro del jar generado:
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.flinkexample.flink.FlinkPipeline</mainClass>
</transformer>
</transformers>
Paso 6: Desplegar aplicación
Una vez que el jar(nuestra pipeline) está generado se puede desplegar en el cluster Flink de dos formas:
Primer modo de despliegue:
$ ./bin/flink run \
--detached \
./path/to/FlinkDataPipeline.jar
Segundo modo de despliegue: haciendo un upload del jar en la interfaz web : http://localhost:8089/#/submit
luego se puede ver si el job está en ejecución:
$ ./bin/flink list
Paso 7: Test
Para probar la aplicación solo hace falta producir algunos mensajes en el tópico de entrada.
#ejecutar dentro del contenedor kafka broker
$kafka-console-producer --bootstrap-server localhost:9092 --topic flink_input
> abc
> xyz
Paso 8 (opcional): SQL CLI
Para ejecutar la línea de comandos SQL:
$ ./bin/sql-client.sh
>
CREATE TABLE flink_output_stream (
`value` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'flink_output',
'properties.bootstrap.servers' = 'localhost:49092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'raw'
);
>select * from flink_output_stream;
value ts
----- --
ABC 2023-05-19 12:43:02.482
XYZ 2023-05-19 12:43:04.811
Paso 9: Parar la aplicación
Para hacer stop del job usar:
$ ./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
<job id>
Conlusión
Apache Flink es un proyecto open source muy activo que ofrece muchos conectores, que facilitan su integración con backends para hacer procesamiento de streaming de datos, entre ellos, Apache Kafka. Siendo una alternativa a Kafka Streams más agnóstica, pero a la vez menos pre-integrada.
Este es un ejemplo en local. Para una instalación productiva se requiere instalarlo en Kubernetes usando el operador Flink que gestiona la creación del cluster, y configurar todas las opciones de alta disponibilidad. Si tú también quieres empezar a utilizar Apache Flink utilizando Kafka como backend contáctanos y te podremos ayudar.