Apache Flink con Kafka

19 de mayo de 2023

En este artículo te voy a explicar cómo configurar y usar Apache Flink utilizando Kafka como backend.

Introducción

Apache Flink es un sistema de procesamiento de datos distribuido de código abierto que permite la ejecución de flujos de trabajo en tiempo real y por lotes. Fue desarrollado por la Apache Software Foundation y se basa en un modelo de procesamiento de datos llamado streaming first, lo que significa que está diseñado para manejar grandes volúmenes de datos en tiempo real.

Flink proporciona dos APIs: Table API para operaciones en lenguaje sql y DataStream API para operaciones de filtrado, mapeo, agregación, ventana y unión. También ofrece una amplia gama de conectores de origen y destino, lo que lo hace compatible con una variedad de sistemas de almacenamiento y procesamiento de datos.

Flink es altamente escalable e incluye en cada release un operador Kubernetes para crear un cluster de alta disponibilidad.

Apache Flink y Kafka Streams

Flink fue concebido dede el inicio para trabajar tanto en modo streaming como en modo batch, tiene una forma de despliegue para cada modo. Además, tiene una interfaz ANSI SQL para consultar los streams y utiliza su mecanismo de watermarks y checkpoints para anotar el trabajo que va realizando.

Flink tiene muchas similitudes con Kafka Streams/KSQLDB: ambos están desarrollados en Scala, usan "Rocks DB" como base de datos embebida y operaciones de transformación DSL similares.

Aplicación de ejemplo de Apache Flink con Kafka

A continuación mostramos el procedimiento de cómo instalar un cluster Flink en local y desplegar una pipeline sencilla, que consume cadenas de texto del tópico Kafka flink_input y las devuelve transformadas en el tópico flink_output.

Paso 1: Instalación

Kafka

Para tener un entorno local he usado el docker-compose de Confluent Community:

https://github.com/confluentinc/cp-all-in-one/tree/7.3.0-post/cp-all-in-one

clonar el repo e iniciar kafka:

$ docker-compose up -d

Flink

Prerrequisitos:

Instalar jdk 11:

#en ubuntu:
$ apt-get install jdk11

Descargar la última versión de https://flink.apache.org/downloads/:

$ wget https://dlcdn.apache.org/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz  
$ tar zxvf flink-1.17.0-bin-scala_2.12.tgz

A partir de ahora, todas las rutas son relativas a la carpeta donde fue expandido el fichero tgz.

Paso 2: Configuración

Para iniciar, editar el fichero ./conf/flink.conf.yaml cambiar las líneas:

rest.port: 8089
rest.bind-address: 0.0.0.0

Esto es necesario para tener disponible la interfaz web de control de tareas/pipelines Flink.

Paso 3: Iniciar el cluster

Para iniciar o parar el cluster usar:

$ ./bin/start-cluster.sh
$ ./bin/stop-cluster.sh

La interfaz de control estará disponible en http://localhost:8089

Paso 4: Descargar ejemplo

Clonar el repo https://github.com/leonardotorresaltez/flink-kafka-pipeline

git clone https://github.com/leonardotorresaltez/flink-kafka-pipeline.git

Paso a explicar la clase principal que describe y ejecuta la pipeline FlinkPipeline.java

public class FlinkPipeline {

. . .
        //paso 1: obtener contexto de ejecucion Flink runtime
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        //paso 2: obtener origenes(sources) y destinos(sink o productor)
        KafkaSource<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
        KafkaSink<String> flinkKafkaProducer= createStringProducerForTopic(outputTopic, address);
        
        //paso 3: crear un stream a partir del topico origen
        DataStream<String> stringInputStream =environment.fromSource(flinkKafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkasource");

        //paso 4: declarar la pipeline Flink, usar operacion map
        stringInputStream.map(new WordsCapitalizer()).sinkTo(flinkKafkaProducer);

        //paso 5: ejecutar la pipeline
        environment.execute();
. . .

 }

Paso 5: Empaquetar

Para empaquetar la aplicación de ejemplo, situarse en la misma carpeta del fichero pom.xml

$ mvn package

En este empaquetado, el maven-shade-plugin configurado en el pom.xml genera el fichero desplegable FlinkDataPipeline.jar

¿Qué hace el maven-shade-plugin?

Este se encarga de hacer un 'fat jar' esto es, incluye las librerías dependecias que configuremos en el pom.xml. Esto es necesario para que cuando despleguemos todo se encuentre en el classpath. Otra opción es poner los jar de dependencias en la carpeta ./lib de Flink.

Aquí se muestra la parte del pom.xml, donde se configura maven-shade-plugin:

 . . .

  <plugin>
		<groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-shade-plugin</artifactId>
		<executions>
			<execution>
				<id>FlinkDataPipeline</id>
				<phase>package</phase>
				<goals>
					<goal>shade</goal>
				</goals>
				<configuration>
				  <finalName>FlinkDataPipeline</finalName>
				  <artifactSet>
					  <includes>
						  <artifact>org.projectlombok:lombok</artifact>
						  <artifact>commons-configuration:commons-configuration</artifact>
						  <artifact>commons-lang:commons-lang</artifact>
						  <artifact>commons-logging:commons-logging</artifact>
					  </includes>
				  <artifactSet> 
        </configuration>
      </execution>
    </executions>
 . . .

Como vemos incluyo dependencias como loombok, commons-lang,etc

También he colocado otras en la carpeta lib de Flink, entre ellas, el conector Kafka:

$ ls ./lib
...
-rwxrwxrwx 1 leonardo leonardo    402930 Apr 21 16:00 flink-connector-kafka-1.17.0.jar
-rwxrwxrwx 1 leonardo leonardo   1815811 Apr 21 16:00 flink-core-1.17.0.jar

También está configurada la clase que tiene el método main en el fichero MANIFEST dentro del jar generado:

	<transformers>
		<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
		<mainClass>com.flinkexample.flink.FlinkPipeline</mainClass>
		</transformer>
	</transformers>

Paso 6: Desplegar aplicación

Una vez que el jar(nuestra pipeline) está generado se puede desplegar en el cluster Flink de dos formas:

Primer modo de despliegue:

$ ./bin/flink run \
      --detached \
      ./path/to/FlinkDataPipeline.jar

Segundo modo de despliegue: haciendo un upload del jar en la interfaz web : http://localhost:8089/#/submit

luego se puede ver si el job está en ejecución:

$ ./bin/flink list

Paso 7: Test

Para probar la aplicación solo hace falta producir algunos mensajes en el tópico de entrada.

#ejecutar dentro del contenedor kafka broker
$kafka-console-producer --bootstrap-server localhost:9092 --topic flink_input
> abc
> xyz

Paso 8 (opcional): SQL CLI

Para ejecutar la línea de comandos SQL:

$ ./bin/sql-client.sh

>
CREATE TABLE flink_output_stream (
  `value` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink_output',
  'properties.bootstrap.servers' = 'localhost:49092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

>select * from flink_output_stream;
value ts
----- --
ABC   2023-05-19 12:43:02.482
XYZ   2023-05-19 12:43:04.811

Paso 9: Parar la aplicación

Para hacer stop del job usar:

$ ./bin/flink stop \
       --savepointPath /tmp/flink-savepoints \
       <job id>

Conlusión

Apache Flink es un proyecto open source muy activo que ofrece muchos conectores, que facilitan su integración con backends para hacer procesamiento de streaming de datos, entre ellos, Apache Kafka. Siendo una alternativa a Kafka Streams más agnóstica, pero a la vez menos pre-integrada.

Este es un ejemplo en local. Para una instalación productiva se requiere instalarlo en Kubernetes usando el operador Flink que gestiona la creación del cluster, y configurar todas las opciones de alta disponibilidad. Si tú también quieres empezar a utilizar Apache Flink utilizando Kafka como backend contáctanos y te podremos ayudar.

Sobre el autor: Leonardo Torres Altez

Especialista en el diseño y desarrollo de software middleware y microservicios, trabaja como Software Architect y Certified Apache Kafka Trainer.

Comments
Únete a nosotros