Spark Best Practices

This topic describes best practices for running Spark jobs.

Spark Configuration Recommendations

  • Set --max-executors. You typically do not need to set other parameters, because the default values are sufficient.
  • Try to avoid setting too many job-level parameters.

Ensuring Jobs Get Their Fair Share of Resources

To prevent jobs from blocking each other, use the YARN Fair Scheduler to ensure that each job gets its fair share of resources. For example, if each interpreter can use a maximum of 100 executors, and ten users are running one interpreter each, the YARN Fair Scheduler can ensure that each user’s job gets about ten executors. As the number of running jobs rises or falls, the number of executors each job gets falls or rises inversely: 100 jobs get one executor each, whereas a single job gets all 100 executors.
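The arithmetic in this example can be sketched in a few lines of Python. This is only an illustration of an even split over a pool of 100 executors. The function name `fair_share` is hypothetical, and the real YARN Fair Scheduler also takes queue weights, minimum shares, and preemption settings into account, all of which are ignored here.

```python
def fair_share(total_executors: int, running_jobs: int) -> int:
    """Return the number of executors each job gets under a plain even split.

    Illustrative only: the real YARN Fair Scheduler also considers queue
    weights, minimum shares, and preemption settings.
    """
    if running_jobs <= 0:
        raise ValueError("running_jobs must be positive")
    # Integer division: any remainder is left unassigned in this simple model.
    return total_executors // running_jobs


if __name__ == "__main__":
    for jobs in (1, 10, 100):
        share = fair_share(100, jobs)
        print(f"{jobs} running job(s): about {share} executor(s) each")
```

Calling `fair_share(100, 10)` mirrors the ten-user case described above.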

To set this up, configure notebooks and interpreters for the running Spark cluster as follows. (For information about using notebooks and interpreters, see Running Spark Applications in Notebooks.)

Note

Do not do this if you have enabled interpreter user mode; in that case, QDS configures the Fair Scheduler for you.

  1. Assign an interpreter for each user and note. This ensures that each job is submitted to YARN as a separate application.
  2. Edit the interpreter to use a fairly low value for spark.executor.instances, so that an interpreter that is not in use does not hold on to too many executors.

Specifying Dependent Jars for Spark Jobs

You can specify dependent jars in either of the following two ways:

  • In the QDS UI’s Workbench query composer for a Spark command, add a Spark Submit argument such as the following to add jars at the job level:

    --jars gs://bucket/dir/x.jar,gs://bucket/dir2/y.jar --packages "com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1"

  • Alternatively, download the jars to /usr/lib/spark/lib through the node bootstrap script; for example:

    hdfs dfs -get gs://bucket/path/app.jar /usr/lib/spark/lib/
    hdfs dfs -get gs://bucket/path/dep1.jar /usr/lib/spark/lib/
    hdfs dfs -get gs://bucket/path/dep2.jar /usr/lib/spark/lib/
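When the list of jars is produced by a script, the Spark Submit argument from the first option can be assembled programmatically. The following Python sketch shows one way to do that. The helper name `build_jar_args` is hypothetical and is not part of QDS or Spark; it only joins the values in the comma-separated form that `--jars` and `--packages` expect.

```python
def build_jar_args(jars, packages=()):
    """Build Spark Submit arguments for dependent jars and Maven packages.

    `jars` is a sequence of jar locations; `packages` is a sequence of
    Maven coordinates in group:artifact:version form.
    """
    args = []
    if jars:
        # --jars takes one comma-separated list without spaces.
        args += ["--jars", ",".join(jars)]
    if packages:
        # --packages takes comma-separated Maven coordinates.
        args += ["--packages", ",".join(packages)]
    return args


if __name__ == "__main__":
    arguments = build_jar_args(
        ["gs://bucket/dir/x.jar", "gs://bucket/dir2/y.jar"],
        ["com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1"],
    )
    print(" ".join(arguments))
```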
    

Handling Skew in the Join

To handle skew in the join keys, add the hint ` /*+ SKEW('<tableName>') */ ` to the query that contains the join. The hint identifies the table and, optionally, the columns and the values on which skew is expected. Based on that information, the engine automatically ensures that the skewed values are handled appropriately.

You can specify the hint in the following formats:

  • Format 1:

    /*+ SKEW('<tableName>') */
    

    This indicates that all the columns in the given table are skewed and that the values on which they are skewed are not known. With this hint, the Spark optimizer tries to identify the values on which each column involved in the join is skewed: when it determines that a column is involved in the join, it samples the data in the table.

    Example: Suppose a query uses a table t1 in which all the columns involved in the join are skewed, but the skew values are unknown. In this case, you can specify the skew hint as ` /*+ SKEW('t1') */ `.

  • Format 2:

    /*+ SKEW('<tableName>', (<COLUMN-HINT>), (<ANOTHER-COLUMN-HINT>)) */
    

    <COLUMN-HINT> can be either a column name (for example, column1) or a column name followed by the list of values on which the column is skewed (for example, column1, ('a', 'b', 'c')). The Spark optimizer takes the skew values from the hint, so it does not need to sample the data.

    Example: Suppose there is a table t1 with four columns: c1, c2, c3, and c4. Suppose c1 is skewed on the values 'a' and 'b', c2 and c3 are also skewed but their skew values are unknown, and c4 is not skewed. In this case, you can specify the hint as ` /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ `.

Example Query

SELECT /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ *
FROM
  (SELECT t2.c1 AS temp_col1 FROM t1 JOIN t2 ON t1.c1 = t2.c1) temp_table1
  JOIN
  (SELECT t3.c2 AS temp_col2 FROM t1 JOIN t3 ON t1.c2 = t3.c2) temp_table2
WHERE temp_table1.temp_col1 = temp_table2.temp_col2
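The two hint formats described in this section can also be generated from code, which helps when the skewed columns are tracked elsewhere. The following Python sketch is one possible helper, not part of QDS or Spark. The names `skew_hint` and `_quote` are hypothetical, it handles only string skew values, and it rejects names or values that contain a single quote rather than guessing how the hint parser treats escaping.

```python
def _quote(value: str) -> str:
    """Wrap a name or value in single quotes, as the hint syntax shows."""
    if "'" in value:
        # Escaping rules for the hint are not documented here, so refuse.
        raise ValueError(f"single quote not supported in {value!r}")
    return "'" + value + "'"


def skew_hint(table, columns=None):
    """Render a SKEW hint for `table`.

    `columns` maps a column name to a list of known skew values, or to
    None (or an empty list) when the column is skewed on unknown values.
    With no columns, the Format 1 hint is produced.
    """
    parts = [_quote(table)]
    for column, values in (columns or {}).items():
        if values:
            value_list = ", ".join(_quote(v) for v in values)
            parts.append(f"({_quote(column)}, ({value_list}))")
        else:
            parts.append(f"({_quote(column)})")
    return "/*+ SKEW(" + ", ".join(parts) + ") */"


if __name__ == "__main__":
    print(skew_hint("t1"))
    print(skew_hint("t1", {"c1": ["a", "b"], "c2": None, "c3": None}))
```

The second call reproduces the hint used in the example query.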

Optimizing Query Execution with Adaptive Query Execution

Spark on Qubole supports Adaptive Query Execution on Spark 2.4.3 and later versions. With this feature, query execution is optimized at runtime based on runtime statistics.

At runtime, the adaptive execution mode can change a shuffle join to a broadcast join if the size of one table is less than the broadcast threshold. Spark on Qubole adaptive execution also handles skew in the input data and optimizes joins by using the Qubole skew join optimization. In general, adaptive execution reduces the effort involved in tuning SQL query parameters and improves execution performance by selecting a better execution plan and degree of parallelism at runtime.

Configuring the Spark External Shuffle Service

The Spark external shuffle service is an auxiliary service that runs as part of the YARN NodeManager on each worker node in a Spark cluster. When enabled, it maintains the shuffle files generated by all Spark executors that ran on that node.

By default, Spark executors write the shuffle data and manage it. If the Spark external shuffle service is enabled, the shuffle service manages the shuffle data instead of the executors. This helps in downscaling the executors, because the shuffle data is not lost when executors are removed. It also improves the behavior of the Spark application when errors occur, because the shuffle data does not need to be reprocessed when an executor crashes.

In open-source Spark, Spark job-level autoscaling (also known as Spark Dynamic Allocation) works in tandem with the external shuffle service, and the shuffle service is mandatory for autoscaling to work. See Spark Shuffle Behavior for more information. In Spark on Qubole, on the other hand, the external shuffle service is optional, and Qubole-based Spark job-level autoscaling works whether or not the shuffle service is enabled. (If the external shuffle service is disabled, executors are not removed until their shuffle data goes away.)

Qubole provides the Spark external shuffle service in Spark 1.5.1 and later supported Spark versions.

The external shuffle service is enabled by default in Spark 1.6.2 and later versions. To disable it, set spark.shuffle.service.enabled to false.

The Spark external shuffle service is not enabled by default in Spark 1.5.1 and Spark 1.6.0. To enable it for one of these versions, configure it as follows:

  • Override Hadoop Configuration Variables

    Before starting a Spark cluster, pass the following Hadoop overrides to start the Spark external shuffle service:

    yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
    yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
    

    See Advanced Configuration: Modifying Hadoop Cluster Settings for setting the Hadoop override configuration variables in the QDS UI.

  • Spark Configuration Variable

    Set the configuration to enable external shuffle service on a Spark application, a Spark cluster or a Spark notebook.

    Enabling External Shuffle Service on a Spark Cluster

    Set the following configuration in the Override Spark Configuration Variables text box of the cluster configuration page:

    spark-defaults.conf:
    
    spark.shuffle.service.enabled    true
    

    See Configuring a Spark Cluster for more information.

    Note

    If you set spark.shuffle.service.enabled to false, then the Spark application does not use the external shuffle service.

    Enabling External Shuffle Service on a Spark Command

    Configure the following setting as a Spark-submit option in the command/query composer while composing a Spark application:

    --conf spark.shuffle.service.enabled=true

    See Composing Spark Commands in the Analyze Page for more information.

    For example, sqlContext.sql("select count(*) from default_qubole_memetracker").collect() generates a lot of shuffle data, so set --conf spark.shuffle.service.enabled=true when you launch bin/spark-shell.

    Enabling External Shuffle Service on a Spark Notebook

    Add spark.shuffle.service.enabled as an interpreter setting with the value true in the Spark notebook’s interpreter. If the Spark interpreter settings are not already bound to the notebook that you use, bind them. See Running Spark Applications in Notebooks and Understanding Spark Notebooks and Interpreters for more information.

External shuffle service logs are part of the NodeManager logs located at /media/ephemeral0/logs/yarn/yarn-nodemanager*.log. NodeManager logs are present on each worker node in the cluster.
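The version-dependent steps in this section can be summarized in a small lookup. The following Python sketch is only an illustration: the names `shuffle_service_settings`, `parse_version`, and `HADOOP_OVERRIDES` are hypothetical, and the sketch assumes that any supported version earlier than 1.6.2 needs the explicit settings, although this section names only 1.5.1 and 1.6.0.

```python
# Hadoop overrides listed in this section for starting the shuffle service.
HADOOP_OVERRIDES = {
    "yarn.nodemanager.aux-services": "mapreduce_shuffle,spark_shuffle",
    "yarn.nodemanager.aux-services.spark_shuffle.class":
        "org.apache.spark.network.yarn.YarnShuffleService",
}


def parse_version(version: str) -> tuple:
    """Turn a version string such as "1.6.2" into (1, 6, 2)."""
    return tuple(int(part) for part in version.split("."))


def shuffle_service_settings(spark_version: str) -> dict:
    """Return the settings needed to enable the external shuffle service."""
    parsed = parse_version(spark_version)
    if parsed < (1, 5, 1):
        raise ValueError("the shuffle service is provided in Spark 1.5.1 and later")
    if parsed >= (1, 6, 2):
        # Enabled by default in these versions: nothing needs to be set.
        return {"hadoop_overrides": {}, "spark_conf": {}}
    # Assumption: every version below 1.6.2 is treated like 1.5.1 and 1.6.0.
    return {
        "hadoop_overrides": dict(HADOOP_OVERRIDES),
        "spark_conf": {"spark.shuffle.service.enabled": "true"},
    }


if __name__ == "__main__":
    for version in ("1.5.1", "1.6.0", "2.4.3"):
        print(version, shuffle_service_settings(version))
```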

Continuously Running Spark Streaming Applications

You can continuously run Spark streaming applications by setting the following parameters:

  • Set yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override at the Spark cluster level.
  • To prevent all Spark streaming applications on a specific cluster from timing out, set spark.qubole.idle.timeout to -1 as a Spark configuration variable in the Override Spark Configuration Variables text field of the Spark cluster configuration UI page. See Configuring a Spark Cluster for more information.

Using UDFs in Spark SQL

A UDF (user-defined function) is a way of adding a function to Spark SQL. It operates on distributed DataFrames and works row by row unless it is created as a user-defined aggregation function. Open-source Spark provides two alternative methods for defining a UDF:

  • Using Hive functions
  • Using Scala functions

The following example uses Hive functions to add a UDF and use it in Spark SQL.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hive UDF that prefixes its input with "Hello "; a null input yields null.
class SimpleUDFExample extends UDF {
  def evaluate(input: Text): Text =
    if (input == null) null else new Text("Hello " + input.toString)
}

object sqltest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new HiveContext(sc)
    // Register the Hive UDF class under the SQL function name "hello".
    sqlContext.sql("create temporary function hello as 'SimpleUDFExample'")
    val result = sqlContext.sql("""
        select hello(name) from products_avro order by month, name, price
       """)
    result.collect.foreach(println)
  }
}

For an example using Scala functions, see UDF Registration.
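The same greeting logic can also be registered as a plain function rather than as a Hive class. The following sketch uses PySpark instead of Scala, purely as an illustration. It assumes that PySpark is installed and that a SparkSession is already available. The names `hello` and `register_hello` are hypothetical, while `spark.udf.register` and `StringType` belong to the PySpark API.

```python
def hello(name):
    """Return a greeting for `name`, or None when the input is NULL."""
    if name is None:
        return None
    return "Hello " + name


def register_hello(spark):
    """Register `hello` as a Spark SQL function on an existing SparkSession."""
    # Imported lazily so the plain function above works without PySpark.
    from pyspark.sql.types import StringType

    spark.udf.register("hello", hello, StringType())
```

After `register_hello(spark)` has run, a statement such as `spark.sql("select hello(name) from products_avro")` can call the function, with `products_avro` being the table from the Hive example.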