Aster Spark Connector

What is the Aster Spark Connector?

The Aster-Spark connector enables you to run Spark functions from a simple Aster query. It allows an Aster application to use the existing and growing Spark MLlib and analytics libraries. Aster Analytics developers and customers can write Spark functions using Spark's easy-to-use, powerful APIs and then call those functions from Aster applications.
The Aster-Spark connector consists of a function called RunOnSpark, a set of Spark extensions, an API, and a set of ready-to-use Spark functions. The RunOnSpark function, together with the Spark extensions, forms the framework that enables running Spark functions from within Aster. The API allows developers and users to write Spark code that can be used with the framework. The ready-to-use Spark functions are prebuilt functions that work with the framework out of the box.

Below is a simple example that shows how to use the Aster-Spark connector. The RunOnSpark function runs on the Aster side, communicates with Spark, and executes the Spark function LinearRegrWithSGDTrain. As explained later, LinearRegrWithSGDTrain is a wrapper, written in Scala, that runs the Spark MLlib LinearRegressionWithSGD function.

select mse
from RunOnSpark(
    on linearRegrTrainSet
    sparkcode('com.teradata.aster.examples.LinearRegrWithSGDTrain')
    outputs('mse double precision')
);

More about the RunOnSpark Function

This section specifies the syntax for running the RunOnSpark function. The RunOnSpark function arguments can also be defined in a configuration file (see Configuration File section below) so that you do not have to specify them when writing queries.
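
For example, if connection arguments such as SPARK_MASTER_URL and APP_RESOURCE are defined in the configuration file, a query only needs the function-specific clauses. The sketch below is illustrative only; the jar path and argument values are hypothetical, not defaults:

-- Connection arguments supplied explicitly in the query
-- (hypothetical values):
select mse
from RunOnSpark(
    on linearRegrTrainSet
    sparkcode('com.teradata.aster.examples.LinearRegrWithSGDTrain')
    outputs('mse double precision')
    spark_master_url('yarn')
    app_resource('/opt/aster/aster-spark-connector.jar')
);

-- With the same arguments defined in the configuration file,
-- the query shrinks to the function-specific clauses:
select mse
from RunOnSpark(
    on linearRegrTrainSet
    sparkcode('com.teradata.aster.examples.LinearRegrWithSGDTrain')
    outputs('mse double precision')
);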

RunOnSpark Syntax

RunOnSpark (
    ON <statement or table>
    SPARKCODE('<executable-name>[ <arguments>]')
    [OUTPUTS('colname coltype', ...)]
    [MEM_LIMIT_MB('<limit>')]
    [TIMEOUT_SEC(<timeout-value>)]
    [STATUS_INTERVAL_SEC(<status-interval-value>)]
    [SPARK_MASTER_URL('yarn'|'<standalone-master-url>')]
    [USE_STANDALONE_REST_SUBMIT('true'|'false')]
    [APP_RESOURCE('<aster-jar-location>')]
    [JARS('<jar1>,<jar2>,...')]
    [EXTRA_SPARK_SUBMIT_OPTIONS('--<option1> <value1> ...')]
    [SPARK_CLUSTER_ID('<cluster-id>')]
    [DATA_TRANSFER('Socket'|'Socket-persist'|'File')]
    [PERSIST_LOCATION('<path>')]
    [SPARK_PROPERTIES('<spark-property-name>:<spark-property-value>', ...)]
    [REST_URL('<rest-url>')]
    [SSH_HOST('<host-to-ssh-to>')]
    [IDENTITY_FILE('<identity-file>')]
    [SPARK_SUBMIT_COMMAND('<spark-submit-command>')]
    [YARN_COMMAND('<yarn-command>')]
    [HADOOP_JARS_LOCATIONS('<locations-of-hadoop-jars>')]
    [HADOOP_CONF_LOCATION('<location-of-hadoop-configuration>')]
    [SPARK_CONF_LOCATION('<location-of-spark-configuration>')]
    [SPARK_JOB_USER_KEY_TAB('<spark-job-user-key-tab>')]
    [KINIT('<kinit-cmd>')]
    [WORKERS_IP_ADDRESSES('<ip-address>,<ip-address>,...')]
    [LOGGING_LEVEL('INFO'|'DEBUG'|'ERROR'|'WARN')]
    [MAILMANSERVER_BLOCK_TIMEOUT_SEC(<value-seconds>)]
);
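
To illustrate how the optional arguments combine, here is a hypothetical query that pins the Spark master, caps worker memory, sets a timeout, and raises the logging level. The table name, class name, and values are placeholders chosen for illustration, not recommended settings:

select *
from RunOnSpark(
    on my_input_table
    sparkcode('com.teradata.aster.examples.MyFunction arg1 arg2')
    outputs('result double precision')
    spark_master_url('yarn')
    mem_limit_mb('2048')
    timeout_sec(600)
    logging_level('INFO')
);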

Aster-Spark API


An API is available on the Spark side to help you write Spark functions that use the RunOnSpark feature. The API offers a set of classes and methods developed in the Scala programming language that allows the Spark-side code to transparently:


• Connect to the Aster function and receive data that serves as input for Spark processing.
• Persist the received data to files, using either custom or native persistence, if needed.
• Write results back to Aster once the processing is done.
• Recognize and take into account the different data types and data type conversions.

The API also allows the Spark code developer to write the processing code by following the provided Spark function template.


Functions developed on the Spark side to work with Aster RunOnSpark can derive from one of the AsterSparkFunction subclasses, which, together with other predefined Scala classes (custom RDDs), encapsulate and hide all the complexities of Aster-Spark interaction and data transfer. Tasks performed by the custom RDDs include:

• Processing all the arguments.
• Connecting to the RunOnSpark instances to get data and persist it if needed.
• Connecting to RunOnSpark instances and sending results back.
• Handling data types and conversions.

Aster-Spark API Example


Example Using a Wrapper with DataFrames


import org.apache.spark.sql.DataFrame

/** The EchoDF class is an example wrapper function that uses the Aster-Spark
 *  API and simply returns the input DataFrame as output. It mimics the
 *  behavior of an echo function and derives from the base class
 *  AsterSparkFunctionDF, which hides all the interactions with Aster.
 *  EchoDF overrides the run method and implements the echo functionality.
 */
class EchoDF(args: Array[String], name: String, mstr: String = null)
  extends AsterSparkFunctionDF(args, name, mstr) {

  /** The run method is overridden to implement the echo functionality.
   *  @param input The input DataFrame that reads data from the source.
   *  @param sparkFuncParams String representing the parameters specific to the
   *         function the user is implementing. In this example the
   *         sparkFuncParams parameter is not used.
   *  @return The result is a DataFrame containing all the input data.
   */
  override def run(input: DataFrame, sparkFuncParams: String): DataFrame = {
    // Echo behavior: return the input DataFrame unchanged.
    input
  }
}
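
Assuming EchoDF is packaged and deployed under a class name such as com.teradata.aster.examples.EchoDF (a hypothetical name that follows the naming of the earlier example), it could then be invoked from Aster like any other Spark function. Because the function echoes its input, the OUTPUTS clause simply mirrors the input table's schema:

select *
from RunOnSpark(
    on my_input_table
    sparkcode('com.teradata.aster.examples.EchoDF')
    outputs('id integer', 'value double precision')
);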