Autoscaling in Spark¶
Each Spark cluster has a configured maximum and minimum number of nodes. A cluster starts with the minimum number of nodes and can scale up to the maximum. Later, it can scale back down to the minimum, depending on the cluster workload. This topic explains how a Spark cluster and job applications autoscale, and discusses various settings to fine-tune autoscaling. See Autoscaling in Qubole Clusters for a broader discussion.
Advantages of Autoscaling¶
Autoscaling clusters provides the following benefits:
- Adds nodes when the load is high
- Contributes to good cost management as the cluster capacity is dynamically scaled up and down as required
Autoscaling Spark jobs provides the following benefits:
- Decides on the optimum number of executors required for a Spark job based on the load
Understanding Spark Autoscaling Properties¶
The following table describes the autoscaling properties of Spark.
Note
Qubole supports open-source dynamic allocation properties in Spark 1.6.1 and later versions.
Property Name | Default Value | Description |
---|---|---|
spark.qubole.autoscaling.enabled | true | Enables autoscaling. Not applicable to Spark 1.6.1 and later versions. |
spark.dynamicAllocation.enabled | true | Enables autoscaling. Applicable only to Spark 1.6.1 and later versions. |
spark.dynamicAllocation.maxExecutors | If it is not set, the default is spark.executor.instances. | The maximum number of executors to be used. Its Spark submit option is --max-executors. |
spark.executor.instances | If it is not set, the default is 2. | The minimum number of executors. Its Spark submit option is --num-executors. |
spark.qubole.autoscaling.stagetime | 2 * 60 * 1000 milliseconds | If expectedRuntimeOfStage is greater than this value, the number of executors is increased. |
spark.qubole.autoscaling.memorythreshold | 0.75 | If the memory used by the executors exceeds this fraction, the number of executors is increased. |
spark.qubole.autoscaling.memory.downscaleCachedExecutors | true | Executors with cached data are also downscaled by default. Set this to false if you do not want executors with cached data to be downscaled. Not applicable to Spark 1.6.1 and later versions. |
spark.dynamicAllocation.cachedExecutorIdleTimeout | Infinity | Timeout in seconds. If an executor with cached data has been idle for longer than this timeout, it is removed. Applicable only to Spark 1.6.1 and later versions. |
Note
The spark.qubole.max.executors parameter is deprecated; however, it continues to work. If you specify both spark.qubole.max.executors and spark.dynamicAllocation.maxExecutors, then spark.dynamicAllocation.maxExecutors overrides spark.qubole.max.executors.
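For illustration, the properties in the table above can be passed when the Spark application is created. The following is a minimal PySpark sketch, assuming Spark 2.x; the values are examples, not recommendations.

```python
from pyspark.sql import SparkSession

# Illustrative values only; tune them for your cluster and workload.
spark = (
    SparkSession.builder
    .appName("autoscaling-properties-example")
    # Enables autoscaling on Spark 1.6.1 and later versions.
    .config("spark.dynamicAllocation.enabled", "true")
    # Upper bound on executors; overrides the deprecated
    # spark.qubole.max.executors if both are specified.
    .config("spark.dynamicAllocation.maxExecutors", "20")
    # Minimum number of executors for autoscaling (default is 2).
    .config("spark.executor.instances", "4")
    .getOrCreate()
)
```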
Spark Configuration Recommendations¶
Keep the following points in mind when configuring Spark clusters and jobs:
- Set --max-executors (see the sketch after the note below). Other parameters ideally do not need to be set, because the defaults are sufficient.
- --num-executors or spark.executor.instances acts as the minimum number of executors, with a default value of 2. The minimum number of executors does not mean that the Spark application waits for that many executors to launch before it starts; the minimum applies only to autoscaling. For example, during application start-up:
  - If YARN is unable to schedule resources for --num-executors or spark.executor.instances, the Spark application starts with as many executors as YARN can schedule.
  - Once --num-executors or spark.dynamicAllocation.minExecutors executors are allocated, the application never goes below that number.
- Try to avoid setting too many job-level parameters.
Note
--max-executors is the Spark submit option for spark.dynamicAllocation.maxExecutors, and --num-executors is the Spark submit option for spark.executor.instances.
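Following the recommendation above, a job usually only needs the executor upper bound. Here is a hedged sketch of the two equivalent ways to set it; the command line in the comment and the application file name are illustrative.

```python
# On the command line (per the note above):
#   spark-submit --max-executors 20 --num-executors 4 your_app.py
#
# Or programmatically, setting only the upper bound and leaving the
# remaining autoscaling properties at their defaults:
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.dynamicAllocation.maxExecutors", "20")
sc = SparkContext(conf=conf)
```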
In Spark, autoscaling can be done at both the cluster level and the job level. See the following topics for more information:
Spark on Qubole’s capabilities include fine-grained downscaling, downscaling of cached executors after idle timeout, and support for open-source dynamic allocation configurations.
Autoscaling in Spark Clusters¶
A Spark cluster spins up with the configured minimum number of nodes and can scale up to the maximum, depending on the load. Once the load drops, the cluster scales down towards the minimum.
Qubole runs Spark on YARN: each Spark application is submitted as a YARN application. By default, Spark uses a static allocation of resources. That is, when you submit a job, exact resource requirements are specified. The application requests containers and YARN allocates the containers.
Here is an example of a Spark 2.0.0 cluster:
Property Name | Property Value |
---|---|
minimum nodes | 2 |
maximum nodes | 10 |
node type | (a large instance type; for example, 8 cores, 30 GB memory) |
spark.dynamicAllocation.enabled | true |
yarn.nodemanager.resource.memory | 26680 MB |
spark.yarn.executor.memoryOverhead | 1024 MB |
spark.executor.memory | 12 GB |
spark.executor.cores | 4 |
If a job with a minimum number of executors set to 4 is submitted to the cluster, YARN schedules two containers in the first worker node and the other two containers in the second worker node. The ApplicationMaster takes up an additional container.
Here is the logic for finding the number of executors per node in the Spark 2.0.0 cluster example above:
Total node memory = 30 GB
yarn.nodemanager.resource.memory = 26680 MB
Total resource memory = number of executors per node * (spark.executor.memory + spark.yarn.executor.memoryOverhead)
With 2 executors per node, that is 2 * (12 GB + 1 GB) = 26 GB, which approximately matches the value of yarn.nodemanager.resource.memory.
Here is the logic for checking that the number of cores per executor is correct in the same example:
Total cores per node = 8
Total cores used = spark.executor.cores * number of executors per node = 4 * 2 = 8
Thus, the two executors use all eight cores of the node.
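The same arithmetic can be checked with a short script. The values below come from the example cluster table above, and the resulting counts match the walkthrough.

```python
# Values from the example Spark 2.0.0 cluster above.
yarn_nodemanager_resource_memory_mb = 26680
spark_executor_memory_mb = 12 * 1024            # spark.executor.memory = 12 GB
spark_yarn_executor_memory_overhead_mb = 1024   # spark.yarn.executor.memoryOverhead
spark_executor_cores = 4                        # spark.executor.cores
node_cores = 8

# Memory needed by one executor container.
per_executor_mb = spark_executor_memory_mb + spark_yarn_executor_memory_overhead_mb

# How many executors fit on one node, memory-wise.
executors_per_node = yarn_nodemanager_resource_memory_mb // per_executor_mb
print(executors_per_node)                        # 2
print(executors_per_node * spark_executor_cores) # 8, uses all of the node's cores
```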
Now, if you submit a new job to the same cluster in parallel, YARN does not have enough resources to run it, and this triggers Qubole’s YARN-level autoscaling: YARN figures out that two more nodes are required for the new job to run and requests the two nodes. These nodes are added to the current cluster, for a total of four nodes.
When the job completes, YARN recovers the resources. If the added nodes are idle and there is no active job, the cluster scales back to the minimum number of nodes.
Note
A node is available for downscaling only when it meets the conditions for downscaling.
Autoscaling within a Spark Job¶
A Spark job uses a set of resources based on the number of executors. These executors are long-running Java Virtual Machines (JVMs) that are up during a Spark job’s lifetime. Statically determining the number of executors required by a Spark application may not get the best results. When you use the autoscaling feature within a Spark application, QDS monitors job progress at runtime and decides the optimum number of executors using SLA-based autoscaling.
By default, autoscaling within a Spark Job is enabled, with the following parameter set to true:
spark.qubole.autoscaling.enabled=true in Spark 1.6.0 and earlier versions, or
spark.dynamicAllocation.enabled=true in Spark 1.6.1 and later versions (including all versions supported on Azure and Oracle OCI).
Note
These settings become active only when you configure spark.dynamicAllocation.maxExecutors.
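As a sketch, and assuming Spark 1.6.1 or later, enabling autoscaling within a job therefore looks like this; per the note, the upper bound must be set for the setting to take effect, and the value 20 is illustrative.

```python
from pyspark import SparkConf

conf = (
    SparkConf()
    # Spark 1.6.1 and later; on Spark 1.6.0 and earlier the equivalent
    # property is spark.qubole.autoscaling.enabled.
    .set("spark.dynamicAllocation.enabled", "true")
    # Required for job-level autoscaling to become active (see the note above).
    .set("spark.dynamicAllocation.maxExecutors", "20")
)
```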
When the first Spark job is submitted, the cluster spins up with two large worker nodes, the configured minimum. In the configuration described above, each node can run two executors.
Depending on the job progress, or when new jobs are submitted, the Spark job-level autoscaler decides to add or release executors at runtime. The job starts with four executors (running on two large instances) and can autoscale up to 20 executors (running on ten large instances). It downscales back towards the minimum of four executors if the workload declines.
Changing from Qubole Dynamic Allocation Strategy¶
Qubole supports the open-source dynamic allocation strategy in addition to Qubole's dynamic allocation strategy, which is the default, that is, spark.dynamicAllocation.strategy=org.apache.spark.dynamicallocation.QuboleAllocationStrategy.
To change from the Qubole dynamic allocation strategy to the open-source dynamic allocation strategy, set spark.dynamicAllocation.strategy=org.apache.spark.dynamicallocation.DefaultAllocationStrategy. With this setting, you can use all open-source dynamic allocation configurations as-is, such as spark.dynamicAllocation.maxExecutors, spark.dynamicAllocation.minExecutors, and spark.dynamicAllocation.initialExecutors.
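Here is a minimal sketch of switching to the open-source strategy and then using the standard dynamic allocation bounds; the executor counts are illustrative.

```python
from pyspark import SparkConf

conf = (
    SparkConf()
    # Switch from QuboleAllocationStrategy (the default) to the
    # open-source allocation strategy.
    .set("spark.dynamicAllocation.strategy",
         "org.apache.spark.dynamicallocation.DefaultAllocationStrategy")
    # The standard open-source dynamic allocation configurations can then
    # be used as-is.
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.initialExecutors", "4")
    .set("spark.dynamicAllocation.maxExecutors", "20")
)
```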
Autoscaling Examples¶
The following sections describe different autoscaling scenarios in Spark.
Autoscaling Nodes Running in a Single Cluster¶
For Spark clusters, autoscaling is enabled by default. QDS increases the number of nodes, up to the cluster’s maximum size, if multiple big jobs are submitted to the cluster.
Conversely, QDS reduces the number of nodes, down to the cluster’s minimum size, as the workload declines.
Upscaling a Single Memory Intensive Spark Job¶
You can set a limit on the executor memory a job can use by setting spark.executor.memory.
For example, in the cluster described above, if the executor memory is configured to be 25 GB and the worker nodes have 30 GB of memory, only one executor can run on each node. The first Spark job starts with two executors (because the minimum number of nodes is set to two in this example).
The cluster can autoscale to a maximum of ten executors (because the maximum number of nodes is set to ten).
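For example, the memory-intensive scenario above could be configured as follows; this is a sketch, and the 25 GB figure comes from the example, leaving room for only one executor per 30 GB node.

```python
from pyspark import SparkConf

# 25 GB per executor: only one executor fits on a 30 GB worker node, so the
# job scales from 2 executors (minimum nodes) up to 10 (maximum nodes).
conf = SparkConf().set("spark.executor.memory", "25g")
```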
Running Many Jobs on a Single Cluster¶
You can set a limit on the maximum number of executors a job can use by setting the property spark.dynamicAllocation.maxExecutors. This configuration is usually preferred when many jobs run in parallel and the cluster resources must be shared.
If the cluster resources are being fully used, new jobs either upscale the cluster if it is not yet at its maximum size, or wait until current jobs complete.
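As an illustrative sketch, each of several concurrent applications can be submitted with its own cap so that no single job occupies the whole cluster; the caps shown are examples only.

```python
from pyspark import SparkConf

# Job A: interactive or exploratory work, small cap.
conf_a = SparkConf().set("spark.dynamicAllocation.maxExecutors", "5")

# Job B: a heavier batch job, larger cap; together the caps keep both
# jobs within the cluster's maximum capacity.
conf_b = SparkConf().set("spark.dynamicAllocation.maxExecutors", "15")
```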
Autoscaling Executors in a Spark Job¶
By default, autoscaling of executors is enabled in a Spark job. The number of executors increases up to the maximum if the Spark job is long-running or memory-intensive.
Configuring Autoscaling Parameters for a Spark Job Stage Runtime¶
You can set a threshold for the job's expected stage runtime by setting the property spark.qubole.autoscaling.stagetime. Executors are added to the Spark job if the expected stage runtime is greater than the spark.qubole.autoscaling.stagetime value.
Note
The expected stage runtime is calculated only after the first task’s completion.
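Here is a hedged example of lowering the stage-time threshold so that executors are added more aggressively for long stages; the value is in milliseconds and 60000 is illustrative.

```python
from pyspark import SparkConf

# Default is 2 * 60 * 1000 ms; a lower value makes upscaling more aggressive,
# because more stages will have an expected runtime above the threshold.
conf = SparkConf().set("spark.qubole.autoscaling.stagetime", "60000")
```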
Adding Executors in a Single Spark Job with Memory-intensive Executors¶
You can set a threshold for executor memory usage by setting the property spark.qubole.autoscaling.memorythreshold, which drives the autoscaling memory algorithm. Executors are added to the Spark job if the memory used by the executors exceeds spark.qubole.autoscaling.memorythreshold.
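Similarly, the memory threshold can be tuned; in this sketch the fraction is lowered from the default 0.75 so that executors are added sooner as memory usage grows, and the value is illustrative.

```python
from pyspark import SparkConf

# Add executors once executors use more than 60% of their memory
# (the default threshold is 0.75).
conf = SparkConf().set("spark.qubole.autoscaling.memorythreshold", "0.6")
```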