재현 가능한 Apache Spark 예제를 만드는 방법

pyspark spark-dataframe apache-spark dataframe apache-spark-sql pyspark-sql


나는 pysparkspark- dataframe 태그로 몇 가지 질문을 읽는 데 상당한 시간을 소비 했으며 , 종종 포스터가 그들의 질문을 진정으로 이해하기에 충분한 정보를 제공하지 못한다는 것을 알게되었습니다. 나는 보통 그들에게 MCVE 를 게시하라고 요청 하지만 때로는 샘플 입 / 출력 데이터를 표시하는 것은 치아를 당기는 것과 같습니다. 예를 들면 : 이 질문 에 대한 의견을보십시오 .

아마도 문제의 일부는 사람들이 스파크 데이터 프레임을위한 MCVE를 쉽게 만드는 방법을 모른다는 것입니다. 나는 이 팬더 질문 의 스파크 데이터 프레임 버전을 링크 할 수있는 가이드로 사용하는 것이 유용 할 것이라고 생각합니다 .

그렇다면 재현 가능한 좋은 예를 만드는 방법은 무엇입니까?




Answer 1 pault


쉽게 재현 할 수있는 작은 샘플 데이터를 제공하십시오.

최소한 포스터는 데이터 프레임과 코드에 쉽게 만들 수있는 몇 개의 행과 열을 제공해야합니다. 쉽게 말해서 잘라내어 붙여 넣기를 의미합니다. 문제를 설명하기 위해 가능한 작게 만드십시오.


다음과 같은 데이터 프레임이 있습니다.

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

이 코드로 만들 수 있습니다 :

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

원하는 출력을 보여줍니다.

구체적인 질문을하고 원하는 결과를 보여주십시오.


어떻게 새 열을 만들 수 있습니다 'is_divisible' 값이없는이 'yes' 의 월의 일 경우 'date' 플러스 칠일이 열의 값으로 나누어 'X' , 그리고 'no' 그렇지?

원하는 출력 :

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

결과를 얻는 방법을 설명하십시오.

원하는 결과를 얻는 방법을 자세하게 설명하십시오. 계산 예제를 표시하는 데 도움이됩니다.


예를 들어 행 1에서 X = 1이고 날짜 = 2017-01-01입니다. 지금까지 7 일을 추가하면 2017-01-08이됩니다. 이 달의 날짜는 8이고 8은 1로 나눌 수 있으므로 대답은 '예'입니다.

마찬가지로 마지막 행 X = 7이고 날짜 = 2017-01-04입니다. 날짜에 7을 추가하면 해당 월의 날짜가 11이됩니다. 11 % 7이 0이 아니기 때문에 답은 '아니요'입니다.


기존 코드를 공유하십시오.

작동하지 않더라도 모든 코드 * 를 포함하여 수행했거나 시도한 작업을 알려주십시오 . 문제가 발생한 위치를 알려주고 오류가 발생하면 오류 메시지를 포함하십시오.

(* Spark 컨텍스트를 만들기 위해 코드를 생략 할 수 있지만 모든 가져 오기를 포함해야합니다.)


date 에 7 일을 더한 새 열을 추가하는 방법을 알고 있지만 해당 월의 일을 정수로 얻는 데 문제가 있습니다.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

버전 포함, 가져 오기 및 구문 강조 사용


성능 조정 게시물의 경우 실행 계획을 포함하십시오.

  • 이 답변의 자세한 내용은 user8371915가 작성했습니다 .
  • 컨텍스트에 표준화 된 이름을 사용하는 데 도움이됩니다.

스파크 출력 파일 파싱

  • MaxU 출력에 Spark 출력 파일을 DataFrame으로 구문 분석하는 데 유용한 코드를 제공 했습니다.

다른 메모.

  • 요청 하는 방법 과 최소, 완료 및 검증 가능한 예제를 작성하는 방법을 먼저 읽으십시오 .
  • 위에 링크 된이 질문에 대한 다른 답변을 읽으십시오.
  • 좋은 제목이 있습니다.
  • 공손. SO에있는 사람들은 자원 봉사자이므로 잘 부탁합니다.



Answer 2 Alper t. Turker


성능 조정

질문이 성능 조정과 관련이있는 경우 다음 정보를 포함하십시오.

실행 계획

확장 된 실행 계획 을 포함하는 것이 가장 좋습니다 . 파이썬에서 :

df.explain(True) 

스칼라에서 :

df.explain(true)

또는 통계가 포함 된 확장 실행 계획 . 파이썬에서 :

print(df._jdf.queryExecution().stringWithStats())

스칼라에서 :

df.queryExecution.stringWithStats

모드 및 클러스터 정보

  • mode - local , client ,`클러스터.
  • 클러스터 관리자 (해당되는 경우)-없음 (로컬 모드), 독립형, YARN, Mesos, Kubernetes
  • 기본 구성 정보 (코어 수, 실행기 메모리).

타이밍 정보

특히 비 분산 응용 프로그램을 이식하거나 지연 시간이 짧은 경우에는 속도 가 상대적입니다. 다른 작업 및 단계에 대한 정확한 타이밍은 Spark UI ( sc.uiWebUrl ) jobs 또는 Spark REST UI 에서 검색 할 수 있습니다 .

컨텍스트에 표준화 된 이름 사용

각 컨텍스트에 설정된 이름을 사용하면 문제를 신속하게 재현 할 수 있습니다.

  • sc -에 대한 SparkContext .
  • sqlContext - SQLContext 용 입니다.
  • spark - SparkSession 용 입니다.

타입 정보 제공 ( Scala )

강력한 유형 유추는 Scala의 가장 유용한 기능 중 하나이지만 컨텍스트에서 가져온 코드를 분석하기가 어렵습니다. 컨텍스트에서 유형이 분명하더라도 변수에 주석을 추가하는 것이 좋습니다. 취하다

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

over

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

일반적으로 사용되는 도구가 도움이 될 수 있습니다.

  • spark-shell / 스칼라 쉘

    사용 :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
  • InteliJ 아이디어

    UseAlt+=




Answer 3 desertnaut


좋은 질문과 답변; 몇 가지 추가 제안 사항 :

스파크 버전 포함

스파크는 여전히 진화하고 있지만 1.x 시대와 같이 빠르지는 않습니다. 항상 (특히 이전 버전을 사용하는 경우) 작업 버전을 포함하는 것이 좋습니다. 개인적으로 나는 항상 다음과 같이 을 시작 합니다 .

spark.version
# u'2.2.0'

or

sc.version
# u'2.2.0'

파이썬 버전도 포함시키는 것은 결코 나쁜 생각이 아닙니다.


모든 수입품 포함

스파크 SQL 및 데이터 프레임에 대한 질문이 아닌 경우 (예 : 일부 머신 러닝 작업에서 데이터 프레임을 사용하려는 경우) 가져 오기에 대해 명시 적입니다. 이 질문을 참조하십시오 . (이제 제거됨) 의견 (이 잘못된 수입품이 문제의 근본 원인임이 밝혀졌습니다).

왜 이것이 필요한가요? 예를 들어이 LDA가

from pyspark.mllib.clustering import LDA

이다 다른 이 LDA에서 :

from pyspark.ml.clustering import LDA

첫 번째는 기존의 RDD 기반 API (이전의 Spark MLlib)에서 온 것이고 두 번째는 새로운 데이터 프레임 기반 API (Spark ML)의 것입니다.


코드 강조 표시 포함

자, 이것이 주관적이라고 고백 할 것입니다. PySpark 질문은 기본적 으로 python 으로 태그되어서는 안된다고 생각합니다 . 문제는 python 태그가 자동으로 코드 강조 표시를 제공 한다는 것입니다 (PySpark 질문에이를 사용하는 사람들에게는 이것이 주된 이유라고 생각합니다). 어쨌든 동의 할 때에도 여전히 강조 표시된 멋진 코드를 원한다면 관련 markdown 지시문을 포함하십시오.

<!-- language-all: lang-python -->

첫 번째 코드 스 니펫 전에 게시물의 어딘가에 있습니다.

[업데이트 : 실제로 구현 된 pysparksparkr 태그에 대한 자동 구문 강조를 요청했습니다 .]




Answer 4 MaxU


이 작은 도우미 함수는 Spark 출력 파일을 DataFrame으로 구문 분석하는 데 도움이 될 수 있습니다.

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

추신 : pyspark의 경우 eqNullSafespark 2.3 에서 사용할 수 있습니다 .