如何管理Spark的分区一文中,介绍了Spark是如何管理分区的,分别解释了Spark提供的两种分区方法,并给出了相应的使用示例和分析,感兴趣的可以参考之前的分享。我们知道,Apache Spark通常用于以分布式方式处理大规模数据集,既然是分布式,就会面临一个问题:数据是否均匀地分布。当数据分布不均匀时,数据量较少的分区将会很快的被执行完成,而数据量较大的分区将需要很长时间才能够执行完毕,这就是我们经常所说的数据倾斜, 这可能会导致Spark作业的性能降低。那么,该如何解决类似的问题呢?我们可以使用 Spark提供的自定义分区器在RDD上应用数据分区的逻辑。以下是正文,希望对你有所帮助。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包

Spark默认的分区器

Spark在处理大规模数据集时,会将数据分为不同的分区,并以并行方式处理数据。 默认情况下,它使用哈希分区程序将数据分散到不同的分区中。 哈希分区程序使用的是hashcode函数, 其原理是相等的对象具有相同的哈希码,这样就可以根据key的哈希值,将其分布到各自的分区中。

哈希分区源码

class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}

override def hashCode: Int = numPartitions
}

示例

// 创建spark session
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
val bigData = util.Arrays.asList("Hadoop", "Spark", "Flink", "Hive", "Impala", "Hbase", "Kafka", "ClickHouse", "KUDU", "zookeeper")
import spark.implicits._
val stringDataset = spark.createDataset(bigData)
println("当前rdd的分区数为:" + stringDataset.rdd.partitions.length) // 当前rdd的分区数为:4

/**
* 每个分区的数据会写入一个对应的分区文件中,
* 每个分区文件对应的数据如下:
* Partiiton 0: Hadoop Spark
* Partition 1: Flink Hive Impala
* Partition 2: Hbase Kafka
* Partition 3: ClickHouse KUDU zookeeper
*/
stringDataset.write.csv("E://testPartition")

通常情况下,我们需要加大分区的数量,从而保证每个分区的数据量尽量少,进而可以提升处理的并行度。此时,就需要使用repartition() 方法对数据进行重分区。

repartition() 方法既可以用于增加分区,也可以用于减少分区。比如,对上面的示例进行增加分区,如下代码所示:

val reparationDS = stringDataset.repartition(8)
println("重分区之后的分区为:" + reparationDS.rdd.partitions.length) // 重分区之后的分区为:8
/**
*
* Partition 0: FlinK ClickHouse
* Partition 1:
* Partition 2:
* Partition 3:
* Partition 4:
* Partition 5:
* Partition 6: Spark Impala Hbase zookeeper
* Partition 7: Hadoop Hive Kafka KUDU
*/
reparationDS.write.csv("E://repartition")

上面的数据没有在所有分区中平均分配。 尽管通过应用repartition()方法增加了分区的数量,但是数据并不是均匀分布的。 上面提到,Spark使用的是哈希分区,所以有时,应用repartition()也可能无法解决问题(可能会存在部分分区无数据,而个别分区数据比较多的情况)。

为了解决这个问题,Spark提供了自定义分区器,用户可以根据处理数据的特点,进行自定义分区器。

如何自定义Spark的分区器

需要注意的是,自定义分区器只能应用于key-value形式的 pair RDD。所以在使用自定义分区器的时候,需要从原始的RDD中创建出PairedRDD,然后再使用自定义分区器。

实现一个自定义的分区器非常简单,只需要继承一个org.apache.spark.Partitioner类,然后重写下面的方法即可:

  • numPartitions:此方法返回要为RDD创建的分区数

  • def getPartition(key: Any):此方法返回key对应的分区号(范围从0到numPartitions - 1)

源码

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

自定义分区器

  • 自定义分区类
class CustomerPartition(partitions: Int) extends Partitioner {

def numPartitions: Int = partitions

def getPartition(key: Any): Int = {

(key.toString.charAt(0) + scala.util.Random.nextInt(10)) % numPartitions
}
}
  • 使用示例
object DefaultPartition {

def main(args: Array[String]): Unit = {

// 创建spark session
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
val bigData = util.Arrays.asList("Hadoop", "Spark", "Flink", "Hive", "Impala", "Hbase", "Kafka", "ClickHouse", "KUDU", "zookeeper")
import spark.implicits._
val stringDataset = spark.createDataset(bigData)
println("当前rdd的分区数为:" + stringDataset.rdd.partitions.length) // 当前rdd的分区数为:4

/**
* 每个分区的数据会写入一个对应的分区文件中,
* 每个分区文件对应的数据如下:
* Partiiton 0: Hadoop Spark
* Partition 1: Flink Hive Impala
* Partition 2: Hbase Kafka
* Partition 3: ClickHouse KUDU zookeeper
*/
//stringDataset.write.csv("E://testPartition")

val reparationDS = stringDataset.repartition(8)
println("重分区之后的分区为:" + reparationDS.rdd.partitions.length) // 重分区之后的分区为:8
/**
*
* Partition 0: FlinK ClickHouse
* Partition 1:
* Partition 2:
* Partition 3:
* Partition 4:
* Partition 5:
* Partition 6: Spark Impala Hbase zookeeper
* Partition 7: Hadoop Hive Kafka KUDU
*/
// reparationDS.write.csv("E://repartition")

val stringRDD = stringDataset.rdd
val pairRDD = stringRDD.map(word => (word, word.length))
// 使用自定义分区器
val resultRDD = pairRDD.partitionBy(new CustomerPartition(8)) // 自定义分区的数量:8
println("自定义分区的数量:" + resultRDD.getNumPartitions)

// 数据写入CSV文件
val outputRDD = resultRDD.map(_._1)
val outputDS = spark.createDataset(outputRDD)

/**
* Partition 0: Hive
* Partition 1: Impala
* Partition 2: ClickHouse
* Partition 3: Spark Hbase
* Partition 4: KUDU
* Partition 5: Hadoop
* Partition 6: Flink
* Partition 7: Kafka zookeeper
*/
outputDS.write.csv("E:/customerpartition/")
}
}

从上面的结果可以看出,数据均匀地分布在了每个分区上,这样就会缓解数据倾斜造成的性能瓶颈。

总结

本文主要分享了如何使用Spark提供的Partitioner类进行自定义一个分区器,并给出了具体的示例。通过自定义分区器,就可以有效地分布数据,从而缓解数据倾斜的性能瓶颈。如果本文对你所有帮助,请分享转发。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包