Apache Spark y Talend: Rendimiento y ajustes

Apache Spark y Talend: Rendimiento y ajustes

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

Quisiera empezar dando las gracias a todos aquellos que hayan leído mis dos publicaciones anteriores del blog sobre el funcionamiento de Talend y Apache Spark.

Si este es su primer contacto con esta serie de artículos del blog y no ha leído mis artículos anteriores, puede empezar a leerlos aquí: “Talend y Apache Spark: manual técnico” y encontrará la segunda parte aquí: “Comparativa de configuraciones entre Talend y Spark Submit: ¿Qué diferencias existen?”.

Las dos primeras publicaciones de mi serie sobre Apache Spark ofrecieron una visión general sobre cómo funciona Talend con Spark, qué semejanzas encontrábamos entre Talend y Spark Submit, así como las opciones de configuración disponibles para las tareas Spark en Talend.

En este artículo vamos a analizar el rendimiento y los posibles ajustes de Apache Spark. Se trata de un debate habitual entre prácticamente todos los usuarios de Apache Spark, incluso fuera de la esfera de Talend. Cuando desarrolle y ejecute sus primeras tareas de Spark, habrá siempre una serie de preguntas que se planteará.

  • ¿Cuántos ejecutores debería asignar a mi tarea de Spark?
  • ¿Cuánta memoria necesita cada ejecutor?
  • ¿Cuántos núcleos debería utilizar?
  • ¿Por qué algunas tareas de Spark tardan horas para procesar 10 GB de datos y cómo soluciono ese problema?

En este texto repasaré todas estas preguntas y ofreceré respuestas e información de utilidad. Antes de seguir adelante con este tema, veamos algunos conceptos básicos que aparecerán en esta publicación.

Partición: Una partición es una porción de un conjunto de datos distribuido. La crea el tamaño del bloque HDFS por defecto. Spark utiliza particiones para efectuar un procesamiento en paralelo de conjuntos de datos.

Tareas: Las tareas son unidades de trabajo que pueden ejecutarse en un ejecutor.

Núcleo: Un núcleo es la unidad de procesamiento de una CPU que determina el número de tareas paralelas en Spark que pueden ejecutarse en un ejecutor.

Ejecutor: Un proceso que arranca en los nodos trabajadores que ejecuta su solicitud de tarea en memoria o en disco.

Maestro de aplicaciones: Cada aplicación YARN desencadena un proceso maestro de aplicaciones que se encarga de solicitar recursos al administrador de recursos. Una vez asignados los recursos, el proceso trabaja con gestores de nodo para iniciar los contenedores internos necesarios.

Ajuste de Spark

Para empezar, veamos cómo puede ajustar sus tareas de Apache Spark en Talend. Como he mencionado antes, en su tarea de Talend Spark encontrará la pestaña Spark Configuration (Configuración de Spark) en la que podrá configurar las propiedades de ajuste. Por defecto siempre aparece sin marcar en Talend.

En este apartado se le da la opción de configurar la memoria y los núcleos que su maestro de aplicaciones y ejecutores utilizarán y cuántos ejecutores solicitará su tarea. Ahora la pregunta principal que se plantea una vez uno empieza a rellenar los valores de este apartado es "¿Cómo determino el número de núcleos o memoria que necesitan mi maestro de aplicaciones o mis ejecutores para tener un buen rendimiento?". Abordemos la cuestión.

Cómo elegir el número de núcleos para su tarea de Spark

Llegados a este punto hay varios factores que debemos considerar antes de proseguir. Se trata de:

  1. El tamaño de nuestros conjuntos de datos
  2. El plazo de tiempo en el que debe completarse nuestra tarea
  3. Las operaciones y acciones que realiza nuestra tarea

Con todos estos aspectos presentes, podemos empezar a configurar nuestra tarea para maximizar el rendimiento. Empecemos ajustando nuestro maestro de aplicaciones. Para el maestro de aplicaciones podemos dejar los valores por defecto, puesto que tan solo realiza la orquestación de los recursos y no el procesamiento, lo que significa que no se requieren valores elevados de memoria o de núcleos.

Nuestro próximo paso es configurar la memoria y los núcleos para nuestros ejecutores. Aquí la cuestión más importante es cuántos ejecutores, cuánta memoria y cuántos núcleos deberían utilizarse. Para obtener una respuesta, imaginemos que tenemos un clúster de Hadoop con 6 nodos trabajadores y que cada uno tiene 32 núcleos y 120 GB de memoria. Lo primero que seguramente nos viene a la cabeza es que, cuantas más tareas concurrentes podamos tener por ejecutor, mejor será nuestro rendimiento. Si investigamos al respecto, veremos en guías de ajuste del rendimiento de distribuciones de Hadoop como Cloudera como ejemplo en este enlace, se ha demostrado que más de 5 núcleos por ejecutor generará una I/O de HDFS errónea. Por ese motivo el valor de núcleos óptimo a efectos de rendimiento es 5.

A continuación veamos cuántos ejecutores nos interesa inicializar. En función de la cantidad de núcleos y nodos, podemos determinar fácilmente ese número. Como hemos dicho, 5 núcleos es el número óptimo a utilizar por ejecutor. Bien, de cada uno de los 32 núcleos que tenemos por nodo debemos eliminar los que no podemos utilizar para nuestras tareas, puesto que los necesita el sistema operativo y los daemons de Hadoop que se ejecutan en el nodo. La herramienta de gestión del clúster de Hadoop ya cumple esa función por nosotros, lo que facilita determinar cuántos núcleos por nodo tenemos a nuestra disposición para utilizarlos en nuestras tareas de Spark.

Tras realizar ese cálculo, pongamos que nos quedan 30 núcleos por nodo para utilizar. Como ya hemos determinado que 5 núcleos es el número óptimo por ejecutor, eso significa que podemos ejecutar hasta 6 ejecutores por nodo. ¡Pan comido!

Por último, acabemos calculando la cantidad de memoria que podemos utilizar. Conforme a las especificaciones de hardware de arriba, vemos que hay 120 GB de memoria por nodo, pero como he dicho al hablar de los núcleos, no podemos usar toda esa memoria para las tareas, ya que el sistema operativo necesita una parte. Aquí también nuestra herramienta de gestión del clúster de Hadoop puede determinar cuánta memoria podemos usar para nuestras tareas. Si el sistema operativo y los daemons de Hadoop necesitan 2 GB de memoria, eso nos deja con 118 GB de memoria para usar en nuestras tareas de Spark. Como ya hemos determinado que podemos tener 6 ejecutores por nodo, si hacemos cálculos podremos usar aproximadamente hasta 20 GB de memoria por ejecutor. No obstante, esto no es así al cien por cien, ya que también deberíamos incluir en los cálculos el gasto de memoria básico que tendrá cada ejecutor. En mi publicación anterior dije que por defecto este gasto general es de 384 MB. Si resto esa cantidad de las 20 GB, podríamos decir que a grandes rasgos la cantidad máxima que podría asignar a un ejecutor sería de 19 GB.

Asignación dinámica o fija de recursos de clúster

Las cifras mencionadas funcionan tanto para una asignación fija como dinámica de recursos de clúster en una tarea de Spark. La diferencia la encontramos en la asignación dinámica. Con la asignación dinámica puede indicar la cantidad inicial de ejecutores a utilizar, un mínimo de ejecutores que la tarea puede emplear cuando no haya una gran carga de trabajo y una cantidad máxima cuando se requiera una mayor potencia de procesamiento. Aunque estaría bien poder utilizar todos los recursos del clúster de nuestra tarea, nos toca compartir esa potencia de procesamiento con otras tareas que se ejecutan en el clúster. Por ello, lo que hemos identificado como nuestros requisitos al repasar los factores que hemos definido antes pensando en ajustar nuestra tarea de Talend Spark es lo que determinará el porcentaje de esos valores máximos que podemos utilizar.

Una vez configurada nuestra tarea, ¡ya podemos ejecutarla de verdad! Pongamos que seguimos viendo que nuestra tarea de Spark tarda mucho en finalizar, incluso habiendo configurado los ajustes máximos que hemos definido más arriba. Tenemos que regresar a nuestra tarea y analizar algunos ajustes más para garantizar que su uso permite el máximo rendimiento.

Rendimiento de Spark

Para empezar, pongamos que estamos combinando dos tablas en nuestra tarea de Spark. Uno de los factores que hemos considerado antes de empezar a optimizar nuestras tareas de Spark era el tamaño de nuestros conjuntos de datos. Ahora, cuando nos fijamos en el tamaño de las tablas y determinamos que una es de 50 GB y la otra de 100 MB, tenemos que mirar a ver si estamos haciendo un buen uso de los componentes Talend de JOIN replicados.

JOIN replicado

Los JOIN replicados (también llamados JOIN de Map-Side) se utilizan extensamente al unir una tabla grande con otra pequeña para difundir los datos de la tabla pequeña a todos los ejecutores. En este caso, como el conjunto de datos más pequeño cabe en memoria, podemos usar un JOIN replicado para difundirlo a todos los ejecutores y optimizar el rendimiento de nuestra tarea de Spark.

Ya que los datos de la tabla deben combinarse a nivel del ejecutor con los datos laterales, al difundir el conjunto de datos más pequeño a todos los ejecutores estamos evitando que los datos de la tabla más grande sean transferidos por la red. Gran parte de los problemas de rendimiento de Spark suceden debido al barajado de grandes cantidades de datos por la red. Esto es algo que podemos controlar fácilmente en la tarea de Talend: habilitando la opción "Use replicated join" (Utilizar JOIN replicado) en el componente tMap como se indica abajo. Esto difundirá los datos de la tabla de consulta a todos los ejecutores.

 

El próximo paso es fijarse si nuestra tarea contiene operaciones que realicen recomputaciones costosas.

Caché de Spark

Para explicar las recomputaciones, centrémonos en un ejemplo sencillo como es la carga de un archivo con datos de compra de clientes. De estos datos nos interesa captar algunos indicadores:

  • el número total de clientes
  • el número de productos adquiridos

En este caso, si no utilizamos un caché de Spark, cada una de estas operaciones cargará los datos. Esto afectará a nuestro rendimiento, ya que genera una recomputación costosa. Como nosotros sabemos que este conjunto de datos se acabará utilizando en algún momento en nuestra tarea, es mejor usar Spark Cache para guardarlo en memoria para más adelante y no tener que volver a cargarlo constantemente.

En nuestras tareas de Talend Spark esto se hace con los componentes tCacheIn y tCacheOut, que encontramos en la paleta de Apache Spark en Talend y nos permiten utilizar el mecanismo de caché de Spark, que ofrece distintas opciones existentes.

También puede seleccionar si quiere guardar los datos tan solo en el caché de disco, y luego se nos da la opción de serializar los datos en caché para memoria, disco o ambos. Por último, también puede seleccionar que los datos en caché se repliquen en dos nodos más. La opción más habitual es memoria sin serialización, dado que es más rápida, pero cuando sabemos que los RDD en caché no caben en la memoria y no queremos verterlos a disco, debe escogerse la serialización, puesto que reduce el espacio consumido por el conjunto de datos, aunque implica un coste superior de gasto que afecta al rendimiento. Por ello conviene evaluar todas las opciones y elegir que la mejor se ajuste a nuestras necesidades.

 

Si tras todos estos pasos siguen produciéndose problemas de rendimiento, tendremos que empezar a analizar la interfaz web de Spark History (Historial de Spark) para ver qué sucede. Como ya mencioné en mi artículo anterior, en el apartado Spark History de Spark Configuration en Talend podemos habilitar el registro en Spark. El registro en Spark ayuda a resolver problemas de las tareas de Spark conservando los registros una vez terminada la tarea y nos los ofrece a través de la interfaz web de Spark History. Tener habilitado el registro de incidencias en nuestras tareas de Spark supone una buena práctica y nos permite resolver más fácilmente cualquier incidencia en nuestro rendimiento.

 

Con el registro de incidencias de Spark habilitado, puede consultar la interfaz web de Spark History y encontraremos las siguientes pestañas cuando busquemos el número de aplicación para nuestra tarea:

En nuestra IU de Spark de arriba vamos a fijarnos en la pestaña Stages (Etapas), identificaremos la que está afectando al rendimiento de nuestra tarea, consultaremos los detalles y comprobaremos si observamos una conducta parecida a la siguiente:

 

Lo que vemos es hay una que está procesando la mayoría de los datos y el resto están inactivas, incluso habiendo asignado 10 ejecutores. ¿Y esto por qué sucede? Para dar una respuesta tenemos que identificar la etapa de la tarea donde existe el problema. A guisa de ejemplo, imaginemos que vemos que esto sucede en la parte de la tarea de Spark en la que leemos los datos a partir de un archivo comprimido. Como los archivos .archive no se dividen al leerse por defecto, se generará un RDD con una única partición para cada archivo .archive que leamos, lo que causará ese comportamiento. Si este archivo comprimido está en formato .archive, que es divisible como BZIP2 y puede particionarse al leerse, en los ajustes avanzados de tFileInputDelimited podemos habilitar la propiedad "Set Minimum Partitions" (Configurar particiones mínimas) y como mínimo configurar tantas particiones como ejecutores tengamos para empezar.

Sin embargo, en el caso de un archivo .archive como GZIP que no puede volver a dividirse en particiones al leerse, podemos reparticionarlo explícitamente con nuestro componente tPartition. Como se indica más abajo, este componente nos permite volver a dividir el archivo en particiones para poder repartir la carga por igual entre los ejecutores.

Las particiones al leer pueden utilizarse al leer de una base de datos con nuestros componentes tJDBC y usando las siguientes propiedades:

Tan solo puede aplicarse esta nueva división en particiones a determinadas situaciones, como vemos arriba. Si determinamos que nuestro conjunto de datos está sesgado en las claves que utilizamos para el JOIN, entonces deben emplearse distintos métodos. Así pues, ¿cómo podemos identificar el sesgo de los datos? Empiece analizando el conjunto de datos por partición y fíjese en cómo se agrupan los datos entre las claves que usamos para el JOIN. He aquí un ejemplo de cómo sería un conjunto de datos sesgado por partición:

En este caso, si no podemos volver a dividir en particiones por una clave distinta, deberíamos pensar en otros métodos para mejorar nuestra tarea de Talend Spark. Una técnica muy utilizada es la denominada como "salting". Con el salting estará añadiendo una clave falsa a su clave real para igualar la distribución de datos por partición. Esto puede lograrse con nuestro componente tmap en la tarea de Spark como muestra este ejemplo:

Como hemos visto, estamos añadiendo a nivel del tMap la clave falsa como numérico aleatorio y la estamos vinculando junto con nuestra clave real con el conjunto de datos de consulta al que ya hemos añadido la clave falsa. Como la unión se produce según nuestra clave real más la clave falsa que hemos generado para la distribución, esto ayudará a evitar particiones sesgadas que pueden afectar a nuestro rendimiento al unir conjuntos de datos en Spark.

Conclusión

Existen muchas técnicas distintas que podemos utilizar para mejorar el rendimiento y ajustar nuestras tareas de Talend Spark. Espero que las pocas que hemos repasado en esta publicación sean de utilidad. A partir de aquí, ¡que creéis muchas más tareas de Spark en Talend!

Referencias:

https://spark.apache.org/docs/latest/tuning.html

https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning1.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tuning.html

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_spark-component-guide/content/ch_tuning-spark.html

Join The Conversation

0 Comments

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *