Hadoop de principiante a competente 33: Análisis del proceso de mezcla de los principios básicos de MapReduce
Shuffle, es decir, barajar y barajar, se refiere al proceso de intercambio de datos entre Mapper (fusionador, clasificador, divisor), Reducer y otros procesos durante la ejecución del programa MapReduce.
Algunas explicaciones sobre el proceso de reproducción aleatoria en la imagen de arriba:
Descripción: el nodo del mapa ejecuta la tarea del mapa y genera el resultado de salida del mapa.
Barajar el contenido del trabajo:
Desde la perspectiva de la eficiencia operativa, los resultados de salida del mapa se almacenan primero en la memoria del nodo del mapa. Cada tarea de mapa tiene un búfer de memoria que almacena la salida del mapa. Cuando se alcanza el umbral del búfer de memoria (80%), los datos del búfer deben guardarse en el disco como un archivo temporal. Una vez completada toda la tarea del mapa, todos los archivos temporales generados en el disco por la tarea del mapa se fusionarán para producir el archivo de salida final. Finalmente, espere a que la tarea de reducción extraiga los datos. Por supuesto, si los resultados de la tarea de mapa no son lo suficientemente grandes como para almacenarse completamente en el búfer de memoria y no se alcanza el umbral del búfer de memoria, no se escribirá el archivo temporal en el disco ni se fusionará posteriormente.
El proceso detallado es el siguiente:
(1) Ejecute la tarea del mapa y la fuente de los datos de entrada es: bloque HDFS. Por supuesto, en el concepto de reducción de mapas, la tarea del mapa lee los fragmentos divididos. Correspondencia entre divisiones y fragmentos: uno a uno (predeterminado).
Aquí es necesario explicar la fragmentación y la división:
Bloqueo (partición física): cuando un archivo se carga en HDFS, los datos deben dividirse en fragmentos. La partición aquí es una partición física y el tamaño del bloque se puede configurar a través de dfs.block.size (el valor predeterminado de primera generación es 64 M, la segunda generación es 128 M. Para garantizar la seguridad de los datos, el bloque adopta un mecanismo de redundancia: el). El valor predeterminado es 3 copias, que se pueden configurar a través de dfs.replication Nota: Cuando la configuración del tamaño de bloque cambia, el tamaño de bloque del archivo recién cargado es el valor recién configurado y el tamaño de bloque del archivo cargado anteriormente es el valor configurado previamente.
División (división lógica): en Hadoop, la división es una división lógica y su propósito es simplemente permitir que la tarea del mapa obtenga mejores datos. La división se obtiene mediante el método getSplit() en la interfaz InputFormat en hadoop. Entonces, ¿cómo se obtiene exactamente el tamaño de la división?
Primero, introduzca varios volúmenes de datos:
TotalSize: el tamaño total de todas las entradas de todo el trabajo de mapreduce. Nota: La unidad básica es el número de bloques, no el número de bytes.
NumSplits: de job . getnummaptasks (), es decir, el valor establecido por el usuario con org Hadoop mapred . A juzgar por el nombre del método, se utiliza para establecer el número del mapa. Sin embargo, el número final de mapas, es decir el número de divisiones, no necesariamente toma el valor establecido por el usuario. El número de mapas configurado por el usuario es sólo una indicación del número final de mapas y es sólo un factor que influye, no un factor decisivo.
Tamaño objetivo: tamaño total/número de divisiones, es decir, tamaño de división esperado, es decir, cuántos datos maneja cada asignador.
Pero esto es solo una expectativa
Minsize: el valor mínimo de la división, que se puede establecer de dos maneras:
Finalmente, ¡toma el valor máximo de goalSize y minSize!
Finalmente: el principio de cálculo del tamaño de división: tamaño de división final = máximo (tamaño mínimo, mínimo (tamaño objetivo, tamaño de bloque))
Luego, el número de texturas = tamaño total/ finalSplitSize.
Nota: En la nueva API, la cantidad de tareas de mapa establecidas por el usuario ya no se considera en el algoritmo de partición InputSplit, sino que se reemplaza por mapred.max.split.size (marcado como maxSize).
En otras palabras, la fórmula para calcular el tamaño de InputSplit es: tamaño de división = max {minsize, min {maxsize, block size}}
A continuación, hablaré brevemente sobre cómo calcular el tamaño según las necesidades del negocio. Ajustar el número de mapas.
Cuando usamos Hadoop para procesar una gran cantidad de big data, una de las situaciones más comunes es que la cantidad de mapeadores iniciados por el trabajo excede el límite del sistema, lo que hace que Hadoop arroje una excepción y finalice. la ejecución.
Solución: ¡Reducir el número de mapeadores! Los detalles son los siguientes:
A. La cantidad de archivos de entrada es grande, pero no pequeña.
En este caso, la cantidad de mapeadores necesarios se puede reducir aumentando el tamaño de entrada de cada mapeador (es decir, aumentando el tamaño mínimo o aumentando el tamaño del bloque). Por lo general, no es factible aumentar el tamaño del bloque porque el tamaño del bloque ya está determinado después de que Hadoop nombra HDFS -format (determinado por dfs.block.size al formatear). Si desea cambiar el tamaño del bloque, debe volver a formatear HDFS, lo que por supuesto perderá los datos existentes. Por lo tanto, normalmente solo se puede aumentar minSize, es decir, se puede aumentar el valor de mapred.min.split.size.
B. La cantidad de archivos de entrada es enorme y todos son archivos pequeños.
El llamado archivo pequeño significa que el tamaño de un solo archivo es menor que blockSize. En este caso, aumentar el tamaño de mapred.min.split.size no es factible, por lo que debe usar CombineFileInputFormat derivado de FileInputFormat para combinar múltiples rutas de entrada en un InputSplit y enviarlo al asignador para su procesamiento, reduciendo así la cantidad del asignador. . Se puede aumentar el número de mapeadores reduciendo la entrada a cada mapeador, es decir, reduciendo el tamaño del bloque o reduciendo el valor de mapred.min.split.size
(2) Después de realizar 2) mapeo, obtener pares clave/valor. La siguiente pregunta es: ¿a qué reductor deberían ir estos pares clave-valor? Nota: Al enviar un trabajo, puede establecer la cantidad reducida mediante el método establecido.
MapReduce proporciona una interfaz de partición para resolver los problemas anteriores. La operación predeterminada es: módulo el número de tareas de reducción después del hash de clave, y el valor de retorno determina qué reducción debe procesar el par clave-valor. Este modo predeterminado solo se utiliza para promediar la potencia de procesamiento de reducir, evitar sesgos de datos y garantizar el equilibrio de carga. Si los usuarios tienen sus propios requisitos para las particiones, pueden personalizarlas y configurarlas para que funcionen.
A continuación, debe escribir la clave/valor y los resultados de la partición en el búfer. La función del búfer es recopilar los resultados del mapa en lotes y reducir el impacto de la E/S del disco. Por supuesto, estos datos se serializarán en una matriz de bytes antes de escribirlos. Todo el búfer de memoria es una matriz de bytes. Este búfer de memoria tiene un tamaño limitado, con un valor predeterminado de 100 MB. Cuando una tarea de mapa tiene muchos resultados, la memoria puede desbordarse. Es necesario escribir temporalmente los datos del búfer en el disco y luego reutilizar el búfer.
La escritura de datos desde la memoria al disco se llama desbordamiento. Se realiza mediante un subproceso separado y no afecta al subproceso que escribe los resultados del mapeo en el búfer. Ratio de cobertura: porcentaje de derrame (el valor predeterminado es 0,8).
Cuando los datos en el búfer alcanzan el umbral, se inicia el hilo de desbordamiento, bloquea la memoria de 80 MB y ejecuta el proceso de desbordamiento. Los 20 MB restantes continúan escribiendo los resultados de salida de la tarea del mapa.
¡No interfieran entre sí!
Cuando se inicia el hilo de desbordamiento, las claves deben ordenarse en este espacio de 80 MB. La clasificación es el comportamiento predeterminado del modelo mapreduce y también es el orden de bytes serializados. Regla de clasificación: ¡clasificación por diccionario!
Después de que la salida de la tarea de mapa se escribe en la memoria, cuando el subproceso de desbordamiento no se inicia, la salida no se fusionará. Como se puede ver en la imagen oficial, la combinación se refleja en el archivo de disco temporal sobrescrito, y esta combinación es la combinación de diferentes valores en el lado reducido. Por lo tanto, un detalle importante del proceso de desbordamiento es que si hay muchos pares clave/valor que deben enviarse al terminal de reducción, estos pares clave/valor deben empalmarse para reducir los registros de índice relacionados con la partición. . Si el cliente tiene un conjunto Combiner, agregará los valores de los pares clave/valor con la misma clave, reduciendo así la cantidad de datos derramados en el disco. Nota: La fusión aquí no garantiza que se fusionen todos los pares clave-valor con el mismo valor clave en el resultado de la asignación. Su rango de fusión es solo de 80 MB. ¡Lo que garantiza es que los valores clave de todos los pares clave-valor sean diferentes en cada archivo de desbordamiento individual!
La cantidad de archivos temporales generados por la escritura de desbordamiento aumenta con la cantidad de datos en los resultados de salida del mapa. Cuando se completa toda la tarea de mapeo, todos los datos en la memoria se transfieren a un archivo de desbordamiento en el disco. Es decir, en cualquier caso, ¡el proceso de desbordamiento genera al menos un archivo de desbordamiento! Pero solo puede haber un archivo final y estos archivos desbordados deben fusionarse. Merge combina todos los archivos desbordados en un solo archivo. Combinado con el alcance de los combinadores anteriores, los pares clave-valor en los archivos combinados pueden tener la misma clave. Si el cliente ha configurado una fusión, los pares clave-valor con el mismo valor clave también se fusionarán durante este proceso. De lo contrario, la combinación obtendrá un conjunto de valores clave, como {"AAA", [5, 8, 2,...] Nota: una configuración razonable del combinador puede mejorar la eficiencia, pero el uso inadecuado afectará la eficiencia. !
¡En este punto, todo el trabajo en el mapa ha terminado!
Después de enviar la tarea de reducción de mapas, la tarea de reducción obtendrá continuamente información sobre si la tarea de mapa se completó desde JobTracker a través de RPC. Si se sabe que la ejecución de la tarea del mapa en TaskTracker se ha completado, comenzará la segunda mitad de Shuffle. De hecho, el trabajo de la tarea de reducción antes de la ejecución es extraer continuamente los resultados finales de cada tarea de mapa en el trabajo actual, fusionar continuamente los datos extraídos de diferentes lugares y finalmente formar un archivo como archivo de entrada de la tarea de reducción. .
1. Copiar proceso, simplemente extraer los datos. El proceso de Reducción inicia algunos subprocesos de copia de datos (Fether) y obtiene el archivo de salida de la tarea de mapa a través de una solicitud HTTP al TaskTracker donde se encuentra la tarea de mapa. Dado que la tarea de mapeo finalizó, TaskTracker administra estos archivos en el disco local.
Nº 2.2. Proceso de fusión. La fusión aquí es similar a la operación de fusión en el lado del mapa, excepto que los valores copiados de los diferentes lados del mapa se almacenan en una matriz. Los datos copiados se colocarán primero en el búfer de memoria, donde el tamaño del búfer es más flexible que en el lado del mapa. Se basa en la configuración del tamaño del montón de la JVM. La mayor parte de la memoria debe usarse para la reproducción aleatoria, porque el reductor no se ejecutará durante la fase de reproducción aleatoria.
Las fusiones se presentan en tres formas: memoria a memoria, memoria a disco y disco a disco. De forma predeterminada, el primer formulario no está habilitado. La fusión de memoria a disco comienza cuando la cantidad de datos en la memoria alcanza un cierto umbral. Al igual que en el lado del mapa, este también es un proceso de desbordamiento. Por supuesto, si el Combinador se configura aquí, se iniciará y luego se generarán muchos archivos de desbordamiento en el disco. El segundo modo de fusión continúa hasta que no hay datos en el lado del mapa, luego se inicia el tercer modo de fusión de disco a disco para generar el archivo final.
3.3. Archivo de entrada del reductor. Después de una fusión continua, eventualmente se generará un "archivo final". Este archivo final puede estar en el disco o en la memoria. Por supuesto, queremos que esté en la memoria y se use directamente como entrada para el reductor, pero de forma predeterminada el archivo se almacena en el disco. Toda la reproducción aleatoria finalmente finaliza cuando se configura el archivo de entrada del reductor. Luego viene la ejecución del reductor, que almacena los resultados en HDFS.