Назад | Перейти на главную страницу

Как определяется количество разделов RDD в Apache Spark?

Вопрос

  1. Как Spark определяет количество разделов?
  2. Нужно ли где-то явно указывать количество доступных ядер ЦП, чтобы количество разделов было одинаковым (например, аргумент numPartition метода parallelize, но затем нужно обновлять программу при каждом изменении количества ядер)?

Задний план

Установлен кластер Spark как в Окружающая среда без изменений в файлах spark-env.sh, spark-defaults.conf и объекте SparkConf в программах.

Для программы N Queen количество разделов было 2, и только одному узлу были назначены задачи. Для программы подсчета слов количество разделов было 22, и задачи были распределены по всем узлам. Используется spark-submit для обеих программ.

Программы

N Королева

val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }

val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))

val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()

Количество слов

val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))

val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)

Окружающая среда

Spark 2.0.1 (3 узла по 4 ЦП каждый) в Ubuntu 14.04.
Автономное развертывание (не YARN и не Mesos)

Нашел информацию в Практическое руководство: настройка заданий Apache Spark (часть 2).

Как определяется это число? То, как Spark группирует RDD по этапам, описано в предыдущем посте. (Напоминаем, что такие преобразования, как repartition и reduceByKey, создают границы этапа.) Количество задач на этапе такое же, как количество разделов в последнем RDD на этапе. Количество разделов в RDD такое же, как и количество разделов в RDD, от которых оно зависит, с парой исключений: преобразование coalesce позволяет создать RDD с меньшим количеством разделов, чем его родительский RDD, преобразование union создает RDD с сумма количества разделов его родителей, а декартово создает RDD с их продуктом.

А как насчет RDD без родителей? Разделы RDD, созданные с помощью textFile или hadoopFile, определяются используемым базовым MapReduce InputFormat. Обычно для каждого читаемого блока HDFS будет отдельный раздел. Разделы для RDD, созданные с помощью parallelize, берутся из параметра, заданного пользователем, или spark.default.parallelism, если он не задан.

Параметр spark.default.parallelism исправил проблему.

--conf spark.default.parallelism=24

Установка на 12 (то же самое с количеством ядер) приведет к неравномерному использованию узлов.