Cómo hacer buenos ejemplos reproducibles de Apache Spark

pyspark spark-dataframe apache-spark dataframe apache-spark-sql pyspark-sql


He pasado una buena cantidad de tiempo leyendo algunas preguntas con las etiquetas pyspark y spark-dataframe y muy a menudo encuentro que los carteles no brindan suficiente información para comprender realmente su pregunta. Normalmente comento pidiéndoles que publiquen un MCVE, pero a veces hacer que muestren algunos datos de entrada / salida de muestra es como tirar de los dientes. Por ejemplo: vea los comentarios sobre esta pregunta .

Quizás parte del problema es que las personas simplemente no saben cómo crear fácilmente un MCVE para marcos de datos de chispa. Creo que sería útil tener una versión de marco de datos de chispa de esta pregunta de pandas como una guía que pueda vincularse.

Entonces,¿cómo se puede crear un buen ejemplo reproducible?




Answer 1 pault


Proporcionar pequeños datos de muestra,que pueden ser fácilmente recreados.

Como mínimo,los carteles deberían proporcionar un par de filas y columnas en su marco de datos y código que puedan utilizarse para crearlo fácilmente.Por fácil,me refiero a cortar y pegar.Hágalo lo más pequeño posible para demostrar su problema.


Tengo el siguiente marco de datos:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

que se puede crear con este código:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

Mostrar la salida deseada.

Haga su pregunta específica y muéstrenos su resultado deseado.


¿Cómo puedo crear una nueva columna 'is_divisible' que tenga el valor 'yes' si el día del mes de la 'date' más 7 días es divisible por el valor en la columna 'X' , y 'no' de locontrario?

Salida deseada:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

Explica cómo obtener tu salida.

Explique,con gran detalle,cómo obtiene el resultado deseado.Ayuda mostrar un ejemplo de cálculo.


Por ejemplo, en la fila 1, la X = 1 y la fecha = 2017-01-01. Agregar 7 días hasta la fecha produce 2017-01-08. El día del mes es 8 y dado que 8 es divisible por 1, la respuesta es 'sí'.

Del mismo modo, para la última fila X = 7 y la fecha = 2017-01-04. Agregar 7 a la fecha produce 11 como el día del mes. Como 11% 7 no es 0, la respuesta es 'no'.


Comparte tu código existente.

Muéstrenos lo que ha hecho o probado, incluido todo * del código, incluso si no funciona. Díganos dónde se está atascando y si recibe un error, incluya el mensaje de error.

(*Puedes dejar fuera el código para crear el contexto de la chispa,pero debes incluir todas las importaciones.)


Sé cómo agregar una nueva columna que es la date más 7 días, pero tengo problemas para obtener el día del mes como un número entero.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

Incluir versiones,importaciones y usar resaltado de sintaxis


Para los puestos de ajuste de rendimiento,incluya el plan de ejecución


Análisis de los archivos de salida de la chispa

  • MaxU proporcionó un código útil en esta respuesta para ayudar a analizar los archivos de salida de Spark en un DataFrame.

Otras notas.




Answer 2 Alper t. Turker


Ajuste del rendimiento

Si la pregunta está relacionada con el ajuste del rendimiento,por favor incluya la siguiente información.

Plan de ejecución

Es mejor incluir un plan de ejecución extendido . En Python:

df.explain(True) 

En Scala:

df.explain(true)

o plan de ejecución extendido con estadísticas . En Python:

print(df._jdf.queryExecution().stringWithStats())

en Scala:

df.queryExecution.stringWithStats

Información sobre el modo y el cúmulo

  • mode - local , client e , `cluster.
  • Gestor de clusters (si procede)-ninguno (modo local),autónomo,YARN,Mesos,Kubernetes.
  • Información básica de configuración (número de núcleos,memoria de ejecución).

Información de tiempo

lento es relativo, especialmente cuando transfiere aplicaciones no distribuidas o espera baja latencia. Los tiempos exactos para diferentes tareas y etapas se pueden recuperar de los jobs de Spark UI ( sc.uiWebUrl ) o Spark REST UI.

Utilizar nombres normalizados para los contextos

El uso de nombres establecidos para cada contexto nos permite reproducir rápidamente el problema.

  • sc : para SparkContext .
  • sqlContext : para SQLContext .
  • spark - para SparkSession .

Proporcionar información de tipo ( Scala )

La inferencia de tipo poderoso es una de las características más útiles de Scala,pero hace difícil analizar el código sacado de contexto.Incluso si el tipo es obvio por el contexto es mejor anotar las variables.Prefiera

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

over

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

Las herramientas comúnmente usadas pueden ayudarte:

  • spark-shell / concha Scala

    uso :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
  • Idea InteliJ

    UseAlt+=




Answer 3 desertnaut


Buena pregunta y respuesta; algunas sugerencias adicionales:

Incluya su versión de la Chispa

Spark todavía está evolucionando, aunque no tan rápido como en los días de 1.x. Siempre es (pero especialmente si está utilizando una versión algo anterior) una buena idea para incluir su versión de trabajo. Personalmente, siempre comienzo mis respuestas con:

spark.version
# u'2.2.0'

or

sc.version
# u'2.2.0'

Incluir tu versión en Python,también,nunca es una mala idea.


Incluya todas sus importaciones

Si su pregunta no es estrictamente acerca de Spark SQL y los marcos de datos, por ejemplo, si tiene la intención de usar su marco de datos en alguna operación de aprendizaje automático, sea explícito acerca de sus importaciones; vea esta pregunta , donde las importaciones se agregaron en el OP solo después de un intercambio extenso en el (ahora eliminado) comentarios (y resultó que estas importaciones incorrectas fueron la causa principal del problema).

¿Por qué es necesario? Porque,por ejemplo,este LDA

from pyspark.mllib.clustering import LDA

es diferente de este LDA:

from pyspark.ml.clustering import LDA

la primera proveniente de la antigua API basada en RDD (antes Spark MLlib),mientras que la segunda proveniente de la nueva API basada en un marco de datos (Spark ML).


Incluir el resaltado del código

OK, confesaré que esto es subjetivo: creo que las preguntas de PySpark no deben etiquetarse como python de manera predeterminada ; El problema es que la etiqueta de python proporciona resaltado de código automáticamente (y creo que esta es una razón principal para aquellos que la usan para preguntas de PySpark). De todos modos, si está de acuerdo, y aún desea un código bonito y resaltado, simplemente incluya la directiva de rebajas correspondiente:

<!-- language-all: lang-python -->

en algún lugar de su puesto,antes de su primer fragmento de código.

[ACTUALIZACIÓN: he solicitado el resaltado automático de sintaxis para las etiquetas pyspark y sparkr , que se ha implementado de hecho]




Answer 4 MaxU


Esta pequeña función de ayuda podría ayudar a analizar los archivos de salida de la Chispa en el DataFrame:

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

PD: para pyspark , eqNullSafe está disponible en spark 2.3 .