Submit a Spark Command

POST /api/v1.2/commands/

Use this API to submit a Spark command.

Required Role

The following users can make this API call:

  • Users who belong to the system-admin or system-user group.
  • Users who belong to a group associated with a role that allows submitting a command. See Managing Groups and Managing Roles for more information.

Parameters

Note

Parameters marked in bold below are mandatory. Others are optional and have default values.

Parameter Description
program Provide the complete Spark program in Scala, SQL, Command, R, or Python.
language Specify the language of the program. The supported values are scala (Scala), sql (SQL), command_line (Command), R (R), or py (Python). Required only when a program is used. Note: Values are case-sensitive.

script_location Specify a cloud storage path where the Spark query (Scala, Python, SQL, R, or Command Line) script is stored. Storage credentials stored in the account are used to retrieve the script file.
arguments Specify the spark-submit command line arguments here.
user_program_arguments Specify the arguments that the user program takes in.
cmdline Alternatively, you can provide the spark-submit command line itself. If you use this option, you cannot use any other parameters mentioned here; all required information is captured in the command line itself.
command_type SparkCommand
label Specify the cluster label on which this command is to be run.
retry Denotes the number of retries for a job. Valid values of retry are 1, 2, and 3.
retry_delay Denotes the time interval between the retries when a job fails.
app_id ID of an app, which is a main abstraction of the Spark Job Server API. An app is used to store the configuration for a Spark application. See Understanding the Spark Job Server for more information.
name Add a name to the command that is useful while filtering commands from the command history. It cannot contain the special characters & (ampersand), < (less than), > (greater than), " (double quotes), or ' (single quote), or HTML tags. It can contain a maximum of 255 characters.
tags Add a tag to a command so that it is easily identifiable and searchable from the commands list in the Commands History. Add a tag as a filter value while searching commands. It can contain a maximum of 255 characters. A comma-separated list of tags can be associated with a single command. While adding a tag value, enclose it in square brackets. For example, {"tags":["<tag-value>"]}.
macros Denotes the macros that are valid assignment statements containing the variables and their expressions, as: macros: [{"<variable>":<variable-expression>}, {..}]. You can add more than one variable. For more information, see Macros.
pool Use this parameter to specify the Fairscheduler pool name for the command to use.
timeout Specifies the command execution timeout in seconds. The default value is 129600 seconds (36 hours). QDS checks the timeout for a command every 60 seconds, so if the timeout is set to 80 seconds, the command is killed at the next check, that is, after 120 seconds. Set this parameter to prevent a command from running for the full 36 hours.
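
For reference, the following snippet sketches a request body that combines several of the optional parameters described above (label, name, tags, retry, timeout, and macros). The cluster label, name, tag, macro, and timeout values are illustrative placeholders only, not defaults; the payload can be submitted with the PycURL framework shown in the Examples section below.

import json

# Illustrative payload only: the label, name, tags, retry, timeout, and macros
# values are placeholders, not defaults or recommendations.
data = json.dumps({
    "program": "spark.range(10).collect.foreach(println)",
    "language": "scala",
    "command_type": "SparkCommand",
    "label": "spark",                            # cluster label to run on
    "name": "sample-spark-command",              # shown in the command history
    "tags": ["example-tag"],                     # tags are passed as a list
    "retry": 1,                                  # valid values: 1, 2, or 3
    "timeout": 3600,                             # seconds
    "macros": [{"sample_date": "'2024-01-01'"}]  # illustrative macro assignment; see Macros
})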

Note

  • You can run Spark commands with large script files and large inline content.
  • You can use macros in script files for the Spark commands with subtypes scala (Scala), py (Python), R (R), command_line (Command), and sql (SQL). You can also use macros in large inline contents and large script files for scala (Scala), py (Python), R (R), and sql (SQL).

These features are not enabled for all users by default. Create a ticket with Qubole Support to enable these features on the QDS account.

Note

If you are submitting Scala code that contains multiple lines, you must escape every new line with the escape sequence \n .
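
For example, a two-line Scala snippet must appear in the JSON body as a single line with each newline encoded as \n. If you build the payload with Python's json.dumps, as in the examples in the next section, this escaping is handled for you:

import json

prog = '''val x = 1
println(x)'''

# json.dumps escapes the newline, producing:
# {"program": "val x = 1\nprintln(x)", "language": "scala", "command_type": "SparkCommand"}
print(json.dumps({"program": prog, "language": "scala", "command_type": "SparkCommand"}))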

Examples

The examples are written in Python and use PycURL. Using cURL directly is possible but cumbersome because the program needs escaping and JSON does not support new lines. To avoid confusion, these Python API examples are provided; they are clear and can be used directly.

Alternatively, you can use qds-sdk-py directly.
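
For example, a minimal sketch with qds-sdk-py might look like the following. The exact keyword arguments accepted by SparkCommand.create and the api_url value for your environment are assumptions here; verify them against the SDK version and endpoint you use.

# Minimal qds-sdk-py sketch (assumed keyword arguments; check your SDK version).
from qds_sdk.qubole import Qubole
from qds_sdk.commands import SparkCommand

Qubole.configure(api_token="<auth-token>", api_url="https://gcp.qubole.com/api")

prog = 'println("hello from Spark")'
cmd = SparkCommand.create(program=prog, language="scala", label="spark")
print(cmd.id, cmd.status)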

Example Python API Framework

import sys
import pycurl
import json

c = pycurl.Curl()
url = "https://gcp.qubole.com/api/v1.2/commands"
auth_token = <provide auth token here>
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPHEADER, ["X-AUTH-TOKEN: " + auth_token, "Content-Type: application/json", "Accept: application/json"])
c.setopt(pycurl.POST, 1)

(After this, select any of the following examples depending on your requirement.)

The above code snippet can be used to make API calls. The following examples use it as their base and show various use cases.
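
Optionally, if you also want to read the API response (for example, to capture the command ID for polling), you can extend the framework with a response buffer before calling c.perform(). This addition is not part of the original framework, and the response fields named in the comments are typical of the commands API but should be verified for your account.

import io

# Optional: collect the HTTP response body returned by c.perform().
buffer = io.BytesIO()
c.setopt(pycurl.WRITEDATA, buffer)

# After c.perform() in any example below, decode the response as JSON:
# response = json.loads(buffer.getvalue().decode("utf-8"))
# print(response.get("id"), response.get("status"))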

Example to Submit Spark Scala Program

prog = '''
import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''
data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

To submit a snippet to the Spark Job Server app, use the following data payload instead of the above data.

data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand",
"label":"spark","app_id":"3"})

Where app_id = Spark Job Server app ID. See Understanding the Spark Job Server for more information.

Example to Submit Spark Python Program

Here is the Spark Pi example in Python.

prog = '''
import sys
from random import random
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    sc.stop()
'''
data=json.dumps({"program":prog,"language":"python","command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()
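
If the same Python program is stored in cloud storage instead of being passed inline, it can be referenced through the script_location parameter described above. The path below is a hypothetical placeholder.

# Hypothetical cloud storage path; replace with the actual location of your script.
data=json.dumps({"script_location":"gs://<bucket>/scripts/pi.py", "language":"python", "command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()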

Example to Add Spark Submit Options

Add arguments in the JSON body to supply spark-submit options. You can pass remote files in a cloud storage location in addition to local files as values to the --py-files argument, as shown in the second snippet below.

data=json.dumps(
{"program":prog,
"language":"python", "arguments": "--num-executors 10 --max-executors 10 --executor-memory 5G --executor-cores 2",
"command_type":"SparkCommand"})

Example to Add Arguments to User Program

Add user_program_arguments in JSON body. Here is a sample program which takes in arguments (input and output location).

prog = '''spark.range(args(0).toInt).collect.foreach(println)'''

data=json.dumps(
{"program":prog,
"language":"scala",
"user_program_arguments": "10",
"command_type":"SparkCommand",})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

Example to Use Command Line Parameter

For power users, Qubole provides the ability to supply the spark-submit command line directly. This is explained in detail here.

Note

It is not recommended to run a Spark application as a Bash command under the Shell command options, because automatic changes, such as an increase in the Application Coordinator memory based on the driver memory and the availability of debug options, do not happen. Such automatic changes occur only when you run a Spark application through the Command Line option.

In this case, you must compile the program (in the case of Scala), create a jar, upload the file to cloud storage, and invoke the command line. Note that Qubole's deployment of Spark is available in the /usr/lib/spark directory:

/usr/lib/spark/bin/spark-submit [options] <app jar in cloud storage | python file> [app options]

Here is an example.

/usr/lib/spark/bin/spark-submit --class <classname>  --max-executors 100 --num-executors 15 --driver-memory 10g
--executor-memory 3g --executor-cores 5 <jar_path_in-storage> <arguments>

Here is a REST API example to submit a Spark command in the command-line language.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"cmdline":"/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi
     --master yarn-client /usr/lib/spark/spark-examples-*", "language":"command_line", "command_type":"SparkCommand",
     "label":"sparkcluster"}' \
      "https://gcp.qubole.com/api/v1.2/commands"

Example to Submit Spark Command in SQL

You can submit a Spark command in SQL, as shown in the following example.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
      "sql":"select * from default_qubole_memetracker limit 10;",
      "language":"sql","command_type":"SparkCommand", "label":"spark"
    }' \
"https://gcp.qubole.com/api/v1.2/commands"

When submitting a Spark command in SQL, you can specify the location of a SparkSQL script in the script_location parameter as shown in the following example.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"script_location":"<S3 Path>", "language":"sql", "command_type":"SparkCommand", "label":"<cluster-label>"
    }' \
"https://gcp.qubole.com/api/v1.2/commands"

Example to Submit a Spark Command in SQL to a Spark Job Server App

You can submit a Spark command in SQL to an existing Spark Job Server app.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
      "sql":"select * from default_qubole_memetracker limit 10;",
      "language":"sql","command_type":"SparkCommand", "label":"spark","app_id":"3"
    }' \
"https://gcp.qubole.com/api/v1.2/commands"

Where app_id = Spark Job Server app ID. See Understanding the Spark Job Server for more information.

Known Issue

The Spark Application UI may display the state of the application incorrectly when preemptible instances are used.

When GCP reclaims the instance that the coordinator node is running on, the Spark Application UI may still show the application as running. You can see the actual status of the Qubole command on the Workbench or Notebooks page.

To avoid this issue, use an on-demand instance for the coordinator node.