Colección de citas famosas - Slogan de motivación - ¿Cómo funciona el mecanismo de redundancia del sistema de archivos de almacenamiento HDFS en Hadoop? ¿Cuáles son las características?

¿Cómo funciona el mecanismo de redundancia del sistema de archivos de almacenamiento HDFS en Hadoop? ¿Cuáles son las características?

Puede ejecutar un trabajo de MapReduce con una sola línea de código: JobClient.runJon(conf). Cuatro entidades participan en la ejecución del trabajo:

? 1.JobClient escribe código, configura trabajos y envía trabajos.

? 2.JobTracker: inicializa trabajos, asigna trabajos y coordina las operaciones de trabajo. Este es un programa java, la clase principal es JobTracker.

? 3.TaskTracker: ejecuta la tarea después de dividir el trabajo, es decir, ejecuta la tarea Mapear o Reducir en la distribución de datos asignada.

? 4.HDFS: guarda datos del trabajo e información de configuración, y guarda los resultados del trabajo.

El proceso general de ejecución del trabajo Map/Reduce:

? Escritura de código->¿Configuración del trabajo? ->? ¿Envío de trabajo? ->? ¿Mapear la asignación y ejecución de tareas? ->? ¿Procesar resultados intermedios? ->? ¿Reducir la asignación y ejecución de tareas? ->? Resultados de salida

Para cada ejecución de trabajo, también incluye:

? Entra listo? ->? ¿Ejecución de la misión? ->? Resultados de salida

Cliente de trabajo de envío de trabajo:

? El método runJob de JobClient genera una instancia de Jobclient y llama a su método submitJob. Luego, runJob inicia un bucle y llama al método getTaskCompetionEvents en el bucle para obtener la instancia de TaskCompletionEvent. Sondea el progreso del trabajo cada segundo (las actualizaciones de progreso y estado se presentarán más adelante). ) y escribe el progreso. Ingresa a la consola, muestra el contador de trabajos una vez completado el trabajo y registra el error en la consola cuando falla.

? Método de envío de tareaProceso de envío de trabajo:

? 1. Solicite un nuevo JobId de JobTracker.

? 2. Verifique la ruta relacionada con el trabajo y devuelva un error si la ruta es incorrecta.

? 3. Calcule los sectores de entrada del trabajo y su información de división.

? 4. Copie recursos (archivos jar, archivos de configuración, etc.) para realizar operaciones de trabajo en el HDFS compartido y copie varias copias (control de parámetros, el valor predeterminado es 10) para el acceso al rastreador de tareas y, al mismo tiempo, los fragmentos calculados. se copian a HDFS.

? 5. Llame al método submitJob() del objeto JobTracker para enviar realmente el trabajo y decirle a JobTracker que el trabajo está listo para su ejecución.

Inicialización de JobTracker:

? Después de recibir la llamada al método submitJob, JobTracker colocará la llamada en una cola interna, que será programada e inicializada por el programador de trabajos. La inicialización del trabajo crea un objeto de trabajo.

? Cuando se programa un trabajo, JobTracker crea un objeto JobInProgress que representa el trabajo y encapsula la tarea y registra la información en este objeto para rastrear el estado y el progreso de la tarea.

? El proceso de inicialización se inicializa mediante el método initTasks del objeto JobInProgress.

? Pasos de inicialización:

? 1. Lea la información de job.split correspondiente al trabajo desde HDFS para prepararse para la inicialización posterior.

? 2. Cree e inicialice mapas y reduzca tareas. Determine la cantidad de tareas de mapa en función de la cantidad de información de fragmentación de datos y luego genere un objeto TaskInProgress para cada tarea de mapa para procesar la fragmentación de datos. Este objeto se coloca primero en nonRunningMapCache para que JobTracker lo utilice al asignar tareas. A continuación, utilice el método setNumReduceTasks() para establecer el número de tareas reducidas de acuerdo con la propiedad mapred.reduce.tasks en JobConf, y luego créela de la misma manera que la tarea de mapa.

? 3. Finalmente, cree dos tareas de inicialización para inicializar el mapa y reducirlo.

JobTracker de distribución de tareas:

Latido de entrega de mensajes: tasktracker ejecuta un bucle simple que envía periódicamente latidos al JobTracker. El latido le dice al JobTracker si está vivo, mientras pasa otra información como canal de mensajes (solicitando nuevas tareas). Como parte del latido, el rastreador de tareas indicará si está listo para ejecutar una nueva tarea y, de ser así, el rastreador de trabajos le asignará una tarea.

Asignar el trabajo al que pertenece la tarea: Antes de que Jobtracker asigne la tarea, es necesario determinar el trabajo al que pertenece la tarea. Más adelante se presentarán varios algoritmos de programación de trabajos. El valor predeterminado es la programación de trabajos FIFO.

Asignar tareas de Mapear y Reducir: tasktrack tiene un número fijo de espacios para tareas. Un tasktrack puede ejecutar múltiples tareas de Mapear y Reducir al mismo tiempo, pero el número exacto está determinado por el número de núcleos y el tamaño de la memoria. del seguimiento de tareas. El programador predeterminado llenará primero los espacios para tareas de Mapa y luego los espacios para tareas de Reducir. Jobtracker elegirá el rastreador de tareas más cercano al archivo de segmento. Idealmente, las tareas son locales para los datos, pero también pueden ser locales para el rack. Sin localización, necesitan recuperar datos de otros racks. Reducir la asignación de tareas es muy simple: el rastreador de trabajos simplemente elegirá una de la lista de tareas de Reducir para ejecutar, independientemente de la localidad de los datos.

Rastreador de tareas de ejecución de tareas:

? Después de recibir una nueva tarea, TaskTracker la ejecutará localmente. El primer paso para ejecutar una tarea es inyectar configuración, datos, programas y otra información necesaria para localizar la tarea a través de localizedJob.

? 1. Localice datos: copie job.split y job.jar localmente desde el sistema de archivos * * * (en el caché distribuido) y escriba la información de configuración del trabajo en job.xml.

? 2. Cree un nuevo directorio de trabajo local: tasktracker comprimirá el archivo job.jar en este directorio de trabajo.

? 3. Llame al método launchTaskForJob para publicar la tarea (lo que creará una nueva instancia de TaskRunner para ejecutar la tarea). Si es una tarea de Mapa, se habilitará MapTaskRunner y, para Reducir, será ReduceTaskRunner.

TaskRunner luego iniciará una nueva JVM para ejecutar cada tarea de Mapa/Reducir para evitar que el rastreador de tareas falle debido a razones del programa. Sin embargo, la JVM aún se puede reutilizar entre diferentes tareas. Se discutirá la reutilización de la JVM. más tarde.

? Para un solo mapa, el proceso simple de ejecución de la tarea es:

? 1. ¿Asignar parámetros de ejecución de tareas

? 2. Agregue información de la tarea del mapa al archivo temporal del niño (el niño es el proceso principal que ejecuta las tareas de mapa y reducción).

? 3. Configure la carpeta de registro y los parámetros de comunicación y salida de la tarea de mapa.

? 4. Lea la entrada dividida para generar datos leídos por RecordReader.

? 5. Genere MapRunnable para Map, reciba datos de RecordReader a su vez y llame a la función Map para su procesamiento.

? 6. Finalmente, recopile las llamadas de salida de la función de mapa en MapOutputBuffer (el parámetro controla su tamaño).

Arroyos y Tuberías:

? Tanto los flujos como las canalizaciones ejecutan tareas especiales de Mapa y Reducción para ejecutar y comunicarse con ejecutables proporcionados por el usuario.

? Flujos: utilice flujos de entrada y salida estándar para comunicarse con los procesos.

? Tuberías: utilizadas para escuchar el socket, enviará un número de puerto al programa C ++ y los dos pueden establecer un enlace.

?

Actualizaciones de progreso y estado:

? Los trabajos y sus tareas tienen estados, que incluyen: estado de ejecución exitosa o fallida, mapeo/reducción del progreso, valores del contador de trabajos y mensajes de estado.

? Comunicación entre mensajes de estado y clientes:

? 1. Realice un seguimiento del progreso de una tarea de mapa: el progreso es la proporción de entrada que se ha procesado.

? 2. Para reducir: un poco complicado, la tarea de Reducir se divide en tres etapas (cada etapa representa 1/3), a saber, copiar, ordenar y Reducir procesamiento. Si reducir ha ejecutado la mitad de la entrada, entonces el progreso de la tarea es 1/3+1/6 = 5.

? 3. Contadores de tareas: las tareas tienen un conjunto de contadores que se encargan de contar los eventos cuando se ejecuta la tarea.

? 4. Informes de progreso de la tarea: si la tarea informa el progreso, se establecerá una bandera que indica que el estado se enviará al rastreador de tareas. Un hilo separado verifica este indicador cada tres segundos y, si está configurado, le informa al rastreador de tareas el estado actual.

? 5. Informe de progreso de Tasktracker: tasktracker enviará un latido a jobtracker cada 5 segundos (este latido está determinado por el tamaño del clúster. Cuanto más grande sea el clúster, más tiempo llevará durante el proceso de llamada, todo el estado de ejecución de tasktracker será). enviado a jobtracker.

? 6. Informe de tareas fusionadas de Jobtracker: genera una vista global que muestra el estado de las tareas de todas las máquinas que ejecutan trabajos.

? El JobClient mencionado anteriormente recibe el último estado consultando JobTracker cada segundo. El método getJob del cliente JobClient puede obtener una instancia de RunningJob, que contiene toda la información de estado del trabajo.

?

Finalización del trabajo:

? Cuando el rastreador de trabajos recibe una notificación de que se ha completado la última tarea del trabajo, establece el estado del trabajo en exitoso. Cuando JobClient consulta el estado, sabe que el trabajo se completó exitosamente, por lo que JobClient imprime un mensaje para notificar al usuario y luego regresa del método runJob.

? Si jobtracker tiene la configuración correspondiente, las notificaciones de trabajo HTTP también se enviarán al cliente. Los clientes que deseen recibir instrucciones de devolución de llamada pueden configurarlo a través de la propiedad job.end.notification.url.

? Jobtracker indica el estado de trabajo del trabajo, lo que indica que el rastreador de tareas también borra el estado de trabajo del trabajo, como eliminar la salida intermedia.

?

¿Falló

? De hecho, si ocurren errores de software en el código de usuario, el proceso fallará y la máquina fallará, pero Hadoop puede manejar bien estos fallos y completar el trabajo.

? 1. ¿Misión fallida

? Excepción de subtarea: si el código de usuario en la tarea Mapa/Reducir genera una excepción, el proceso JVM de la subtarea enviará un informe de error al rastreador de tareas del proceso principal antes de salir, y el error se registrará en el registro de usuario. Tasktracker marca este intento de tarea como finalizado y libera el espacio de tarea para ejecutar otra tarea.

? El proceso secundario JVM se cierra repentinamente: debido a algunas razones especiales causadas por el código de usuario causado por errores de JVM, la JVM puede cerrarse. En este caso, el rastreador de tareas notará que el proceso finalizó y marcará el intento como fallido.

? Tarea pendiente: una vez que el rastreador de tareas detecta que no ha recibido actualizaciones de progreso durante un período de tiempo, marcará la tarea como fallida y el proceso secundario de JVM finalizará automáticamente. El intervalo entre fallas de tareas suele ser de 10 minutos y el tiempo de vencimiento se puede establecer por trabajo o por clúster. El parámetro es mapred.task.timeout Nota: Si el valor del parámetro se establece en 0, la tarea suspendida nunca liberará su espacio de tarea, lo que reducirá la eficiencia de todo el clúster con el tiempo.

? Intentos fallidos de tarea: cuando jobtracker se entera de que tasktracker falló, reprogramará la tarea. Por supuesto, jobtracker intentará evitar reprogramar las tareas fallidas de tasktracker. Si la tarea se intenta más de 4 veces, no se volverá a intentar. Este valor se puede configurar. Para tareas de mapa, este parámetro es mapred.map.max.attempts y para tareas de reducción, este parámetro está controlado por la propiedad mapred.reduce.max.attempts. Si el número excede el límite, toda la operación fallará. Por supuesto, a veces no queremos detener todo el trabajo cuando algunas tareas fallan, porque incluso si algunas tareas fallan, algunos resultados del trabajo aún pueden ser útiles. En este caso, puede establecer el porcentaje máximo de errores de tareas que permite el trabajo sin provocar un error en el trabajo. Las tareas de mapa y las tareas de reducción se pueden controlar de forma independiente y los parámetros se asignan como porcentaje máximo de fallas.

? Matar: la finalización de la tarea es diferente del fracaso de la tarea. Un intento de tarea se puede abortar porque es una réplica especulativa o porque su rastreador de tareas falló, lo que hará que el rastreador de trabajos marque todos los intentos de tareas anteriores como finalizados. Los intentos de tareas abortadas no se contarán en el número de intentos de ejecución de tareas porque el intento abortado no es un error de la tarea.

? 2. ¿Falló el rastreador de tareas

? Si el Tasktracker falla porque falla o se ejecuta demasiado lento, dejará de enviar latidos al jobtracker (o enviará muy pocos latidos). Jobtracker nota que el tasktracker ha dejado de enviar latidos (el tiempo de vencimiento lo establece el parámetro mapred.tasktracker.expire.interval, en milisegundos) y lo elimina del grupo de tasktrackers en espera de ser programados. En el caso de un trabajo sin terminar, el rastreador de trabajos programa que la tarea de mapa que se ejecutó exitosamente en el segundo rastreador de tareas se vuelva a ejecutar porque no se puede acceder a la tarea de reducción en este momento (la salida intermedia se almacena en el sistema de archivos local del rastreador de tareas fallido) .

? Incluso si el rastreador de tareas no falla, es posible que el rastreador de trabajos lo incluya en la lista negra.

Si la cantidad de tareas fallidas en un rastreador de tareas es mucho mayor que la cantidad promedio de tareas fallidas en el clúster, se incluirá en la lista negra y el rastreador de tareas incluido en la lista negra se puede eliminar de la lista negra del rastreador de trabajos reiniciando.

3.jobtracker falló

? El fallo de las versiones anteriores de JobTracker era un único punto de fallo, en cuyo caso el trabajo estaba destinado a fracasar.

Plan de tareas:

? Programación avanzada de trabajos FIFO: FIFO en el orden de envío del trabajo. Puede establecer la prioridad configurando el atributo mapred.job.priority de JobClient o el método setJobPriority() (prioridad: muy_alta, alta, normal, baja, muy_baja). Tenga en cuenta que el algoritmo de programación FIFO no admite la preferencia, por lo que los trabajos de alta prioridad aún pueden ser bloqueados por trabajos de baja prioridad que se han estado ejecutando durante mucho tiempo.

? Programador justo: el objetivo es permitir que cada usuario disfrute de las capacidades del clúster de manera justa. Cuando hay muchos trabajos en el clúster, los espacios para tareas inactivas se asignarán de manera que "permita que cada usuario * * * disfrute del clúster". De forma predeterminada, cada usuario tiene su propio grupo de trabajos. FairScheduler admite la preferencia, por lo que si un grupo no obtiene una parte justa de los recursos dentro de un cierto período de tiempo, finalizará las tareas en el grupo con demasiados recursos, cediendo así espacios de tareas a grupos con recursos insuficientes. FairScheduler es un módulo de seguimiento. Para usarlo, debe colocar su archivo jar en el classpath de Hadoop. Se puede configurar a través de los parámetros map.red.job tracker.programador de tareas (el valor es org.Apache.Hadoop.map red.fair Scheduler).

? Programador de capacidad:

? Un clúster consta de muchas colas, cada cola tiene una capacidad asignada, similar a FairScheduler, excepto que dentro de cada cola, los trabajos se programan según FIFO. Básicamente, el Programador de capacidad permite a un usuario u organización simular un clúster con uso FIFO independiente para cada usuario.

Reproducción y clasificación aleatoria:

? MapReduce garantiza que la entrada a cada Reductor esté ordenada por clave. El proceso de clasificación realizado por el sistema: el proceso de pasar la salida del mapa como entrada al reductor se llama reproducción aleatoria. Shuffle es parte de una base de código que se optimiza y mejora constantemente. En muchos sentidos, la reproducción aleatoria es el núcleo de MapReduce.

? Todo el proceso de barajado debería verse así:

? ¿Partición del resultado del mapa? ¿Desbordamiento de separación de clasificación de clasificación? ¿Fusionar el mismo departamento? ¿Fusionar el mismo departamento? ¿Fusionar la clasificación de resultados para reducir el rendimiento del procesamiento

? Fin del mapa:

? Búfer de escritura: el recopilador procesa la salida de la función Mapa y los resultados no se escriben simplemente en el disco. Escribe en la memoria utilizando un búfer y la clasifica previamente para mayor eficiencia. Cada mapa tiene una memoria intermedia en anillo para la salida de tareas. El tamaño de búfer predeterminado es 100 MB (ajustado por el parámetro io.sort.mb). Una vez que el contenido del búfer alcanza el umbral (el valor predeterminado es 0,8), el proceso en segundo plano comienza a escribir el contenido en el disco (desbordamiento). La salida del mapa seguirá escribiéndose en el búfer mientras se escribe en el disco, pero si el búfer está lleno, el mapa impedirá la escritura en el disco. Las escrituras en el disco se escribirán mediante sondeo en el subdirectorio específico del trabajo especificado por la propiedad mapred.local.dir.

? Escriba el búfer: cuando Collect escribe el contenido del búfer, llama a la función sortAndSpill, que se utiliza principalmente para crear un archivo de desbordamiento, ordenar los datos según el valor clave y escribir los datos en el archivo según el división. Si la clase combinadora está configurada, llamará a la función combineAndSpill antes de escribir en el archivo. SortAndspill escribe en un archivo de derrame cada vez que se llama.

? Fusionar archivos de desbordamiento de todos los mapas: TaskTracker fusionará todos los archivos de desbordamiento generados por el mapa después de cada tarea de mapa. La regla de fusión es fusionar los datos en la misma partición de todos los archivos desbordados según las particiones y escribirlos en un archivo de salida de mapa ordenado por partición. La fase de reproducción aleatoria en el mapa finaliza después de que se haya escrito el último registro en el archivo de salida del mapa ordenado y particionado de forma única.

? Antes de escribir en el disco, el hilo primero divide los datos en particiones de respuesta según el reductor al que eventualmente se entregarán los datos. Dentro de cada partición, un subproceso en segundo plano realiza una clasificación interna por clave. Si hay un combinador, se ejecutará en la salida ordenada.

? Cuando la memoria alcanza el umbral de escritura de desbordamiento, se crea un nuevo archivo de escritura de desbordamiento porque habrá varios archivos de escritura de desbordamiento después de que la tarea de mapa complete su último registro de salida. Los archivos de escritura desbordados se fusionarán en un archivo de salida particionado y ordenado antes de que se complete la tarea.

La propiedad de configuración io.sort.facor controla la cantidad máxima de flujos de datos que se pueden combinar al mismo tiempo, con un valor predeterminado de 10.

? Si se especifica un combinador y el número de escrituras es al menos 3 (establecido por min.mum.spills.for.combine), el combinador se ejecutará antes de que el archivo de salida se escriba en el disco. El objetivo de ejecutar el combinador es hacer que la salida del mapa sea más compacta y escribir en el disco local, enviando menos datos al reductor.

? Compresión al escribir en el disco: la compresión al escribir en el disco hará que la escritura sea más rápida, ahorrará espacio en el disco y reducirá la cantidad de datos transferidos al reductor. De forma predeterminada, la salida está sin comprimir, pero la compresión se puede habilitar estableciendo el valor mapred.compress.map.output en verdadero. La biblioteca de compresión utilizada se realiza mediante compresión de mapas mapred.

? Subprocesos de trabajo obtenidos por el reductor: el reductor obtiene la partición del archivo de salida a través de http. El número de subprocesos de trabajo utilizados para la partición del archivo se especifica mediante la propiedad tracker.http.threads. Esta configuración se aplica por rastreador de tareas, no por ranura de tarea de mapa. El valor predeterminado es 40, que se puede aumentar según sea necesario en clústeres grandes.

Fin de la reducción:

? Fase de copia: reducir obtendrá periódicamente la ubicación de salida del mapa del JobTracker. Una vez que se obtiene la posición de salida, reducir copiará la salida del mapa del TaskTracker correspondiente localmente (si la salida del mapa es pequeña, se copiará a la memoria del nodo TaskTracker; de lo contrario, se liberará como un disco) sin esperar. todas las tareas del mapa para completar (por supuesto, esto también está determinado por el control de parámetros).

? Fase de fusión: los archivos de salida del mapa copiados de cada TaskTracker (ya sea en el disco o en la memoria) se integran y se mantiene el orden original de los datos.

? Etapa de reducción: extraiga secuencialmente un dato del archivo fusionado para procesarlo mediante la función de reducción y luego envíe el resultado al HDFS local.

? El archivo de salida del mapa se encuentra en el disco local del rastreador de tareas que ejecuta la tarea del mapa. Ahora, tasktracker quiere ejecutar tareas reducidas en archivos de partición. El tiempo de finalización de cada tarea puede ser diferente, pero tan pronto como se completa una tarea, la tarea de reducción comienza a copiar su salida. Esta es la fase de copia de la tarea de reducción. La tarea de reducción tiene una pequeña cantidad de subprocesos de copia para que pueda obtener la salida del mapa en paralelo. El valor predeterminado es 5 subprocesos y se puede configurar mediante la propiedad mapred.reduce.parallel.copies.

? Cómo sabe el Reducer de qué rastreador de tareas obtener la salida del mapa: cuando se completa la tarea del mapa, notifica a su rastreador de tareas principal que el estado se ha actualizado, y el rastreador de tareas luego notifica (a través de un latido) al rastreador de trabajos. Entonces, JobTracker conoce la relación de mapeo entre la salida del mapa y el rastreador de tareas, y un subproceso en el reductor le pide periódicamente al rastreador de trabajos que conozca la ubicación de la salida del mapa. Debido a que los reductores pueden fallar, el rastreador de tareas no elimina inmediatamente la salida del mapa del disco cuando el primer reductor la recupera. En cambio, espera a que el rastreador de trabajos anuncie que se puede eliminar la salida del mapa, que es la última ejecución después de que se completa el trabajo.

? Si la salida del mapa es más pequeña, se copia directamente al búfer de memoria del rastreador de tareas reducido (el tamaño está controlado por mapred . job . buffer . input . buffer . percent ; de lo contrario, la salida del mapa será un porcentaje del espacio del montón); copiarse al disco. Una vez que el búfer de memoria alcanza el tamaño del umbral (a través de mapred . iob . buffer . merge . percent)

o se alcanza el tamaño del umbral de salida del mapa (mapred.inmem.threadhold), el desbordamiento combinado se escribe en el disco .

? A medida que hay más copias disponibles en el disco, un hilo en segundo plano las fusiona en archivos ordenados más grandes. Nota: Para poder fusionarlo, la salida del mapa comprimido debe descomprimirse en la memoria.

? Fase de clasificación: una vez completada la fase de copia, la tarea de reducción entrará en la fase de clasificación, o más precisamente, en la fase de fusión, que fusionará las salidas del mapa y mantendrá su orden. La fusión es cíclica y el número de archivos de salida para cada fusión está determinado por el factor de fusión. Pero permite generar archivos intermedios.

? Etapa de reducción: en la etapa de reducción final, los archivos ordenados se ingresan directamente en la función de reducción y los archivos intermedios ya no se fusionan. La fusión final puede provenir de la memoria o del disco. El resultado de esta etapa se escribirá directamente en el sistema de archivos, generalmente HDFS.

? Detalles: la fusión aquí no es una fusión ordinaria.

Por ejemplo, hay 40 archivos y el factor de combinación es 10. En lugar de fusionar 10 archivos por pasada, fusionamos cuatro pasadas. En cambio, se fusionan cuatro archivos en la primera pasada y 10 en las últimas tres pasadas. En la última pasada, se fusionaron 4 archivos y los 6 archivos restantes se fusionaron directamente en reducción.