Установлен кластер Spark как в Окружающая среда без изменений в файлах spark-env.sh, spark-defaults.conf и объекте SparkConf в программах.
Для программы N Queen количество разделов было 2, и только одному узлу были назначены задачи. Для программы подсчета слов количество разделов было 22, и задачи были распределены по всем узлам. Используется spark-submit для обеих программ.
N Королева
val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }
val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))
val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()
Количество слов
val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))
val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)
Spark 2.0.1 (3 узла по 4 ЦП каждый) в Ubuntu 14.04.
Автономное развертывание (не YARN и не Mesos)
Нашел информацию в Практическое руководство: настройка заданий Apache Spark (часть 2).
Как определяется это число? То, как Spark группирует RDD по этапам, описано в предыдущем посте. (Напоминаем, что такие преобразования, как repartition и reduceByKey, создают границы этапа.) Количество задач на этапе такое же, как количество разделов в последнем RDD на этапе. Количество разделов в RDD такое же, как и количество разделов в RDD, от которых оно зависит, с парой исключений: преобразование coalesce позволяет создать RDD с меньшим количеством разделов, чем его родительский RDD, преобразование union создает RDD с сумма количества разделов его родителей, а декартово создает RDD с их продуктом.
А как насчет RDD без родителей? Разделы RDD, созданные с помощью textFile или hadoopFile, определяются используемым базовым MapReduce InputFormat. Обычно для каждого читаемого блока HDFS будет отдельный раздел. Разделы для RDD, созданные с помощью parallelize, берутся из параметра, заданного пользователем, или spark.default.parallelism, если он не задан.
Параметр spark.default.parallelism исправил проблему.
--conf spark.default.parallelism=24
Установка на 12 (то же самое с количеством ядер) приведет к неравномерному использованию узлов.