Submit a Spark Command¶
POST /api/v1.2/commands/
Use this API to submit a Spark command.
Required Role¶
The following users can make this API call:
- Users who belong to the system-admin or system-user group.
- Users who belong to a group associated with a role that allows submitting a command. See Managing Groups and Managing Roles for more information.
Parameters¶
Note
Parameters marked in bold below are mandatory. Others are optional and have default values.
Parameter | Description |
---|---|
program | Provide the complete Spark Program in Scala, SQL, Command, R, or Python. |
language | Specify the language of the program. The supported values are scala (Scala), py (Python), R (R), command_line (Command), and sql (SQL). Note: values are case-sensitive. |
script_location | Specify a cloud storage path where the Spark script (Scala, Python, SQL, R, or command line) is stored. Storage credentials stored in the account are used to retrieve the script file. |
arguments | Specify the spark-submit command line arguments here. |
user_program_arguments | Specify the arguments that the user program takes in. |
cmdline | Alternatively, you can provide the spark-submit command line itself. If you use this option, you cannot use any other parameters mentioned here; all required information is captured in the command line itself. |
command_type | SparkCommand |
label | Specify the cluster label on which this command is to be run. |
retry | Denotes the number of retries for a job. Valid values of retry are 1, 2, and 3. |
retry_delay | Denotes the time interval between the retries when a job fails. |
app_id | ID of an app, which is a main abstraction of the Spark Job Server API. An app is used to store the configuration for a Spark application. See Understanding the Spark Job Server for more information. |
name | Add a name to the command that is useful while filtering commands from the command history. It does not accept & (ampersand), < (less than), > (greater than), " (double quotes), ' (single quote), or HTML tags. It can contain a maximum of 255 characters. |
tags | Add a tag to a command so that it is easily identifiable and searchable from the commands list in the Commands History. Add a tag as a filter value while searching commands. It can contain a maximum of 255 characters. A comma-separated list of tags can be associated with a single command. While adding a tag value, enclose it in square brackets. For example, {"tags":["<tag-value>"]}. |
macros | Denotes the macros that are valid assignment statements containing a variable and its expression, as in macros: [{"<variable>":<variable-expression>}, {..}]. You can add more than one variable. For more information, see Macros. |
pool | Use this parameter to specify the Fairscheduler pool name for the command to use. |
timeout | The timeout for command execution, in seconds. Its default value is 129600 seconds (36 hours). QDS checks the timeout for a command every 60 seconds, so a command with a timeout of 80 seconds is killed at the next check, that is, after 120 seconds. Setting this parameter prevents a command from running for the full 36 hours. |
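For illustration, here is a minimal sketch of a payload that combines several of the optional parameters above; the cluster label, command name, tag, retry, and timeout values are placeholders for this example, not defaults.
import json

# Illustrative sketch only: the label, name, tag, retry, and timeout values
# below are placeholders, not defaults.
data = json.dumps(
    {"program": "spark.range(10).collect.foreach(println)",
     "language": "scala",
     "command_type": "SparkCommand",
     "label": "<cluster-label>",
     "name": "range-example",
     "tags": ["<tag-value>"],
     "retry": 1,
     "timeout": 3600})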
Note
- You can run Spark commands with large script files and large inline content.
- You can use macros in script files for Spark commands with subtypes scala (Scala), py (Python), R (R), command_line (Command), and sql (SQL). You can also use macros in large inline content and large script files for scala (Scala), py (Python), R (R), and sql (SQL).
These features are not enabled for all users by default. Create a ticket with Qubole Support to enable these features on the QDS account.
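As a sketch of how a macro can be used together with a script file, assuming a hypothetical storage path and macro name (and that the feature has been enabled on the account):
import json

# Hedged sketch: the gs:// path and the run_date macro are hypothetical placeholders.
data = json.dumps(
    {"script_location": "gs://<your-bucket>/scripts/daily_report.sql",
     "language": "sql",
     "macros": [{"run_date": "'2024-01-01'"}],
     "command_type": "SparkCommand",
     "label": "<cluster-label>"})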
Note
If you are submitting Scala code that contains multiple lines, you must escape every new line with the escape character \n.
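For example, if you build the JSON body by hand rather than with json.dumps (which performs this escaping for you), a two-line Scala snippet would be embedded as in the following sketch; the snippet itself is only an illustration.
import json

# Hand-built JSON body: each new line in the Scala code is written as \n.
raw_body = '{"program":"val x = 1\\nprintln(x)","language":"scala","command_type":"SparkCommand"}'

# json.dumps produces the same escaping automatically.
auto_body = json.dumps({"program": "val x = 1\nprintln(x)",
                        "language": "scala",
                        "command_type": "SparkCommand"})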
Examples¶
Examples are written in Python and use PycURL. Using cURL directly is possible but difficult because the program needs escaping; also, JSON does not support new lines. To avoid confusion, these Python API examples are provided because they are clear and can be used directly.
Alternatively, you can use qds-sdk-py directly.
Example Python API Framework¶
import sys
import pycurl
import json
c = pycurl.Curl()
url = "https://gcp.qubole.com/api/v1.2/commands"
auth_token = <provide auth token here>
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPHEADER, ["X-AUTH-TOKEN: " + auth_token, "Content-Type: application/json", "Accept: application/json"])
c.setopt(pycurl.POST, 1)
(After this, select any of the following examples depending on the requirement.)
The above code snippet can be used to make API calls. The following examples use the above program as their base and show various use cases.
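If you also want to read the JSON response (for example, to get the command ID for later status checks), one possible sketch, using the standard PycURL buffer pattern on top of the framework above, is:
from io import BytesIO

# Capture the response body so the fields returned by the API (such as the
# command id and status) can be inspected after c.perform().
buf = BytesIO()
c.setopt(pycurl.WRITEDATA, buf)
c.setopt(pycurl.POSTFIELDS, data)   # 'data' comes from one of the examples below
c.perform()

response = json.loads(buf.getvalue().decode("utf-8"))
print(response.get("id"), response.get("status"))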
Example to Submit Spark Scala Program¶
prog = '''
import scala.math.random
import org.apache.spark._
/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''
data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand"})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
To submit a snippet to the Spark Job Server app, use the following data payload instead of the above data.
data = json.dumps({"program": prog, "language": "scala", "arguments": "--class SparkPi", "command_type": "SparkCommand",
                   "label": "spark", "app_id": "3"})
where app_id is the Spark Job Server app ID. See Understanding the Spark Job Server for more information.
Example to Submit Spark Python Program¶
Here is the Spark Pi example in Python.
prog = '''
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)
    sc.stop()
'''
data=json.dumps({"program":prog,"language":"python","command_type":"SparkCommand"})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
Example to Add Spark Submit Options¶
Add arguments in the JSON body to supply spark-submit options. You can pass remote files in a cloud storage location, in addition to local files, as values to the --py-files argument.
data = json.dumps(
    {"program": prog,
     "language": "python",
     "arguments": "--num-executors 10 --max-executors 10 --executor-memory 5G --executor-cores 2",
     "command_type": "SparkCommand"})
Example to Add Arguments to User Program¶
Add user_program_arguments in JSON body. Here is a sample program which takes in arguments (input and output location).
prog = '''spark.range(args(0).toInt).collect.foreach(println)'''
data = json.dumps(
    {"program": prog,
     "language": "scala",
     "user_program_arguments": "10",
     "command_type": "SparkCommand"})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
Example to Use Command Line Parameter¶
For power users, Qubole provides the ability to supply the spark-submit command line directly. This is explained in detail here.
Note
Running a Spark application as a Bash command under the Shell command options is not recommended, because automatic adjustments, such as increasing the Application Coordinator memory based on the driver memory and enabling debug options, do not happen. These automatic adjustments do occur when you run a Spark application through the Command Line option.
In this case, you must compile the program (for Scala), create a JAR, upload the file to cloud storage, and invoke the command line. Note that Qubole's deployment of Spark is available in the /usr/lib/spark directory:
/usr/lib/spark/bin/spark-submit [options] <app jar in cloud storage | python file> [app options]
Here is an example.
/usr/lib/spark/bin/spark-submit --class <classname> --max-executors 100 --num-executors 15 --driver-memory 10g
--executor-memory 3g --executor-cores 5 <jar_path_in-storage> <arguments>
Here is a REST API example to submit a Spark command in the command-line language.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"cmdline":"/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi
--master yarn-client /usr/lib/spark/spark-examples-*", "language":"command_line", "command_type":"SparkCommand",
"label":"sparkcluster"}' \
"https://gcp.qubole.com/api/v1.2/commands"
Example to Submit Spark Command in SQL¶
You can submit a Spark command in SQL, as shown in the following example.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
"sql":"select * from default_qubole_memetracker limit 10;",
"language":"sql","command_type":"SparkCommand", "label":"spark"
}' \
"https://gcp.qubole.com/api/v1.2/commands"
When submitting a Spark command in SQL, you can specify the location of a SparkSQL script in the script_location parameter, as shown in the following example.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"script_location":"<S3 Path>", "language":"sql", "command_type":"SparkCommand", "label":"<cluster-label>"
}' \
"https://gcp.qubole.com/api/v1.2/commands"
Example to Submit a Spark Command in SQL to a Spark Job Server App¶
You can submit a Spark command in SQL to an existing Spark Job Server app.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
"sql":"select * from default_qubole_memetracker limit 10;",
"language":"sql","command_type":"SparkCommand", "label":"spark","app_id":"3"
}' \
"https://gcp.qubole.com/api/v1.2/commands"
where app_id is the Spark Job Server app ID. See Understanding the Spark Job Server for more information.
Known Issue¶
The Spark Application UI may display the state of the application incorrectly when preemptible instances are used.
When GCP reclaims the instance that the coordinator node is running on, the Spark Application UI may still show the application as running. You can see the actual status of the Qubole command on the Workbench or Notebooks page.
To avoid this issue, use an on-demand instance for the coordinator node.