Cómo crear una tarea de tratamiento de datos con Apache Beam – Canalización en streaming

Cómo crear una tarea de tratamiento de datos con Apache Beam – Canalización en streaming

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

En nuestra última entrada del blog hablamos sobre cómo crear tareas de tratamiento de datos con Apache Beam. En esta ocasión vamos a hablar de una de las cosas más buscadas del mundo moderno de los big data hoy en día: el tratamiento de datos en streaming.

La principal diferencia entre los lotes y el streaming es el tipo de fuente de datos de entrada. Cuando su conjunto de datos es limitado (aunque en términos de tamaño sea enorme) y no se actualiza durante el tratamiento, lo más probable es que utilice una canalización por lotes. En este caso, la fuente de entrada pueden ser desde archivos, tablas de bases de datos, objetos de almacenes de objetos, etc. Me gustaría volver a subrayar que, cuando se trabaja por lotes, damos por hecho que los datos no son mutables durante todo el tiempo de procesamiento y que el número de registros de entrada es constante. ¿Por qué centrarnos en esto? Pues porque incluso con archivos podemos tener un flujo de datos ilimitado cuando se están añadiendo o cambiando archivos constantemente. Es este caso debemos aplicar un enfoque en streaming para operar con los datos. Así pues, si sabemos que nuestros datos son limitados e inmutables, necesitamos crear una canalización por lotes.

La cosa se complica cuando nuestro conjunto de datos es ilimitado (no paran de llegar) o mutable. Algunos de los ejemplos de este tipo de fuente podrían ser: sistemas de mensajería (como Apache Kafka), archivos nuevos de un directorio (registros de un servidor web) o cualquier otro sistema que recabe datos en tiempo real (como los sensores de IoT). El común denominador de todas estas fuentes es que siempre tenemos que esperar a que lleguen los datos nuevos. Claro que podemos dividir nuestros datos en lotes (por tiempo o tamaño de los datos) y tratar cada división por lote, pero costaría mucho aplicar algunas funciones en todos los conjuntos de datos consumidos y crear toda la canalización para ello. Por suerte, existen varios motores de streaming que nos permiten hacer frente a este tipo de tratamiento de datos fácilmente: Apache Spark, Apache Flink, Apache Apex, Google DataFlow. Todos son compatibles con Apache Beam y podemos ejecutar la misma canalización en distintos motores sin tener que modificar el código. Además podemos emplear la misma canalización por lotes o en streaming con unos cambios mínimos (basta con ajustar correctamente la fuente de entrada y listos), que todo funcionará tal y como nos llega. ¡Pura magia! Hace un tiempo me hubiera parecido un sueño, cuando me tocaba reescribir mis tareas por lotes a streaming.

Bueno, suficiente teoría: es momento de centrarnos en un ejemplo y escribir nuestro primer código en streaming. Vamos a leer algunos datos de Kafka (fuente ilimitada), realizar un simple procesamiento de los datos y mandar los resultados escritos de vuelta a Kafka también.

Pongamos que tengo un flujo ilimitado de coordenadas de GPS (X y Y) de algunos objetos en un mapa (para este ejemplo imaginemos que se trata de coches) que llega en tiempo real y queremos seleccionar tan solo los que están ubicados en una zona determinada. Dicho de otra manera, tenemos que consumir datos de texto de un tema de Kafka, analizarlos, filtrarlos según unos límites concretos y volver a escribirlos a otro tema de Kafka. Veamos cómo podemos hacerlo con ayuda de Apache Beam.

Todos los mensajes de Kafka contienen datos de texto en este formato:
id,x,y

en el que:
id: identificador único del objeto,
x, y: coordenadas en el mapa (enteros).

Tendremos que ocuparnos del formato si no es válido y saltar esos registros.

Crear una canalización

Al igual que en la entrada anterior del blog, en la que realizamos un procesamiento por lotes, creamos una canalización de la misma manera:

Pipeline pipeline = Pipeline.create(options);

Podemos ampliar el objeto Options para transferir opciones de línea de comandos a la canalización. Pueden consultar todo el ejemplo en Github si desean obtener más información.

Luego tenemos que leer datos del tema de entrada de Kafka. Como he comentado anteriormente, Apache Beam ya proporciona varios conectores de I/O y KafkaIO es uno de ellos. Por lo tanto, creamos una nueva PTransform ilimitada que consume los mensajes que llegan de un tema de Kafka específico y los propaga hasta el próximo paso:

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))

Por defecto, KafkaIO encapsula todos los mensajes consumidos en un objeto KafkaRecord. Aun así, la próxima transformación se limita a recuperar una carga útil (cadena de texto) a través del objeto DoFn de reciente creación:

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord<Long, String> record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)

Después de este paso, es hora de filtrar los registros (consulte la tarea inicial mencionada más arriba), pero antes tenemos que analizar nuestra cadena de texto según el formato definido. Esto permite encapsularlos en un objeto funcional que luego será utilizado por la transformación interna de Beam Filter.

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)

A continuación tenemos que preparar mensajes filtrados para escribirlos y devolverlos a Kafka creando un nuevo par de clave/valor con la categoría interna de Beam KV, que puede emplearse en distintos conectores de I/O, entre los cuales se cuenta KafkaIO.

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn<String, KV<String, String>>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)

La última transformación la necesitamos para escribir mensajes en Kafka, así que sencillamente utilizaremos KafkaIO.write() (implementación Sink) con esa finalidad. En cuanto a la lectura, tenemos que configurar esa transformación con algunas opciones necesarias, como los servidores Bootstrap de Kafka, el nombre de tema de salida o los serializadores para clave/valor.

.apply(
    "WriteToKafka",
    KafkaIO.<String, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);

Por último, ejecutamos nuestra canalización como siempre:

pipeline.run();

En esta ocasión puede parecer un tanto más complicado que en el artículo anterior, pero como se puede observar fácilmente no hemos tenido que hacer nada muy específico para que nuestra canalización sea compatible con el streaming. Toda la responsabilidad recae en la implementación del modelo de datos de Apache Beam, que facilita considerablemente la alternancia entre el procesamiento por lotes y en streaming para los usuarios de Beam.

Crear y ejecutar una canalización

Añadamos las dependencias necesarias para que sea posible usar Beam KafkaIO:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

Después basta con crear un .JAR y ejecutarlo con DirectRunner para comprobar cómo funciona:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

Si hace falta, podemos añadir otros argumentos utilizados en la canalización con ayuda de la opción «exec.args». Además, asegúrese de que sus servidores de Kafka estén disponibles y correctamente especificados antes de ejecutar la canalización de Beam. Por último, el comando Maven inicializará una canalización y la ejecutará eternamente hasta que se la detenga manualmente (opcionalmente, se puede indicar un tiempo de ejecución máximo). Esto significa que los datos se tratarán de forma continua, en streaming.

Como siempre, todo el código de este ejemplo está publicado en este repositorio de GitHub.

¡Feliz streaming!

Join The Conversation

0 Comments

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *