reducebykey In Apache Spark, perché RDD.union non conserva il partizionatore?



spark map reduce (2)

Questo non è più vero. I due RDD hanno esattamente lo stesso partizionamento e il numero di partizioni, l' union e RDD avranno anche le stesse partizioni. Questo è stato introdotto in https://github.com/apache/spark/pull/4629 e incorporato in Spark 1.3.

Come tutti sanno, i partizionatori di Spark hanno un enorme impatto sulle prestazioni su qualsiasi operazione "ampia", quindi di solito sono personalizzati nelle operazioni. Stavo sperimentando il seguente codice:

val rdd1 =
  sc.parallelize(1 to 50).keyBy(_ % 10)
    .partitionBy(new HashPartitioner(10))
val rdd2 =
  sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)

Vedo che per impostazione predefinita cogroup() produce sempre un RDD con il partizionatore personalizzato, ma union() no, verrà sempre ripristinato di default. Ciò è controintuitivo in quanto di solito assumiamo che un PairRDD debba utilizzare il suo primo elemento come chiave di partizione. C'è un modo per "forzare" Spark a unire 2 PairRDD per usare la stessa chiave di partizione?


Answer #1

union è un'operazione molto efficiente, perché non sposta alcun dato in giro. Se rdd1 ha 10 partizioni e rdd2 ha 20 partizioni, allora rdd1.union(rdd2) avrà 30 partizioni: le partizioni dei due RDD messi l'una dopo l'altra. Questo è solo un cambiamento di contabilità, non c'è shuffle.

Ma necessariamente scarta il partizionatore. Un partizionatore è costruito per un dato numero di partizioni. L'RDD risultante ha un numero di partizioni diverso da rdd1 e rdd2 .

Dopo aver preso l'unione è possibile eseguire la repartition per mescolare i dati e organizzarli per chiave.

C'è un'eccezione a quanto sopra. Se rdd1 e rdd2 hanno lo stesso partizionamento (con lo stesso numero di partizioni), l' union si comporta diversamente. Unirà le partizioni dei due RDD a coppie, dandogli lo stesso numero di partizioni di ciascuno degli input. Ciò può comportare lo spostamento di dati in giro (se le partizioni non sono co-locate) ma non implicherà un shuffle. In questo caso il partizionatore viene mantenuto. (Il codice per questo è in PartitionerAwareUnionRDD.scala .)





hadoop-partitioning