如何制作可复制性好的Apache Spark实例

pyspark spark-dataframe apache-spark dataframe apache-spark-sql pyspark-sql


我花了很多时间阅读有关pysparkspark- dataframe标签的一些问题,而且我经常发现海报提供的信息不足以真正理解他们的问题。我通常发表评论,要求他们发布MCVE,但有时让他们展示一些示例输入/输出数据就像拔牙。例如:请参阅对此问题的评论。

问题的一部分可能是人们只是不知道如何轻松地为spark-dataframe创建MCVE。我认为将这个熊猫问题的Spark-Dataframe版本作为可链接的指南会很有用。

那么,如何去创造一个好的、可复制的例子呢?




Answer 1 pault


提供小的样本数据,可以方便地进行再创造。

最起码,发帖人应该在自己的数据框架上提供几个行和列,并提供一些代码,以便于创建。我所说的简单,是指剪切和粘贴。让它尽可能小,以证明你的问题。


我有以下数据框:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

可以使用以下代码创建:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

显示所需的输出。

询问您的具体问题,并向我们展示您想要的输出。


如何创建一个新列 'is_divisible' 具有价值 'yes' 如果月的一天 'date' 加7天整除值列 'X' 'no' ,否则?

所需的输出:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

解释一下如何得到你的输出。

详细地解释一下你是如何得到你想要的输出的。这有助于展示一个计算实例。


例如,在第1行中,X = 1,日期= 2017-01-01。迄今为止,再加上7天就会产生2017年1月8日。一个月中的一天是8,并且由于8可被1整除,因此答案为“是”。

同样,对于最后一行,X = 7,日期= 2017-01-04。将7加到日期将产生11,作为月的一天。由于11%7不为0,答案为“否”。


分享你现有的代码。

向我们展示您已经做过或尝试过的事情,包括所有*代码,即使它不起作用。告诉我们您在哪里卡住,如果收到错误,请附上错误消息。

(*你可以省略创建火花上下文的代码,但你应该包括所有的导入。)


我知道如何添加一个 date 加上7天的新列,但是我很难将月份中的日期作为整数。

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

包括版本、导入,并使用语法高亮显示。


对于性能调整岗位,包括执行计划。


解析火花输出文件

  • MaxU此答案中提供了有用的代码,以帮助将Spark输出文件解析为DataFrame。

其他说明:




Answer 2 Alper t. Turker


性能调校

如果问题与性能调整有关,请包括以下信息。

执行计划

最好包括扩展执行计划。在Python中:

df.explain(True) 

在Scala中。

df.explain(true)

要么 具有统计数据的扩展执行计划。在Python中:

print(df._jdf.queryExecution().stringWithStats())

在Scala中。

df.queryExecution.stringWithStats

模式和集群信息

  • mode - localclient ,`集群。
  • 集群管理器(如果适用)--无(本地模式)、独立、YARN、Mesos、Kubernetes。
  • 基本配置信息(内核数、执行器内存)。

时间信息

缓慢是相对的,特别是当您移植非分布式应用程序或期望低延迟时。可以从Spark UI( sc.uiWebUrl 中检索不同任务和阶段的确切时间) jobs 或Spark REST UI中。

使用统一的语境名称

对每个语境使用既定的名称,可以让我们快速重现问题。

  • sc 用于 SparkContext
  • sqlContext 用于 SQLContext
  • spark -对 SparkSession

提供类型信息( Scala

强大的类型推理是Scala最有用的功能之一,但它使我们很难分析脱离上下文的代码。即使类型从上下文中可以明显看出,最好是对变量进行注释。最好是使用

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

over

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

常用的工具可以帮助你。

  • spark-shell /斯卡拉壳

    采用 :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
  • InteliJ的理念

    UseAlt+=




Answer 3 desertnaut


好问题和答案;一些其他建议:

包括你的Spark版本

Spark仍在不断发展,尽管没有1.x时代那么快。包括工作版本始终是一个好主意(但特别是如果您使用的是较旧的版本)。就个人而言,我总是开始回答

spark.version
# u'2.2.0'

or

sc.version
# u'2.2.0'

包括你的Python版本,也绝对不是一个坏主意。


包括你所有的进口产品

如果您的问题不完全是关于Spark SQL和数据框的,例如,如果您打算在某些机器学习操作中使用数据框,请明确说明您的导入-请参阅此问题,其中仅在进行了广泛交换后才将导入添加到OP中(现已删除)注释(结果证明这些错误的导入是问题的根本原因)。

为什么要这样做呢?因为比如说,这个LDA

from pyspark.mllib.clustering import LDA

不同的,从这个LDA:

from pyspark.ml.clustering import LDA

前者来自于旧的、基于RDD的API(原Spark MLlib),后者来自于新的、基于数据框的API(Spark ML)。


包括代码高亮显示

好的,我承认这是主观的:我认为默认情况下不应将PySpark问题标记为 python ;事实是, python 标签会自动使代码突出显示(我相信这是那些将其用于PySpark问题的主要原因)。无论如何,如果您同意,并且仍然想要一个不错的突出显示的代码,只需添加相关的markdown指令即可:

<!-- language-all: lang-python -->

在你的帖子中的某个地方,在你的第一个代码片段之前。

[更新:我已要求pysparksparkr 标签进行自动语法高亮显示,实际上已经实现了]




Answer 4 MaxU


这个小助手函数可能有助于将Spark输出文件解析成DataFrame。

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

PS:对于pyspark,可从 spark 2.3 获取 eqNullSafe