Red de conocimiento informático - Material del sitio web - Cómo lograr la compatibilidad de la versión Spark

Cómo lograr la compatibilidad de la versión Spark

En Spark 1.6, la mayoría de las clases relacionadas con el aprendizaje automático usan vectores o org.apache.spark.mllib.linalg.Vector

pero en 2.0, básicamente todas cambiaron a

org.apache.spark .mllib.linalg .Vector

y el objeto Vectors correspondiente. Esto crea grandes dificultades, por ejemplo, es difícil lograr compatibilidad con el siguiente código y no se compilará al cambiar de Spark:

No importa cómo lo escriba, no hay forma de funcionar en Spark 1.6 y Spark 2.0. al mismo tiempo Normalmente, siempre arrojará un error porque los nombres de los paquetes de Vector, Vectors y otras clases han cambiado.

En Spark, puede obtener la versión de Spark a través de org.apache.spark.SPARK_VERSION.

Definimos una clase:

Compilación dinámica

Scala es fácil de compilar dinámicamente y el código también es muy simple, aproximadamente como sigue:

El efecto Similar al efecto en Spark Shell, donde ref es el valor de retorno. Normalmente, escribe dos copias de su código, una para Spark 1.6 y otra para Spark 2.0, y decide cuál compilar en tiempo de ejecución. Sin embargo, un inconveniente de este enfoque (especialmente difícil de evitar en Spark) es que si el valor de referencia devuelto por compileCode es algo que debe serializarse en el Ejecutor, la deserialización puede causar problemas, porque algunas de las clases anónimas generadas no son Existe en Ejecutor. Más allá de eso, este enfoque es la forma más eficaz de lograr compatibilidad.

Inicialmente, consideré usar el código CodeGen interno de Spark, que se vería así:

Desafortunadamente, la API en sí cambia constantemente; por ejemplo, CodeAndComment solo está disponible en 2.0.

Métodos de proyecto separados

Aislar las partes de la API que cambian. Por ejemplo, como mencionamos anteriormente, para las API relacionadas con Vector, 1.6 y 2.0 son diferentes, por lo que podemos separar los dos proyectos, adaptarlos a las versiones correspondientes respectivamente y luego de acuerdo con el mecanismo de perfil en Maven, de acuerdo con Spark. versión Publicar paquetes jar para introducir diferentes paquetes de adaptación. Este método es relativamente engorroso.

Método de reflexión (el método utilizado por StreamingPro)

El siguiente es el código que utilicé para resolver el problema del cambio de nombre del paquete Vector:

Dinámicamente ajústelo según las diferentes versiones. Cargue la clase correspondiente y luego llame al método mediante reflexión para evitar errores en el tiempo de compilación. Sin embargo, no se puede utilizar un código similar con la reflexión:

Porque la función udf requiere la capacidad de deducir los valores de entrada y retorno. Si es por reflexión, dado que no podemos determinar el valor de retorno (puede ser org.apache.spark.ml.linalg.Vector u org.apache.spark.mllib.linalg.Vector), no se puede compilar en este momento. Por lo tanto, reescribimos la implementación de udf. Sin embargo, esta implementación también encontró contratiempos. Por ejemplo, usó la clase UserDefinedFunction, que ya estaba en un paquete diferente. Aún así, resolvimos el problema irradiando:

Es. feo.

Hay otro problema aquí: aunque udf devuelve todos los objetos UserDefinedFunction, también tienen versiones incompatibles, es decir, no podemos permitir que el compilador determine cuál es el valor de retorno. Usamos otro truco de sintaxis de Scala, como sigue:

En el centro de la última línea, afirmamos que el objeto devuelto satisface la siguiente firma:

{def apply(exprs: Columna *): Columna}

En este punto, puede usarlo directamente:

Resumen

La API interna de Spark 1.6 y 2.0 ha cambiado mucho. pero para los usuarios normales, la compatibilidad API sigue siendo muy buena. Lograr la compatibilidad de versiones no parece ser una tarea fácil. Por lo tanto, cuando uso StreamingPro para trabajos relacionados con el aprendizaje automático, solo soy compatible con Spark 1.6 y 2.0, renunciando a la versión 1.5. Pero para ETL normal y computación de transmisión, se admiten las tres versiones.