Cómo crear una tarea de tratamiento de datos con Apache Beam

Cómo crear una tarea de tratamiento de datos con Apache Beam

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Esta publicación es la 1.ª parte de una serie de entradas de blog dedicadas a Apache Beam.

¿Conoce Apache Beam? Si no es el caso, no se avergüence, puesto que Apache Beam es uno de los últimos proyectos desarrollados por la Apache Software Foundation y fue presentado por primera vez en junio de 2016, por lo que sigue siendo relativamente nuevo en el mundo del tratamiento de datos. En realidad, yo empecé a trabajar a menudo con Apache Beam hace poco y fue entonces cuando descubrí todo cuanto ofrecía y desde aquel día me encantó.

Apache Beam es un modelo de programación unificada que ofrece una forma fácil de implementar tareas de tratamiento de datos por lotes y en streaming y de ejecutarlas en cualquier motor de ejecución con una serie de IO distintas. ¿Suena prometedor, pero le sigue pareciendo confuso? Este el motivo por el que decidí publicar una serie de entradas de blog dedicadas a Apache Beam. En esta y en las siguientes entradas daré varios ejemplos concretos y destacaré determinados casos prácticos de tareas de tratamiento de datos mediante Apache Beam.

El tema de hoy es el procesamiento por lotes. Veamos el siguiente ejemplo: Usted trabaja en un concesionario de automóviles y desea analizar las ventas de vehículos a lo largo de un cierto periodo de tiempo (p. ej., cuántos coches de cada marca se vendieron). Esto significa que nuestro conjunto de datos es limitado (cantidad finita de datos) y no se actualizará (las ventas tuvieron lugar en el pasado). En este caso, podemos optar por un proceso por lotes para analizar nuestros datos.

Como datos de entrada tenemos registros de texto de coches vendidos en el siguiente formato:

id,brand_name,model_name,sales_number

Por ejemplo:
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4

Antes de empezar a implantar nuestra primera aplicación de Beam debemos entender algunas de las ideas clave que se utilizarán en todos los casos. En Beam existen tres concepciones principales: Pipeline, PCollection y PTransform.

  • Pipeline encapsula el flujo de trabajo de todas sus tareas de tratamiento de datos desde el principio hasta el final.
  • PCollection es una abstracción de conjunto de datos distribuida que Beam utiliza para transferir datos entre PTransforms.
  • PTransform es un proceso que opera con datos de entrada (PCollection de entrada) y produce datos de salida (PCollection de salida). Normalmente, la primera y la última PTransformsrepresentan una forma de datos de entrada/salida que puede ser limitada (procesamiento por lotes) o ilimitada (procesamiento en streaming).

Para simplificar las cosas podemos entender la Pipeline como un DAG (grafo acíclico dirigido) que representa todo su flujo de trabajo, las PTransforms como nodos (que transforman los datos) y las PCollections como los bordes de este grafo. Encontrará más información en la Guía de programación con Beam.

Regresemos ahora a nuestro ejemplo e intentemos aplicar la primera canalización, que procesará el conjunto de datos proporcionado.

Crear una canalización

Empecemos por crear una nueva canalización:

Pipeline pipeline = Pipeline.create();

Ahora creemos una nueva PTransform con el método pipeline.apply(), que leerá datos del archivo de texto y creará una nueva PCollection de cadenas. Para ello utilizaremos uno de los I/O ya implementados en Beam: TextIO. TextIO nos permite leer de y escribir en uno o varios archivos de texto línea por línea. Tiene muchas otras funcionalidades, como trabajar con distintos sistemas de archivos, ofrecer compatibilidad con patrones de archivos o permitir el flujo de archivos. Para más información, consulte la documentación de Apache Beam.

apply(TextIO.read().from(/path/to/input/file))

La salida de esta PTransform es una nueva instancia de PCollection en la que todas las entradas de la compilación son una línea de texto del archivo de entrada.

Como queremos obtener como resultado el número total de ventas por marca, debemos agruparlas correctamente. Por lo tanto, el próximo paso consistirá en analizar todas las líneas y crear un par de clave/valor en la que la clave sea un nombre de marca y el valor un número de ventas. Cabe mencionar que la PCollection de salida a partir de una PTransform anterior será la PCollection de entrada para esta.

En este paso utilizaremos la PTransform interna de Beam, que se llama MapElements, para crear otro par de claves/valores por cada entrada inicial mediante la implementación proporcionada de la interfaz SimpleFunction.

A continuación agrupamos el número de ventas por marca usando otra transformación de Beam: GroupByKey. Como resultado de salida tenemos una PCollection de clave/valor en la que la clave es un nombre de marca y el valor es una compilación iterable de ventas para aquella marca.

.apply(GroupByKey.<String, Integer>create())

 

Ahora ya estamos listos para sumar todas las cifras de venta de los vehículos por marca mediante nuestra propia implementación de la transformación ParDo:

Para finalizar la canalización, aplicamos otra transformación de I/O para tomar la PCollection de cadenas y escribirlas en un archivo de texto:

.apply(TextIO.write().to(/path/to/output/dir).withoutSharding());

Lo último que tenemos que hacer es ejecutar la canalización que hemos creado.

pipeline.run();

Parece bastante sencillo, ¿a qué sí? Todo es gracias a la potencia de Apache Beam, que permite crear canalizaciones de tratamiento de datos complicadas con una cantidad mínima de código.

Los que conozcan Hadoop a lo mejor se habrán fijado en que esta canalización guarda parecido con algo:

  • Lee y analiza datos de texto línea por línea y crea nuevos pares de clave/valor (Map)
  • Luego agrupa esos pares de clave/valor por clave (GroupBy)
  • Por último, itera sobre todos los valores de una clave aplicando una función de usuario (Reduce)

Pues sí, llevan razón: ¡esta sencilla canalización puede efectuarse con una clásica tarea de MapReduce! De todos modos, basta con fijarse en la facilidad y claridad que vemos en Beam (¡a pesar de estar en Java!) y, si decidimos ampliar nuestras canalizaciones añadiendo otra transformación, la cosa no se complicará demasiado.

Crear y ejecutar una canalización

Como he dicho antes, una canalización de Beam puede ejecutarse en distintos ejecutores (motores de procesamiento):

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

Para ello basta con añadir una dependencia corresponsal a nuestra configuración de proyecto Maven o Gradle. Lo bueno es que no tenemos que ajustar ni reescribir código de la canalización para ejecutarla en cada ejecutor. Incluso mejor, no tenemos que recompilar nuestros .JAR siempre y cuando la dependencia de todos los ejecutores necesarios haya sido incluida: basta con elegir el ejecutor que deseamos usar y listos.

Direct Runner es un ejecutor local que suele emplearse para realizar pruebas en la canalización. Al utilizar Java deberá especificar su dependencia en Direct Runner de su pom.xml.


<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.3.0</version>
   <scope>runtime</scope>
</dependency>


Después, debe compilar su proyecto:
# mvn clean package

Y ejecutar su canalización en Direct Runner:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner”

Por ejemplo, si nuestro fichero de salida contiene los siguientes datos:
# cat /tmp/beam/cars_sales_log
1,Toyota,Prius,3
2,Nissan,Sentra,2
1,Toyota,Yaris,4
3,Ford,Fusion,5
3,Ford,Kuga,3

El resultado final será este:
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8

Tanto la lista de todos los ejecutores soportados como las instrucciones y su manual de uso, los encontrará en esta página.

Por último, todo el código de este ejemplo está publicado en este repositorio de GitHub: https://github.com/aromanenko-dev/beam-tutorial.

En la próxima parte de esta serie de entradas del blog hablaré sobre el tratamiento de datos en streaming en Beam. Tomaré otro ejemplo de tarea de analítica de datos con una fuente de datos ilimitada y veremos qué nos ofrece Beam en ese caso.

Join The Conversation

0 Comments

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *