Как сделать хорошие воспроизводимые примеры Apache Spark

pyspark spark-dataframe apache-spark dataframe apache-spark-sql pyspark-sql


Я потратил немало времени, читая некоторые вопросы с помощью тегов pyspark и spark-dataframe, и очень часто обнаруживаю, что постеры не предоставляют достаточно информации, чтобы по-настоящему понять их вопрос. Я обычно комментирую, прося их опубликовать MCVE, но иногда заставляя их показывать некоторые примерные данные ввода / вывода, это все равно что тянуть зубы. Например: см. Комментарии к этому вопросу .

Возможно, часть проблемы в том, что люди просто не знают, как легко создать MCVE для фреймов с искровыми данными. Я думаю, что было бы полезно иметь версию этого вопроса о пандах, основанную на искровых данных, в качестве руководства, которое можно связать.

Так как же создать хороший,воспроизводимый пример?




Answer 1 pault


Предоставьте небольшие образцы данных,которые можно легко воссоздать.

По крайней мере,плакаты должны содержать пару строк и столбцов на своем фрейме данных и код,который можно легко использовать для его создания.Под простотой я имею в виду вырезать и вставить.Сделайте его как можно меньше,чтобы продемонстрировать вашу проблему.


У меня есть следующий фрейм данных:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

который может быть создан с помощью этого кода:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

Покажите желаемый результат.

Задайте свой конкретный вопрос и покажите нам желаемый результат.


Как создать новый столбец 'is_divisible' со значением 'yes' если день месяца 'date' плюс 7 дней делится на значение в столбце 'X' , а в противном случае 'no' ?

Желаемый вывод:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

Объясните,как получить результат.

Объясните в деталях,как вы получаете желаемый результат.Поможет показать пример расчета.


Например, в строке 1 X = 1 и дата = 2017-01-01. Добавление 7 дней до даты урожайности 2017-01-08. День месяца - 8, а так как 8 делится на 1, ответ «да».

Аналогично, для последней строки X = 7 и даты = 2017-01-04. Добавление 7 к дате дает 11 как день месяца. Поскольку 11% 7 не 0, ответ «нет».


Поделитесь своим существующим кодом.

Покажите нам, что вы сделали или попробовали, включая весь * код, даже если он не работает. Сообщите нам, где вы застряли, и если вы получили сообщение об ошибке, пожалуйста, включите сообщение об ошибке.

(*Вы можете пропустить код для создания контекста искры,но вы должны включить все импорты).


Я знаю, как добавить новый столбец с date плюс 7 дней, но у меня возникают проблемы с получением целого дня месяца.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

Включать версии,импортировать и использовать подсветку синтаксиса


Для должностей,связанных с настройкой производительности,включить план выполнения

  • Полная информация в этом ответе написана пользователем 8371915 .
  • Это помогает использовать стандартизированные названия в различных контекстах.

Разбор файлов вывода искры

  • MaxU предоставил полезный код в этом ответе, чтобы помочь разобрать выходные файлы Spark в DataFrame.

Другие заметки.




Answer 2 Alper t. Turker


Настройка производительности

Если вопрос касается настройки производительности,пожалуйста,включите следующую информацию.

План выполнения

Лучше всего включить расширенный план выполнения . В Python:

df.explain(True) 

В Скале:

df.explain(true)

или расширенный план выполнения со статистикой . В Python:

print(df._jdf.queryExecution().stringWithStats())

в Скале:

df.queryExecution.stringWithStats

Информация о режиме и кластере

  • mode - local , client , кластерный.
  • Кластерный менеджер (если применимо)-нет (локальный режим),автономный,YARN,Mesos,Kubernetes.
  • Базовая информация о конфигурации (количество ядер,память исполнителя).

Информация о сроках

Параметр slow является относительным, особенно если вы портируете нераспределенное приложение или ожидаете низкой задержки. Точные тайминги для различных задач и этапов, могут быть извлечены из Спарка пользовательского интерфейса ( sc.uiWebUrl ) jobs или искра REST UI.

Использовать стандартные названия для контекста

Использование установленных имен для каждого контекста позволяет быстро воспроизвести проблему.

  • sc - для SparkContext .
  • sqlContext - для SQLContext .
  • spark - для SparkSession .

Предоставить информацию о типе ( Scala )

Мощный тип вывода-одна из самых полезных функций Scala,но она затрудняет анализ кода,вырванного из контекста.Даже если тип очевиден из контекста,лучше аннотировать переменные.Предпочтителен .

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

over

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

Обычные инструменты могут вам помочь:

  • spark-shell / Scala shell

    использование :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
  • Идея "Интеллидж

    UseAlt+=




Answer 3 desertnaut


Хороший вопрос и ответ; некоторые дополнительные предложения:

Включите вашу версию Spark

Spark все еще развивается, хотя и не так быстро, как в дни 1.x. Всегда (но особенно если вы используете несколько более старую версию) хорошая идея включить вашу рабочую версию. Лично я всегда начинаю свои ответы с:

spark.version
# u'2.2.0'

or

sc.version
# u'2.2.0'

Включая и вашу версию Python,никогда не бывает плохой идеей.


Включите весь ваш импорт

Если ваш вопрос касается не только Spark SQL и фреймов данных, например, если вы намереваетесь использовать свой фрейм данных в какой-либо операции машинного обучения, четко указывайте свои операции импорта - см. Этот вопрос , где импорт был добавлен в OP только после обширного обмена в (теперь удалены) комментарии (и оказалось, что эти неправильные операции импорта были основной причиной проблемы).

Почему это необходимо? Потому что,например,эта LDA

from pyspark.mllib.clustering import LDA

это отличается от этого LDA:

from pyspark.ml.clustering import LDA

Первый-из старого,основанного на RDD API (бывший Spark MLlib),а второй-из нового,основанного на данных,API (Spark ML).


Включает подсветку кода

Хорошо, я признаю, что это субъективно: я считаю, что вопросы PySpark не должны быть помечены как python по умолчанию ; дело в том, python тег автоматически выделяет код (и я считаю, что это главная причина для тех, кто использует его для вопросов PySpark). В любом случае, если вы согласны и вам все еще нужен хороший выделенный код, просто включите соответствующую директиву уценки:

<!-- language-all: lang-python -->

где-то в твоем посте,до твоего первого фрагмента кода.

[ОБНОВЛЕНИЕ: я запросил автоматическую подсветку синтаксиса для pyspark и sparkr , что действительно реализовано]




Answer 4 MaxU


Эта небольшая вспомогательная функция может помочь при разборе выходных файлов Spark в DataFrame:

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

PS: для pyspark , eqNullSafe доступен от spark 2.3 .