Significant Parameters in YARN¶
Qubole offers a Spark-on-YARN variant, so the YARN parameters apply to both Hadoop 2 and Spark.
The parameters that can be useful in Hadoop 2 (Hive) and Spark configuration are described in the following sub-topics.
- Configuring Job History Compression describes compressing job history to store it in a Cloud location.
- Configuring Job Runtime describes how to configure how long a YARN application can run.
- Enabling Container Packing in Hadoop 2 and Spark describes how to more effectively downscale in Hadoop 2 (Hive) and Spark clusters.
- Understanding YARN Virtual Cores describes the YARN virtual cores.
- Configuring Direct File Output Committer describes configuring a DirectFileOutputCommitter (DFOC) for a MapReduce task in a Hadoop 2 cluster. For DFOC on a Spark cluster, see DFOC in Spark.
- Improving Performance of Data Writes shows how to improve the performance of data writes in Hadoop 2 and Spark.
See Composing a Hadoop Job for information on composing a Hadoop job.
Configuring Job History Compression¶
mapreduce.jobhistory.completed.codec specifies the codec used to compress job history files when they are stored in a Cloud location.
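For example, to store completed job history files gzip-compressed, you could set the following cluster override (the GzipCodec class shown here is one standard Hadoop codec, used for illustration; substitute the codec you need):

```
mapreduce.jobhistory.completed.codec=org.apache.hadoop.io.compress.GzipCodec
```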
Configuring Job Runtime¶
Set yarn.resourcemanager.app.timeout.minutes to configure how many minutes a YARN application can run.
This parameter can prevent a runaway application from keeping the cluster alive unnecessarily.
This is a cluster-level setting; set it in the Override Hadoop Configuration Variables field under the Advanced Configuration tab of the Clusters page in the QDS UI. See Advanced Configuration: Modifying Hadoop Cluster Settings for more information.
The Resource Manager kills a YARN application if it runs longer than the configured timeout.
Setting this parameter to -1 means that the application never times out.
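For example, to have the Resource Manager kill any YARN application that runs longer than two hours, you could add this override (the value is illustrative):

```
yarn.resourcemanager.app.timeout.minutes=120
```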
Enabling Container Packing in Hadoop 2 and Spark¶
Qubole allows you to pack containers in Hadoop 2 (Hive) and Spark. You must enable this feature; it is disabled by default. When enabled, container packing causes the scheduler to pack containers on a subset of nodes instead of distributing them across all the nodes of the cluster. This increases the probability of some nodes remaining unused; these nodes become eligible for downscaling, reducing your cost.
How Container Packing Works¶
Packing works by separating nodes into three sets:
- Nodes with no containers (the Low set)
- Nodes with memory utilization greater than the threshold (the High set)
- All other nodes (the Medium set)
When container packing is enabled, YARN schedules each container request in this order: nodes in the Medium set first, nodes in the Low set next, and nodes in the High set last.
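The set-partitioning logic above can be sketched in a few lines of Python. This is an illustrative model, not Qubole's scheduler code; the node representation and the threshold constant (mirroring the documented default of 60) are assumptions for the example.

```python
# Illustrative sketch of container-packing node ordering (not Qubole's
# actual scheduler implementation). Each node is modeled as a dict with a
# running-container count and a memory-utilization percentage.

HIGH_MEMORY_THRESHOLD = 60  # percent; mirrors the documented default


def packing_order(nodes):
    """Return nodes in the order the scheduler considers them:
    Medium (has containers, below threshold), then Low (no containers),
    then High (above the memory threshold)."""
    low = [n for n in nodes if n["containers"] == 0]
    high = [n for n in nodes
            if n["containers"] > 0 and n["mem_used_pct"] > HIGH_MEMORY_THRESHOLD]
    medium = [n for n in nodes if n not in low and n not in high]
    return medium + low + high


nodes = [
    {"name": "n1", "containers": 0, "mem_used_pct": 0},   # Low
    {"name": "n2", "containers": 3, "mem_used_pct": 40},  # Medium
    {"name": "n3", "containers": 5, "mem_used_pct": 80},  # High
]
print([n["name"] for n in packing_order(nodes)])  # → ['n2', 'n1', 'n3']
```

Because empty nodes are only visited after partially used ones, they tend to stay empty and become candidates for downscaling.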
Configuring Container Packing¶
Configure container packing as a Hadoop cluster override in the Override Hadoop Configuration Variables field on the Edit Cluster page. See Managing Clusters for more information. The configuration options are:
To enable container packing, set the packing parameter to true as a cluster override.
In clusters smaller than the configured minimum size, containers are distributed across all nodes. This minimum number of nodes is governed by the following parameter:
yarn.scheduler.fair.continuous-scheduling-packed.min.nodes=<value>. Its default value is 5.
A node’s memory-utilization threshold percentage, above which Qubole schedules containers on another node, is governed by the following parameter:
yarn.scheduler.fair.continuous-scheduling-packed.high.memory.threshold=<value>. Its default value is 60.
This parameter also denotes the threshold above which a node moves to the High set from the Medium set.
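Putting the two tuning overrides together, a cluster override block might look like this (the values shown are the documented defaults, for illustration):

```
yarn.scheduler.fair.continuous-scheduling-packed.min.nodes=5
yarn.scheduler.fair.continuous-scheduling-packed.high.memory.threshold=60
```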
Understanding YARN Virtual Cores¶
As of Hadoop 2.4, YARN introduced the concept of vcores (virtual cores). A vcore is a share of host CPU that the YARN Node Manager allocates to available resources.
yarn.scheduler.maximum-allocation-vcores is the maximum allocation for each container request at the Resource Manager,
in terms of virtual CPU cores. Requests for more vcores than this are capped to this value.
The default value of
yarn.scheduler.maximum-allocation-vcores in Qubole is twice the number of CPUs. This
oversubscription assumes that CPUs are not always running a thread, and hence assigning more vcores enables maximum CPU utilization.
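As an illustration of the default, a hypothetical node with 8 CPUs would allow container requests of up to 16 vcores:

```
yarn.scheduler.maximum-allocation-vcores=16   # 2 x 8 CPUs (Qubole default)
```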
Configuring Direct File Output Committer¶
In general, the final output of a MapReduce job is written to a location in Cloud storage or HDFS, but is first written into a temporary location. The output data is moved from the temporary location to the final location in the task’s commit phase.
When DirectFileOutputCommitter (DFOC) is enabled, the output data is written directly to the final location, so no commit phase is required. DFOC is a Qubole-specific parameter; similar direct-commit mechanisms are offered by other big-data vendors. Qubole supports DFOC on Amazon S3n and S3a, and on Azure Blob and Data Lake storage.
For DFOC on a Spark cluster, see DFOC in Spark.
The pros and cons of setting DFOC are:
- Improves performance when data is written to a Cloud location. (DFOC has little impact when data is written to an HDFS location, because moving files between directories is very fast in HDFS.)
- DFOC does not perform well in failure cases: stale data may be left in the final location, and workflows are generally designed to delete the final location. Hence Qubole does not enable DFOC by default. When DFOC is disabled, the abort phase of the task deletes the data in the temporary directory and a retry takes care of data deletion, so no stale data is left in the final location.
DFOC can be set through either the mapred API or the mapreduce API.
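A sketch of the DFOC overrides for the two APIs, based on the parameter names commonly used for this feature (verify the exact names against your QDS release):

```
# DFOC in the mapred API
mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter

# DFOC in the mapreduce API
mapreduce.use.directfileoutputcommitter=true
```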
To set these parameters for a cluster, navigate to the Clusters section of the QDS UI, choose the cluster, and enter both strings in the Override Hadoop Configuration Variables field under the Advanced Configuration tab. You can also set them at the job level.
Improving Performance of Data Writes¶
To improve the speed of data writes, enable the following Qubole options:
- mapreduce.use.parallelmergepaths for Hadoop 2 jobs
- spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs with Parquet data
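As cluster overrides, these could look like the following (assuming both are boolean flags, which is a reasonable reading of the option names):

```
mapreduce.use.parallelmergepaths=true
spark.hadoop.mapreduce.use.parallelmergepaths=true
```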