Cómo utilizar Spark para implementar programas MapReduce existentes
Pares clave-valor como tuplas
Supongamos que necesitamos calcular la longitud de cada línea en un texto grande e informar el número de líneas para cada longitud. En HadoopMapReduce, primero usamos un asignador para generar un par clave-valor con la longitud de la fila como clave y 1 como valor.
La clase pública LineLengthMapper extiende
Mapper & ltLongWritable, Text, IntWritable, IntWritable & gt{
@Override
Mapa vacío protegido ( Número de línea LongWritable, línea de texto, contexto de contexto)
Lanza IOException, InterruptedException {
context write(new int writable(line . getlength()), new int writable (1) );
}
}
Vale la pena señalar que los mapeadores y reductores solo operan en pares clave-valor. Entonces, la entrada proporcionada por TextInputFormat a LineLengthMapper es en realidad un par clave-valor, la posición en el texto es Key (rara vez se usa, pero siempre debe haber algo como Key) y el texto actúa como valor.
Implementación de Spark correspondiente:
lines .map(line = > (line.length, 1))
En Spark, la entrada es simplemente un RDD. de cadenas, en lugar de pares clave-valor. La representación de un par clave-valor en Spark es una tupla de Scala, creada utilizando la sintaxis (a, b). El resultado de la operación de mapeo anterior es un RDD de tuplas (Int, Int). Cuando un RDD contiene muchas tuplas, obtiene múltiples métodos, como reduceByKey, que son muy importantes para reproducir el comportamiento de MapReduce.
Reducir
Reduce() y reduceBykey()
Para calcular los pares clave-valor de longitudes de fila, debe tratar cada longitud como una clave en el Reductor y calcula la suma de sus filas como un valor.
La clase pública LineLengthReducer extiende
Reducer & ltIntWritable, IntWritable, IntWritable, IntWritable & gt{
@override
protected void reduce( Longitud de IntWritable, Iterable & ltIntWritable & gt count,
ContextContext) arroja IOException, InterruptedException {
int sum =
for (IntWritable count: counts) {
suma+= recuento . get();
}
context.write(longitud, nuevo int grabable(suma));
}
}
La implementación correspondiente de los mapeadores y reductores anteriores en Spark solo requiere una línea de código:
val length counts = lines .map (línea = >(longitud.línea, 1)). reduceByKey(_ + _)
La API RDD de Spark tiene un método de reducción, pero reduce todos los pares clave-valor a un solo valor.
Este no es el comportamiento de Hadoop MapReduce, su contraparte en Spark es ReduceByKey.
Además, el método Reduce de Reducer recibe un flujo de valores múltiples y produce un resultado de 0, 1 o más. reduceByKey acepta una función que convierte dos valores en uno, que es una función de suma simple que asigna dos números a su suma. Las personas que llaman pueden utilizar esta función relacionada para reducir varios valores a un solo valor. Reducir valores basados en claves es una API más simple y precisa en comparación con el método Reducer.
Mapmaker
Map() y flatMap()
Ahora, considere un algoritmo que cuenta el número de palabras que comienzan con una letra mayúscula. Para cada línea de texto de entrada, Mapper puede generar 0, 1 o más pares clave-valor.
La clase pública CountUppercaseMapper extiende
Mapper & ltLongWritable, Text, Text, IntWritable & gt{
@Override
Mapa vacío protegido ( Número de línea LongWritable, línea de texto, contexto de contexto)
lanza IOException, InterruptedException {
for (String word: line.toString().split(" "){
if(carácter. isupper case(palabra. charat(0))){
context.write(new Text(word), new int writable(1));
p>
}
}
}
}
Chispa escritura correspondiente:
líneas flatMap(
_.Split(" ").Filter(palabra = & gtCharacter.isUpperCase(palabra(0))).Map(palabra = & gt(palabra, 1))
)
La función simple de mapa de Spark no es adecuada para este escenario, porque el mapa solo puede producir una salida para cada entrada, pero en este ejemplo, una fila necesita producir múltiples salidas, por lo que en relación con MapperAPI, la función de mapa de Spark tiene una semántica más simple y un alcance de aplicación más limitado.
La solución de Spark es asignar primero cada fila a un conjunto de valores de salida, que pueden estar vacíos o tener varios valores. la función flatMap. Las palabras en la matriz se filtran y convierten en tuplas en la función. En este caso, es flatMap el que realmente imita el comportamiento del asignador
groupByKey ()
<. p>Es muy sencillo escribir un reductor de recuento en Spark, se puede utilizar reduceByKey para contar el número total de cada palabra. Por alguna razón, es necesario generar cada palabra en el archivo. sus números En MapReduce, la implementación es la siguiente:La clase pública CountUppercaseReducer extiende
Reducer & ltText, IntWritable, Text, IntWritable & gt{ p>
@. Anular
Reducción de vacío protegido (palabra de texto, iterable & ltIntWritable & gt count, contexto contexto)
Lanza IOException, InterruptedException {
int sum = 0;
for (Recuento IntWritable: recuentos) {
suma+= recuento .
}
Contexto
. escribir (nuevo texto (palabra.toString().
toUpperCase()), new int writable(sum));
}
}
Pero redeceByKey no puede funcionar solo en Spark porque conserva la clave original. Para hacer simulaciones en Spark necesitamos algo más parecido a la API Reducer. Sabemos que el método de reducción de Reducer acepta una clave y un conjunto de valores, y luego completa un conjunto de transformaciones. GroupByKey y las operaciones de mapeo continuo pueden lograr este objetivo:
groupByKey(). map { case (word, ones) = & gt; (word.toUpperCase, ones.sum) }
GroupByKey solo recopila todos los valores de una clave y no proporciona una función de reducción. Sobre esta base, cualquier transformación puede operar sobre una clave y un rango de valores. Aquí, las claves se convierten a letras mayúsculas y los valores se suman directamente.
Setup() y cleanup()
En MapReduce, Mapper y Reducer pueden declarar un método de configuración y ejecutar este método para asignar recursos costosos, como conexiones de bases de datos, antes de procesar la entrada. Al mismo tiempo, puede utilizar la función de limpieza para liberar recursos.
La clase pública SetupCleanupMapper extiende
Mapper & ltLongWritable, Text, Text, IntWritable & gt{
Conexión privada dbConnection
@Override
Configuración de anulación protegida (contexto contextual) {
Conexión de base de datos =...
}
...
@override
protegido vacío limpio(contexto de contexto) {
conexión db close()
}
}
Los métodos map y flatMap en Spark solo pueden operar en una entrada a la vez, y no hay forma de ejecutar el código antes y después de convertir una gran cantidad de valores. Parece que el código de configuración y limpieza se puede colocar directamente antes y después de la llamada a la función Sparkmap:
val Database Connection=...
lines.map(...dbConnection. createStatement(.. .)...)
dbConnection.close() // ¡Error!
Sin embargo, este enfoque no es factible porque:
Pone el objeto dbConnection en el cierre de la función de mapa, lo que requiere que sea serializable (por ejemplo, a través de java.io. Serializable). Sin embargo, los objetos como las conexiones de bases de datos generalmente no se pueden serializar.
El mapa es una conversión, no una operación, y retrasará la ejecución. El objeto de conexión no se puede cerrar a tiempo.
Aun así, solo puede cerrar la conexión en el controlador, pero no puede liberar la conexión de recursos asignada por la versión de copia serializada.
De hecho, ni map ni flatMap son la función correspondiente más cercana a Mapper en Spark, pero la función correspondiente más cercana a Mapper en Spark es el muy importante método mapPartitions(), que no solo puede asignar un único valor. a un solo valor, también puede asignar un conjunto de valores a otro, de forma muy similar al método de mapa masivo. Esto significa que el método mapPartitions() puede asignar recursos localmente al principio y liberarlos al final del mapeo por lotes.
Agregar un método de configuración es fácil, agregar un método de limpieza es más difícil porque aún es difícil detectar la finalización de la conversión.
Por ejemplo, esto funciona:
líneas . mapear particiones { valor iterador = >p>
val conexión de base de datos =... // OK
val transformadoIterator = iterador de valor. map(...conexión de base de datos...)
dbConnection.close() // ¡Aún está mal! No puede calcular iteradores
Transformadores
}
Reimprimir