Implementing Scoring SDK on Spark Streaming - A Demo


The Aster Scoring SDK, released a few months ago, holds great promise: it enables predictive models created in Aster to be operationalized on real-time or streaming systems.

Currently, the recommended way to "operationalize" analysis in Aster is through AppCenter. Using AppCenter, we can create an app that loads data from Hadoop or Teradata, another app that recreates the prediction models, and a third that generates predictions using those models. Alternatively, we can put all of those steps into a single app and schedule its execution. That approach is already pretty solid.

Now that the Aster Scoring SDK has come into the picture, we have an extra option: operationalizing the prediction/scoring part of the analysis process outside the Aster platform. Instead of bringing data to Aster, analyzing it there, and moving the results back out, we can now score the data where it resides or flows.

We can even go as far as deploying the model behind a REST service. In an enterprise there are typically many different applications that connect to web services for particular data or functionality, so a centralized prediction/scoring service makes sense when we expect multiple applications to consume it.
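As an illustrative sketch only, not the SDK's prescribed deployment pattern, such a centralized scoring service could be as simple as wrapping the Scorer API (shown later in this post) in the JDK's built-in HTTP server. The /score endpoint, port, and CSV request format here are all assumptions:

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import scala.io.Source
import com.asterdata.analytics.scoring._

object ChurnScoringService {
  def main(args: Array[String]): Unit = {
    val scorer = new Scorer
    val server = HttpServer.create(new InetSocketAddress(8080), 0)
    server.createContext("/score", new HttpHandler {
      def handle(exchange: HttpExchange): Unit = {
        // Expect a single CSV record in the request body
        val fields = Source.fromInputStream(exchange.getRequestBody).mkString.trim.split(",")
        val request = scorer.configure("telco_churn_nbmodel.aml")
        val row = new RequestRow(request.getColumnCount)
        row.setIntAt(0, fields(0).trim.toInt) // positions must match the model's request column order
        // ... set the remaining columns, as in the streaming snippet below
        request.addRequest(row)
        val response = scorer.score(request)
        // toString is a placeholder; serialize the Response however the SDK exposes it
        val body = response.toString.getBytes("UTF-8")
        exchange.sendResponseHeaders(200, body.length)
        exchange.getResponseBody.write(body)
        exchange.close()
      }
    })
    server.start()
  }
}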

In this post I will demonstrate, at a high level, how I implemented a prediction model created in Aster in a Spark Streaming application. The motivation for choosing Spark is its rising adoption for streaming applications. Spark has certainly caught many eyes with its much better performance than MapReduce, its easy-to-use high-level APIs, and a fairly complete set of libraries that can all be used within the same application. There are contenders, though, such as Apache Flink and Apache Beam.

For this demo, I used a public telco churn data set, downloaded from https://www.sgi.com/tech/mlc/db/. The training data was quickly loaded into Aster with the ncluster_loader tool (a sample invocation is sketched below). Using that data set, I created a prediction model with Aster's Naive Bayes function; please note that I've removed parts of the code for readability.
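For reference, the load boils down to a one-liner like this; the queen host, credentials, and file name are placeholders, and the exact flags may differ between Aster versions:

ncluster_loader -h queen.example.com -U beehive -w beehive -d beehive -c tif2016.telco_churn churn.data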

DROP TABLE IF EXISTS tif2016.telco_churn_nbmodel;
CREATE TABLE tif2016.telco_churn_nbmodel
DISTRIBUTE BY HASH(class)
AS
SELECT * FROM NaiveBayesReduce (
    ON (
        SELECT *
        FROM NaiveBayesMap (
            ON tif2016.telco_churn
            Response ('churn')
            NumericInputs (
                'number_vmail_messages',
                'total_day_minutes',
                ...
            )
            CategoricalInputs ('voice_mail_plan')
        )
    ) PARTITION BY class
);


After I created the model, which was saved as a table, I gauged its accuracy using Aster's ConfusionMatrix function. It is a really handy function for quickly seeing how a prediction model performs: it creates three tables, the first being the confusion matrix itself and the other two containing statistics such as false positives and false negatives. I was satisfied enough with the model's performance.
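As a sketch of that evaluation step, scoring a hold-out set and building the confusion matrix could look roughly like the following; the test table name is an assumption, and the argument names may differ between Aster Analytics versions:

-- Score a hold-out set with the saved model (telco_churn_test is hypothetical)
CREATE TABLE tif2016.telco_churn_predict
DISTRIBUTE BY HASH(phone_number)
AS
SELECT * FROM NaiveBayesPredict (
    ON tif2016.telco_churn_test
    Model ('tif2016.telco_churn_nbmodel')
    IdCol ('phone_number')
    NumericInputs ('number_vmail_messages', 'total_day_minutes', ...)
    CategoricalInputs ('voice_mail_plan')
);

-- Compare predictions against observed labels
SELECT * FROM ConfusionMatrix (
    ON (
        SELECT t.churn, p.prediction
        FROM tif2016.telco_churn_test t
        JOIN tif2016.telco_churn_predict p ON t.phone_number = p.phone_number
    ) PARTITION BY 1
    ObsColumn ('churn')
    PredictColumn ('prediction')
    ...
);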

This is where the Aster Scoring SDK comes in: after we create a model. The following AMLGenerator call was executed to export the model table as a flat model file (.aml). Afterwards, we download the file and develop an application that uses the JAR file provided by the Aster Scoring SDK.

SELECT * FROM AMLGenerator (
    ON (SELECT 1) PARTITION BY 1
    MODELTYPE ('nb')                           -- Naive Bayes
    MODELTABLE ('tif2016.telco_churn_nbmodel') -- Source table name
    REQUESTCOLNAMES (
        'number_vmail_messages',
        'total_day_minutes',
        ...
        'voice_mail_plan'
    )
    REQUESTCOLTYPES (
        'INT',
        'DOUBLE',
        ...
        'STRING'
    )
    AMLPREFIX ('telco_churn_nbmodel')          -- Output file name
    OVERWRITEOUTPUT ('true')
    REQUESTARGNAME1 ('IdCol')
    REQUESTARGVAL1 ('phone_number')            -- Name-value pair 1 for ID
    ...
);

Now comes the Spark Streaming part. I developed a simple Spark Streaming application, modified from the NetworkWordCount example: I changed it to accept different input arguments and separated the record-processing logic into its own function. The Spark Streaming code looks like the following snippet.

// Imports (Scala wildcard imports use _ rather than *)
import com.asterdata.analytics.scoring._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
...

// Main function
def main(args: Array[String]) {
  // Spark configurations
  ...
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
  val responses = lines.map(line => processLine(line, args(2)))
  ...
  // Send results somewhere else
}

// Row processing function
// (dashboardHost points to a socket-enabled NodeJS dashboard)
def processLine(line: String, dashboardHost: String): Response = {
  val stringArray = line.split(",")

  // Create scorer and load model file
  val scorer: Scorer = new Scorer
  val request: Request = scorer.configure("telco_churn_nbmodel.aml")
  val requestRow: RequestRow = new RequestRow(request.getColumnCount)

  // Add variables to be scored; positions must match the
  // REQUESTCOLNAMES/REQUESTCOLTYPES order declared in AMLGenerator
  requestRow.setStringAt(0, stringArray(3).trim)
  requestRow.setIntAt(1, stringArray(6).trim.toInt)
  ...

  // Predict the churn
  request.addRequest(requestRow)
  scorer.score(request)
}
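For completeness, a minimal build.sbt for compiling the application might look like the following; the Spark and Scala versions are assumptions in line with the Spark 1.x era of this demo, and the scoring JAR from the SDK can simply be dropped into lib/ as an unmanaged dependency:

name := "churnscorer-spark-scala"

version := "0.1.0"

scalaVersion := "2.10.6"

// "provided" keeps Spark itself out of the packaged JAR;
// the cluster supplies it at runtime
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
)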

After the Spark code was created and compiled (using SBT or through an IDE), all I needed to do was submit it to my Spark cluster, including the scoring JAR from the SDK and my model file in the submission:

spark-submit --class com.teradata.aster.spark.ChurnScorer --master spark://localhost:7077 --executor-memory 128M --jars /root/scoring.jar,/root/churn_scorer/telco_churn_nbmodel.aml churnscorer-spark-scala-0.1.0-all.jar localhost 9898 http://localhost:3702
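To feed the application while testing, any socket source works; the quickest option, inherited from the NetworkWordCount example, is to replay hold-out records with netcat on the port the app listens to (churn_test.data is a placeholder file name):

# Stream test records, one CSV line per subscriber, on port 9898
nc -lk 9898 < churn_test.data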

In the demo I developed, there is also a NodeJS dashboard that displays the number of subscribers predicted to churn or not. For the sake of simplicity, I won't discuss its code here. You can see the end result of this demo in the following video.

Video Link : 1048