再現性の高いApache Sparkのサンプルの作り方

pyspark spark-dataframe apache-spark dataframe apache-spark-sql pyspark-sql


私はいくつかの質問を読ん時間のかなりの量を費やしてきたpyspark火花データフレームのタグと非常に多くの場合、私はポスターが本当に自分の質問を理解するのに十分な情報を提供しないことがわかります。私は通常、MCVEを投稿するように依頼するコメントをしますが、時々、サンプルの入出力データを表示するように依頼することは、歯を引っ張るようなものです。例:この質問のコメントを参照してください。

おそらく問題の一部は、spark-dataframesのMCVEを簡単に作成する方法を人々が知らないことです。リンクできるガイドとして、このパンダの質問のスパークデータフレームバージョンを用意しておくと便利だと思います。

では、再現性の高い良い例を作るにはどうすればいいのでしょうか?




Answer 1 pault


簡単に再現できる小さなサンプルデータを提供してください。

少なくとも、ポスターは、彼らのデータフレームとそれを簡単に作成するために使用できるコードの行と列のカップルを提供する必要があります。簡単というのは、カットアンドペーストという意味です。あなたの問題を示すために、できるだけ小さくしてください。


次のデータフレームがあります:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

これはこのコードで作成できます:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

希望する出力を表示します。

具体的な質問をして、希望の出力を見せてください。


どのように私は、新しい列を作成することができます 'is_divisible' の値を持つ 'yes' の月の日場合は 'date' を加えた7日は、列の値で割り切れるない 'X'および 'no' そうでない場合は?

望ましい出力:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

アウトプットの取り方を説明してください。

どのようにして希望の出力を得るのか、非常に詳細に説明してください。計算例を示すのに役立ちます。


たとえば、行1では、X = 1および日付= 2017-01-01です。これまでに7日間追加すると、2017-01-08になります。月の日は8で、8は1で割り切れるので、答えは「はい」です。

同様に、最後の行の場合、X = 7、日付= 2017-01-04です。日付に7を追加すると、月の日として11が生成されます。11%7は0ではないので、答えは「いいえ」です。


既存のコードを共有してください。

コードが機能しない場合でも、コードのすべて*を含め、実行または試行したことを示してください。行き詰まっている場所をお知らせください。エラーが発生した場合は、エラーメッセージを含めてください。

(*スパークコンテキストを作成するコードは省いても構いませんが、すべてのインポートを含める必要があります)


date に7日間を加えた新しい列を追加する方法を知っていますが、月の日を整数として取得できません。

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

バージョン、インポートを含め、構文の強調表示を使用する


パフォーマンスチューニングのポストには、実行計画を含める

  • user8371915によって書かれたこの回答の詳細。
  • コンテキストに標準化された名前を使用するのに役立ちます。

スパーク出力ファイルのパース

  • MaxUは、Spark出力ファイルをDataFrameに解析するのに役立つ便利なコードをこの回答に提供しました。

その他の注意事項。




Answer 2 Alper t. Turker


パフォーマンスチューニング

パフォーマンスチューニングに関連する質問の場合は、以下の内容を記載してください。

実行計画

拡張された実行プランを含めることをお勧めします。Pythonの場合:

df.explain(True) 

Scalaで。

df.explain(true)

または統計付きの拡張実行プラン。Pythonの場合:

print(df._jdf.queryExecution().stringWithStats())

をScalaで使用しています。

df.queryExecution.stringWithStats

モードとクラスタ情報

  • mode - localclient 、 `クラスター。
  • クラスタマネージャ(該当する場合)-なし(ローカルモード)、スタンドアロン、YARN、Mesos、Kubernetes。
  • 基本的な構成情報(コア数、実行メモリ)。

タイミング情報

特に非分散型アプリケーションを移植する場合、または待ち時間が短いことが予想される場合は、速度は相対的です。さまざまなタスクとステージの正確なタイミングは、Spark UI( sc.uiWebUrljobs またはSpark REST UI から取得できます。

コンテキストに標準化された名前を使用する

各コンテキストに確立された名前を使用することで、問題を迅速に再現することができます。

  • sc - SparkContext の場合。
  • sqlContext - SQLContext の場合。
  • spark - SparkSession の場合。

タイプ情報を提供する(Scala

強力な型推論はScalaの最も便利な機能の一つですが、コンテキストから取り出されたコードを解析するのは困難です。文脈から型が明らかであっても、変数に注釈をつけた方が良いでしょう。望ましいのは

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

over

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

一般的に使用されているツールは、あなたを支援することができます。

  • spark-shell / Scala shell

    使用 :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
  • InteliJ アイディア

    UseAlt+=




Answer 3 desertnaut


良い質問と答え。追加の提案:

Sparkのバージョンを含める

1.xの時代ほど急速ではありませんが、Sparkはまだ進化しています。常に(ただし、多少古いバージョンを使用している場合は特に)作業中のバージョンを含めることをお勧めします。個人的に、私は常に私の答えを以下から始めます

spark.version
# u'2.2.0'

or

sc.version
# u'2.2.0'

Pythonのバージョンも含めることは決して悪い考えではありません。


すべての輸入品を含める

質問が厳密にSpark SQLとデータフレームに関するものではない場合、たとえば一部の機械学習操作でデータフレームを使用する場合は、インポートについて明示してください- この質問を参照してください。インポートがOPに追加されたのは、 (現在は削除されています)コメント(これらの間違ったインポートが問題の根本的な原因であることが判明しました)。

なぜこれが必要なのか?それは、例えば、このLDA

from pyspark.mllib.clustering import LDA

このLDA とは異なります。

from pyspark.ml.clustering import LDA

最初のものは古いRDDベースのAPI(旧Spark MLlib)から来ており、2番目のものは新しいデータフレームベースのAPI(Spark ML)から来ています。


コードのハイライトを含む

OK、私はこれが主観的であることを告白します。PySparkの質問は、デフォルトで python としてタグ付けしないでください。問題は、 python タグが自動的にコードを強調表示することです(これがPySparkの質問に使用する主な理由です)。とにかく、もしあなたが同意して偶然、そして強調された素晴らしいコードが欲しいなら、単に関連するmarkdownディレクティブを含めてください:

<!-- language-all: lang-python -->

のどこかで、最初のコードスニペットの前に書いてください。

[更新:私は実際に実装されている pyspark および sparkr タグの自動構文強調表示を要求しました ]




Answer 4 MaxU


この小さなヘルパー関数は、Sparkの出力ファイルをDataFrameにパースするのに役立つかもしれません。

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

PS:pysparkの場合、 eqNullSafespark 2.3 から利用できます。