Spark Best Practices¶
This topic describes best practices for running Spark jobs.
- Spark Configuration Recommendations
- Ensuring Jobs Get their Fair Share of Resources
- Specifying Dependent Jars for Spark Jobs
- Handling Skew in the Join
- Optimizing Query Execution with Adaptive Query Execution
- Configuring the Spark External Shuffle Service
- Continuously Running Spark Streaming Applications
- Using UDFs in Spark SQL
Spark Configuration Recommendations¶
- Set --max-executors. Other parameters usually do not need to be set, because the default values are sufficient.
- Avoid setting too many job-level parameters.
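As a minimal sketch, the recommendation above amounts to passing only the executor ceiling as a Spark Submit argument and leaving everything else at its default (the value 10 here is illustrative, not a recommendation):

```
--max-executors 10
```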
Specifying Dependent Jars for Spark Jobs¶
You can specify dependent jars using these two options:
In the QDS UI’s Workbench query composer for a Spark command, add a Spark Submit argument such as the following to add jars at the job level:
--jars gs://bucket/dir/x.jar,gs://bucket/dir2/y.jar --packages "com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1"
Another option is to download the jars to /usr/lib/spark/lib via the node bootstrap script; for example:

hdfs dfs -get gs://bucket/path/app.jar /usr/lib/spark/lib/
hdfs dfs -get gs://bucket/path/dep1.jar /usr/lib/spark/lib/
hdfs dfs -get gs://bucket/path/dep2.jar /usr/lib/spark/lib/
Handling Skew in the Join¶
To handle skew in the join keys, you can specify the /*+ SKEW('<table_name>') */ hint for a join, describing the columns and the values on which skew is expected.
Based on that information, the engine automatically ensures that the skewed values are handled appropriately.
You can specify the hint in the following formats:
Format 1:
/*+ SKEW('<tableName>') */
This form indicates that the columns of the given table involved in the join are skewed, but the skewed values are not known. With this hint, the Spark optimizer tries to identify the values on which the join column is skewed: when the optimizer detects that a column from the table participates in a join, it samples the table's data to find the skewed values.
Example: Suppose a query involves a table t1 whose join columns are all skewed, but the skew values are unknown. In this case, you can specify the skew hint as /*+ SKEW('t1') */.

Format 2:
/*+ SKEW ('<tableName>', (<COLUMN-HINT>), (<ANOTHER-COLUMN-HINT>)) */
<COLUMN-HINT> can be either a column name (for example, column1) or a column name with a list of values on which the column is skewed (for example, ('column1', ('a', 'b', 'c'))). When the values are given, the Spark optimizer reads them directly from the hint, so sampling the data is not required.

Example: Suppose a table t1 has four columns: c1, c2, c3, c4. Column c1 is skewed on the values 'a' and 'b'; c2 and c3 are also skewed, but their skew values are unknown; c4 is not skewed. In this case, you can specify the hint as /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */.
Example Query¶
SELECT /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ *
FROM
(SELECT t2.c1 as temp_col1 from t1 join t2 on t1.c1 = t2.c1) temp_table1 JOIN
(SELECT t3.c2 as temp_col2 from t1 join t3 on t1.c2 = t3.c2) temp_table2
ON temp_table1.temp_col1 = temp_table2.temp_col2
Optimizing Query Execution with Adaptive Query Execution¶
Spark on Qubole supports Adaptive Query Execution on Spark 2.4.3 and later versions, with which query execution is optimized at runtime based on runtime statistics.
At runtime, the adaptive execution mode can change a shuffle join to a broadcast join if the size of one table is below the broadcast threshold. Spark on Qubole adaptive execution also supports handling skew in input data, and optimizes joins using Qubole skew join optimization. In general, adaptive execution reduces the effort involved in tuning SQL query parameters, and improves execution performance by selecting a better execution plan and degree of parallelism at runtime.
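As a sketch, adaptive execution can be turned on through the cluster's Spark overrides. The property name below is the open-source Spark 2.4 setting; Spark on Qubole may expose additional AQE-related options beyond it:

```
spark-defaults.conf:
spark.sql.adaptive.enabled true
```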
Configuring the Spark External Shuffle Service¶
The Spark external shuffle service is an auxiliary service which runs as part of the Yarn NodeManager on each worker node in a Spark cluster. When enabled, it maintains the shuffle files generated by all Spark executors that ran on that node.
Spark executors normally write and manage the shuffle data themselves. If the Spark external shuffle service is enabled, the shuffle service manages the shuffle data instead of the executors. This helps in downscaling the executors, because the shuffle data is not lost when executors are removed. It also improves the behavior of the Spark application in case of failure, because the shuffle data does not need to be regenerated when an executor crashes.
In open-source Spark, Spark job-level autoscaling (also known as Spark Dynamic Allocation) works in tandem with the external shuffle service and the shuffle service is mandatory for autoscaling to work. See Spark Shuffle Behavior for more information. In Spark on Qubole, on the other hand, the external shuffle service is optional and Qubole-based Spark job-level autoscaling works whether or not the shuffle service is enabled. (If the external shuffle service is disabled, the executors are not removed until the shuffle data goes away.)
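For reference, the two settings involved look like this in spark-defaults form (standard open-source Spark property names; in Spark on Qubole, as noted above, job-level autoscaling works even without the second line):

```
spark-defaults.conf:
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
```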
Qubole provides the Spark external shuffle service in Spark 1.5.1 and later supported Spark versions.
The external shuffle service is enabled by default in Spark 1.6.2 and later versions.
To disable it, set spark.shuffle.service.enabled to false.
The Spark external shuffle service is not enabled by default in Spark 1.5.1 and Spark 1.6.0. To enable it for one of these versions, configure it as follows:
Override Hadoop Configuration Variables
Before starting a Spark cluster, pass the following Hadoop overrides to start Spark external shuffle service:
yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
See Advanced Configuration: Modifying Hadoop Cluster Settings for setting the Hadoop override configuration variables in the QDS UI.
Spark Configuration Variable
Set the configuration to enable the external shuffle service for a Spark application, a Spark cluster, or a Spark notebook.
Enabling External Shuffle Service on a Spark Cluster
Set the following configuration in the Override Spark Configuration Variables text box of the cluster configuration page:
spark-defaults.conf: spark.shuffle.service.enabled true
See Configuring a Spark Cluster for more information.
Note

If you set spark.shuffle.service.enabled to false, the Spark application does not use the external shuffle service.

Enabling External Shuffle Service on a Spark Command
Configure the following setting as a Spark-submit option in the command/query composer while composing a Spark application:
--conf spark.shuffle.service.enabled=true
See Composing Spark Commands in the Analyze Page for more information.
For example,

sqlContext.sql("select count(*) from default_qubole_memetracker").collect()

generates a lot of shuffle data, so set --conf spark.shuffle.service.enabled=true in bin/spark-shell.

Enabling External Shuffle Service on a Spark Notebook
Add spark.shuffle.service.enabled as an interpreter setting with the value true in a Spark notebook's Interpreter. Bind the Spark interpreter settings to the notebook you are using, if it is not bound already. See Running Spark Applications in Notebooks and Understanding Spark Notebooks and Interpreters for more information.
External shuffle service logs are part of the NodeManager logs, located at /media/ephemeral0/logs/yarn/yarn-nodemanager*.log on each worker node in the cluster.
Continuously Running Spark Streaming Applications¶
You can run Spark streaming applications continuously by setting the following parameters:
- Set yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override at the Spark cluster level.
- To prevent Spark streaming applications on a specific cluster from being timed out, set spark.qubole.idle.timeout -1 as a Spark configuration variable in the Override Spark Configuration Variables text field of the Spark cluster configuration UI page. See Configuring a Spark Cluster for more information.
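Taken together, the two overrides above look like this (exactly the values described; the first line goes in the Hadoop overrides, the second in the Spark overrides of the cluster configuration page):

```
# Hadoop override (cluster level):
yarn.resourcemanager.app.timeout.minutes=-1

# Spark override (spark-defaults format):
spark.qubole.idle.timeout -1
```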
Using UDFs in Spark SQL¶
A UDF (user-defined function) is a way of adding a function to Spark SQL. It operates on distributed DataFrames and works row by row, unless it is created as a user-defined aggregation function. Open-source Spark provides two alternative methods:
- Using Hive functions
- Using Scala functions
The following example uses Hive functions to add a UDF and use it in Spark SQL.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hive UDF: prefixes the input string with "Hello ".
class SimpleUDFExample extends UDF {
  def evaluate(input: Text): Text = {
    if (input == null) null
    else new Text("Hello " + input.toString)
  }
}

object sqltest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    // Register the Hive UDF under the name "hello", then use it in SQL.
    sqlContext.sql("create temporary function hello as 'SimpleUDFExample'")
    val result = sqlContext.sql("""
      select hello(name) from products_avro order by month, name, price
    """)
    result.collect.foreach(println)
  }
}
For an example using Scala functions, see UDF Registration.
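For comparison with the Hive-function example above, here is a minimal sketch of the Scala-function approach. The function body is plain Scala; the registration lines are shown as comments because they assume the same HiveContext and products_avro table as the example above:

```scala
// The UDF body as a plain Scala function:
val hello: String => String =
  name => if (name == null) null else "Hello " + name

// Registration with Spark SQL (sketch; assumes the sqlContext and
// products_avro table from the Hive-function example above):
// sqlContext.udf.register("hello", hello)
// sqlContext.sql("select hello(name) from products_avro").collect()
```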