Orchestrating an analytic workflow in the Teradata Unified Data Architecture

Overview

This article describes how to combine exploratory analytics and operational analytics within the Teradata Unified Data Architecture (UDA). The UDA is a logical and physical architecture that adds a data lake platform to complement the Teradata Integrated Data Warehouse (IDW). In the Teradata-advocated solution, the data lake platform can be either Hadoop or a Teradata Integrated Big Data Platform optimized for the storage and processing of big data. QueryGrid is an orchestration mechanism that supports seamless integration of multiple types of purpose-built analytic engines within the UDA.

The sample business problem used in this article is to predict the travel time of New York City taxi trips using Internet of Things (IoT) fleet data obtained from [1]. The simple predictive model is based on clustering historical taxi trip data by trip starting geospatial coordinates. The generated model is then used by an operational model scoring routine to predict the duration of new trips. Specifically, the process flow uses a Spark-based discovery clustering algorithm and a Teradata IDW cluster scoring algorithm; the k-means cluster analysis algorithm is used to demonstrate this capability. K-means is computationally expensive, so under certain conditions it is advantageous to execute the model building component on the data lake cluster. Model scoring, on the other hand, can be very operational in nature, and it may be advantageous to execute it within the IDW.

The article describes several techniques:

  • How to use QueryGrid to bi-directionally transfer data between Teradata and HDFS/Hive
  • How to convert Teradata-exported data, using Spark's Python domain specific language, to a format consumable by Spark's MLlib k-means algorithm
  • How to convert the Spark k-means model output to a format consumable by Teradata
  • How to score the model on Teradata using a k-means scoring function
  • How to use R to interface with the Teradata IDW to visualize the model characteristics

Because the input data is prepared and stored in the Teradata IDW, the process flow requires a QueryGrid export from the IDW to HDFS. A similar common scenario would be to execute k-means on data distributed between HDFS and the IDW, for example with current data in the IDW and history in Hadoop. Spark SQL is used to create a data frame from the input data set, which is then converted into a Spark Python RDD of numpy vectors and processed by k-means. The k-means model output, the centroids, is then stored back to HDFS using Spark SQL. The model can in turn be accessed by the IDW for scoring using a QueryGrid import operation. The potential physical disk I/O required by the process flow is not considered significant for the exploratory analytics phase; further, the model output is only K rows, which in general will be hundreds of rows or fewer.

The k-means model produced here, which clusters the data by trip origination coordinates, is not meant to be a high-quality model; rather, it serves as an example of a UDA-based analytics process flow. That said, using IoT taxi data to predict trip time and distance is a valid real-world fleet management business problem.

Process Flow

This section describes the key processing steps in the UDA based analytic workflow.

Step 1) From the IDW client, load the test data for this example (BTEQ script):

CREATE TABLE taxidata (
medallion VARCHAR(32)
, hack_license VARCHAR(32)
, vendor_id VARCHAR(16)
, rate_code VARCHAR(8)
, store_and_fwd_flag VARCHAR(3)
, pickup_datetime TIMESTAMP
, dropoff_datetime TIMESTAMP
, passenger_count INTEGER
, trip_time_in_secs INTEGER
, trip_distance FLOAT
, pickup_longitude FLOAT
, pickup_latitude FLOAT
, dropoff_longitude FLOAT
, dropoff_latitude FLOAT
) PRIMARY INDEX (medallion, pickup_datetime);
.IMPORT VARTEXT ',' FILE = "trip_data_1.txt", SKIP 1
.repeat * PACK 10000
USING a1(VARCHAR(32)), a2(VARCHAR(32)), a3(VARCHAR(32)), a4(VARCHAR(32)), a5(VARCHAR(32))
,a6(VARCHAR(32)), a7(VARCHAR(32)), a8(VARCHAR(32)), a9(VARCHAR(32)), a10(VARCHAR(32))
,a11(VARCHAR(32)), a12(VARCHAR(32)), a13(VARCHAR(32)), a14(VARCHAR(32))
INSERT INTO taxidata (:a1, :a2, :a3, :a4, :a5, :a6, :a7, :a8, :a9, :a10, :a11, :a12, :a13, :a14 );

Step 2) From the IDW client, prepare the input data by normalizing the variables to be clustered using a z-score, i.e. z = (x - mean) / standard deviation. Z-score normalization is a common technique for k-means; if the data is not normalized, a single variable can dominate the cluster definitions.

CREATE TABLE indata AS (
SELECT
CAST(row_number() OVER (order by medallion) AS BIGINT) AS tripid
,(pickup_longitude - plonmean) / plonstd as zlon
,(pickup_latitude - platmean) / platstd as zlat
,t.*
FROM
taxidata t
,(SELECT
AVG(pickup_longitude) plonmean
,AVG(pickup_latitude) platmean
,STDDEV_POP(pickup_longitude) plonstd
,STDDEV_POP(pickup_latitude) platstd
FROM taxidata
) as d
WHERE pickup_longitude BETWEEN -75 and -72 AND pickup_latitude BETWEEN 39 and 41
AND trip_time_in_secs > 0
) WITH DATA NO PRIMARY INDEX;
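
The same transformation can be sketched outside the database for intuition. The following Python/numpy fragment (illustrative values only, not part of the workflow) mirrors what the SQL above computes for the longitude variable:

import numpy as np

# hypothetical pickup longitudes; in the workflow the mean and standard
# deviation come from the derived table d in the SQL above
lon = np.array([-73.98, -73.95, -74.01, -73.78])
zlon = (lon - lon.mean()) / lon.std()   # np.std() uses the population
                                        # standard deviation, like STDDEV_POP
print zlon                              # values are now centered on 0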

Step 3) From the IDW client, export the data to HDFS.

The assumption is that the input k-means Teradata table row layout is a single BIGINT key value followed by N floating point variables. For this specific test case the Hive DDL is as follows:

CALL SYSLIB.ExecuteForeignSQL('DROP TABLE temp.kmdata','hive_server_sql');
CALL SYSLIB.ExecuteForeignSQL('
CREATE TABLE temp.kmdata (
tripid BIGINT
,zlon DOUBLE
,zlat DOUBLE
)','hive_server_sql');

INSERT INTO temp.kmdata@hive_server SELECT tripid, zlon, zlat FROM indata;

Note: if the data to be exported to Hive is small (by default QueryGrid creates a minimum of one HDFS block per AMP), you may want to use the foreign server merge_hdfs_files('value') option. This option indicates that files under the same partition should be merged whenever possible; a value of TRUE means that files will be merged, FALSE otherwise, and the default is to not merge. The following Teradata DDL command can be used to modify the option on the export foreign server used in this article: ALTER FOREIGN SERVER hive_server ADD merge_hdfs_files('true');

Step 4) From the Hadoop client, build the k-means model using Spark.

Execute the Spark program while logged in as the hive user:

/usr/hdp/current/spark-client/bin/spark-submit --master yarn-client --num-executors 36 --driver-memory 1g --executor-memory 2g --executor-cores 1 km1.py 5 100 temp.kmdata

This invokes Spark with 36 units of parallelism. The Spark script parameters are:

1 - script name, in this instance km1.py

2 - K value for k-means, in this instance 5

3 - Iteration count, in this instance 100

4 - input table, in this instance temp.kmdata

The model results are stored in the table temp.temp0, with the centroids stored as a comma-separated string.

Spark Python code in km1.py:

import sys
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.mllib.stat import Statistics
from pyspark.sql import HiveContext
from pyspark import SparkContext

if len(sys.argv) != 4:
    print "\nerror: need 3 arguments (K, iterations, table name), passed %d" % (len(sys.argv) - 1)
    sys.exit(-1)
# set command line argument values
K = int(sys.argv[1])
I = int(sys.argv[2])
tablename = sys.argv[3]
# establish contexts
sc =SparkContext()
sqlContext = HiveContext(sc)
# turn off some spark logging
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel( logger.Level.OFF )
logger.LogManager.getLogger("akka").setLevel( logger.Level.OFF )
# create a dataframe of the input table
results = sqlContext.sql("SELECT * from " + tablename )
# convert all but the first column of the data frame into an RDD of numpy arrays
rdd = results.map(lambda row: array([float(x) for x in row[1:]])).cache()
# train the kmeans model
kmmodel = KMeans.train(rdd, k=K, maxIterations=I, epsilon=0.5, initializationMode="random")
# create an rdd of model centroids and add a numeric index
mdata1 = sc.parallelize(kmmodel.centers).zipWithIndex()
# convert list of numpy arrays to a list of lists and remove brackets
mdata2 = mdata1.map(lambda p: (p[1],str(p[0].tolist()).strip('[]')))
# create a data frame from rdd
df = sqlContext.createDataFrame(mdata2,['clusterid','centroids']);
# create a hive table from data frame
sqlContext.registerDataFrameAsTable(df, "df")
df1 = sqlContext.sql("drop table if exists temp.temp0");
df2 = sqlContext.sql("create table temp.temp0 as select clusterid, centroids FROM df")
# stop the Spark context
sc.stop()
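
The intermediate data shown below can also be inspected from an interactive pyspark session; a minimal sketch, assuming the Hive tables created above and the SparkContext sc provided by the shell:

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)                           # sc is supplied by the pyspark shell
results = sqlContext.sql("SELECT * FROM temp.kmdata")  # the k-means input
results.show(10)                                       # prints a sample like the one below
sqlContext.sql("SELECT * FROM temp.temp0").show()      # centroids written by km1.py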

Spark input data frame (variable "results"):

+------+--------------------+-------------------+
|tripid| zlon| zlat|
+------+--------------------+-------------------+
| 3460|-0.13739773062347316|0.08376156855310186|
| 3461|-0.12675271088498358|0.08236429958709017|
| 3462| -0.1376704397048961| 0.0830650311027165|
| 3463|-0.12674807845597885|0.08235730947835547|
| 3464|-0.13562592723405106|0.08157301927831441|
| 3465|-0.11762118626640074|0.06843849742555715|
| 3466|-0.14035342173860255|0.07569068146780829|
| 3467|-0.13901656315075403|0.08013671324352162|
| 3468|-0.13856782176714297|0.08002971080981294|
| 3469| -0.1260350872091265|0.08202221441962368|
+------+--------------------+-------------------+

Spark output RDD (variable "mdata2"):

[(0, '-0.13373417020539682, 0.092519885313754241')
, (1, '-0.12872195185176416, 0.086335802400621581')
, (2, '-0.13053908855450827, 0.096213315236239619')
, (3, '-0.13322660223193433, 0.095896225019594328')
, (4, '-0.129960878391222, 0.099533583694781033')]

Hive table output schema

hive> describe temp.temp0;
OK
clusterid bigint
centroids string

Step 5) From the IDW client, convert the k-means cluster model output to a format consumable by the IDW. The centroid definition string is parsed using STRTOK:

DROP TABLE model1;
CREATE TABLE model1 AS (
SELECT
clusterid + 1 as clusterid
,CAST(0 AS BIGINT) as pkey
,CAST(strtok(centroids,',',1) AS FLOAT) as zlon
,CAST(strtok(centroids,',',2) AS FLOAT) as zlat
FROM temp.temp0@hive_server
) WITH DATA NO PRIMARY INDEX;

Step 6) From the IDW client, create data for visualizations. It is useful to visually analyze the k-means model output; the following SQL creates a table that associates a cluster identifier with each of the input data rows:

CREATE TABLE results_km AS (
SELECT
t.tripid
,d.clusterid
,CAST(pickup_longitude AS decimal(12,6)) as lon
,CAST(pickup_latitude AS decimal(12,6)) as lat
,trip_time_in_secs as tsecs
from
indata t
,(SELECT *
FROM td_kmeans_1_0
(
ON (SELECT CAST(-1 as INTEGER) as clusterid, tripid, zlon, zlat FROM indata)
ON (SELECT * FROM model1) DIMENSION ORDER BY clusterid
USING K(5) phase(2)
) AS x
) AS d
WHERE
t.tripid=d.tripid
) WITH DATA NO PRIMARY INDEX;

Step 7) From the R client, plot the trip elapsed time distributions. This visualization uses R and the Teradata R connector [2] to plot the five cluster distributions; cluster 3 has a distinctly different distribution.

library(teradataR)
library(ggplot2)
library(gpclib)
library(maptools)
gpclibPermit()
library(rgdal)
library(sm)
library(RgoogleMaps)
library(plyr)       # provides ddply(), used in the next step

# connect to the IDW and pull a sample of the scored rows into an R data frame
tdConnect(dsn="2800p", uid="mww", pwd="mww")
tdf <- td.data.frame("results_km")
mydata <- as.data.frame.td.data.frame(tdf, size=100000)

# compare the trip time density of each cluster
cf <- factor(mydata$clusterid, levels=c(1,2,3,4,5), labels=c("c1", "c2", "c3", "c4", "c5"))
sm.density.compare(mydata$tsecs, mydata$clusterid, xlab="Trip Time In seconds")
title(main="Cluster Distribution by Trip Time")
colfill <- c(2:(2+length(levels(cf))))
legend(locator(1), levels(cf), fill=colfill)

Step 8) From the R client, plot the clusters, trips, and New York boroughs. This visualization uses R to plot the cluster polygon hulls in blue, the trip data points in red, and the New York City boroughs in black with green fill.

# compute the convex hull of each cluster's trip points
find_hull <- function(mydata) mydata[chull(mydata$lon, mydata$lat), ]
hulls <- ddply(mydata, "clusterid", find_hull)

# read the New York City borough boundaries shapefile
counties <- readOGR("nybb.shp", layer="nybb")

# assign the NAD83 longitude/latitude SRS to the trip points, then transform
# them to the same SRS as the borough polygons
coordinates(mydata) <- ~lon+lat
proj4string(mydata) <- CRS("+proj=longlat +datum=NAD83")
mydata <- spTransform(mydata, CRS(proj4string(counties)))
identical(proj4string(mydata), proj4string(counties))
mydata <- data.frame(mydata)

# repeat for the cluster hull polygons
coordinates(hulls) <- ~lon+lat
proj4string(hulls) <- CRS("+proj=longlat +datum=NAD83")
hulls <- spTransform(hulls, CRS(proj4string(counties)))
identical(proj4string(hulls), proj4string(counties))
hulls <- data.frame(hulls)

# cluster label positions: the midpoint of each hull's bounding box
cnames <- aggregate(cbind(lon, lat) ~ clusterid, data=hulls,
                    FUN=function(x) mean(range(x)))

# plot boroughs (green fill), trip points (red), cluster hulls (blue), and cluster id labels
ggplot() +
  geom_polygon(data=counties, aes(x=long, y=lat, group=group), alpha=0.2, size=1.5, fill="green", colour="black") +
  geom_point(data=mydata, aes(x=lon, y=lat), color="red", size=1) +
  geom_polygon(data=hulls, aes(x=lon, y=lat, group=clusterid), alpha=1, size=1.5, fill=NA, colour="blue") +
  geom_text(data=cnames, aes(lon, lat, label=clusterid), size=18) +
  labs(x="x", y="y", title="NY Taxi Trip Start Clusters")

Step 9) From the IDW client, score the model. To perform the operational analytics, we invoke the following k-means table operator on the data to be clustered, using the centroids stored in the model1 table. This function can be obtained from [3]. The following SQL represents the invocation:

SELECT TOP 10 *              
FROM td_kmeans_1_0
(
ON (SELECT CAST(-1 as INTEGER) as clusterid, tripid, zlon, zlat FROM indata d SAMPLE 100000)
ON (SELECT * FROM model1) DIMENSION ORDER BY clusterid
USING K(5) phase(2)
) AS d1

SQL explain plan:

  1) First, we lock a distinct MWW."pseudo table" for read on a RowHash
to prevent global deadlock for MWW.model1.
2) Next, we lock a distinct MWW."pseudo table" for read on a RowHash
to prevent global deadlock for MWW.d.
3) We lock MWW.model1 for read, and we lock MWW.d for read.
4) We do an all-AMPs SAMPLING step from MWW.d by way of an all-rows
scan with no residual conditions into Spool 1 (all_amps), which is
built locally on the AMPs. Samples are specified as a number of
rows.
5) We do an all-AMPs RETRIEVE step from MWW.model1 by way of an
all-rows scan with no residual conditions into Spool 2 (used to
materialize view, derived table, table function or table operator
TblOpInputSpool) (all_amps), which is duplicated on all AMPs.
Then we do a SORT to order Spool 2 by the sort key in spool field1.
The size of Spool 2 is estimated with high confidence to be 800
rows (42,400 bytes). The estimated time for this step is 0.02
seconds.
6) We do an all-AMPs RETRIEVE step executing table operator
MWW.td_kmeans_1_0 with a condition of ("(1=1)"). The size of
Spool 3 is estimated with high confidence to be 14,492,087 rows (
710,112,263 bytes). The estimated time for this step is 0.58
seconds.
7) We do an all-AMPs STAT FUNCTION step from Spool 3 by way of an
all-rows scan into Spool 10, which is redistributed by hash code
to all AMPs. The result rows are put into Spool 6 (group_amps),
which is built locally on the AMPs. This step is used to retrieve
the TOP 10 rows. Load distribution optimization is used.
If this step retrieves less than 10 rows, then execute step 8.
The size is estimated with high confidence to be 10 rows (490
bytes). The estimated time for this step is 0.58 seconds.
8) We do an all-AMPs STAT FUNCTION step from Spool 3 (Last Use) by
way of an all-rows scan into Spool 10 (Last Use), which is
redistributed by hash code to all AMPs. The result rows are put
into Spool 6 (group_amps), which is built locally on the AMPs.
This step is used to retrieve the TOP 10 rows. The size is
estimated with high confidence to be 10 rows (490 bytes). The
estimated time for this step is 0.58 seconds.
9) Finally, we send out an END TRANSACTION step to all AMPs involved
in processing the request.
-> The contents of Spool 6 are sent back to the user as the result of
statement 1.

DBQL performance data

As can be seen from the DBQL data, the time to score 100,000 rows is less than 1 second, with most of the time spent reading the input data (step 4). Step 5 is the duplication of the 5 centroids across 160 AMPs, and step 6 contains the invocation of the k-means scoring routine.

Row  QueryID                   Elapsed             StepNum  StepName  CPUTime  IOCount    PhysIO  RowCount     SpoolUsage
1    307,194,490,388,605,025   0 00:00:00.000000   1        MLK       0.00     0.00       0.00    1.00         0.00
2    307,194,490,388,605,025   0 00:00:00.000000   2        MLK       0.00     0.00       0.00    1.00         0.00
3    307,194,490,388,605,025   0 00:00:00.010000   3        MLK       0.02     0.00       0.00    160.00       0.00
4    307,194,490,388,605,025   0 00:00:00.150000   4        SAMP      7.99     7,728.00   0.00    100,000.00   6,602,752.00
5    307,194,490,388,605,025   0 00:00:00.010000   5        RET       0.41     4,320.00   0.00    800.00       7,913,472.00
6    307,194,490,388,605,025   0 00:00:00.000000   6        RET       0.41     5,440.00   0.00    100,000.00   14,516,224.00
7    307,194,490,388,605,025   0 00:00:00.010000   7        STATFN    0.22     2,121.00   0.00    10.00        6,623,232.00
8    307,194,490,388,605,025   0 00:00:00.000000   8        STATFN    0.00     0.00       0.00    ?            0.00

Summary

This article has described the use of several technologies for performing advanced analytics within the Teradata UDA:

  • Teradata Query Grid
  • Spark SQL and Spark Python Domain Specific Language
  • Spark MLlib K-means algorithm
  • Teradata IDW k-means scoring function

It should be noted that additional Spark MLlib algorithms can be invoked in a similar manner, though not all of the corresponding operational analytic (scoring) functions may exist within the Teradata IDW.
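
As a rough illustration of that point, the following sketch (an assumption for illustration, not part of the original workflow) swaps MLlib's Gaussian mixture clustering into km1.py in place of KMeans.train; the model storage and IDW-side scoring steps would need to change accordingly:

from pyspark.mllib.clustering import GaussianMixture

# rdd, K and I are the same objects defined in km1.py
gmm = GaussianMixture.train(rdd, k=K, maxIterations=I)
print gmm.weights            # mixture weight of each component
labels = gmm.predict(rdd)    # per-row component assignments
# the component parameters could be written back to a Hive table in the
# same way the k-means centroids are stored in temp.temp0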

Configuration Information

System Configuration

Teradata Version: 15.00.05 installed on a 4 Node 2800

Hadoop Version HDP 2.3 installed on 1 name node and 8 data nodes.

Spark Version 1.4.1

Python Version 2.6.9. Note that for this use case Python requires the numpy package (build instructions: http://www.scipy.org/scipylib/building/linux.html). To install it on the name node and on each data node:

unzip numpy.zip

python setup.py build

python setup.py install
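
A quick optional check, not part of the original procedure, confirms that numpy is importable by the interpreter Spark will use on each node:

# run under the same python interpreter used by Spark
import numpy
print numpy.__version__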

Query Grid Foreign Server Definitions:

CREATE FOREIGN SERVER TD_SERVER_DB.hive_server_sql
USING
hosttype('hive')
hiveserver('39.1.152.2')
hiveport('10000')
DO IMPORT WITH SYSLIB.efsspop_t2h;
CREATE FOREIGN SERVER TD_SERVER_DB.hive_server USING 
hosttype ('hive')
hiveserver ('39.1.152.2')
server ('39.1.152.2')
username ('hive')
ip_device ('byn1')
hadoop_properties('<yarn.resourcemanager.address.TDH51A=http://39.1.152.2:8050>,<dfs.client.socketcache.capacity=0>,<dfs.client.use.datanode.hostname=true>,<dfs.datanode.use.datanode.hostname=true>,<dfs.ha.namenodes.TDH51A=nn1,nn2>,<dfs.nameservices=TDH51A>,<dfs.namenode.rpc-address.TDH51A.nn1=39.1.152.2:8020>,<dfs.namenode.rpc-address.TDH51A.nn2=39.1.152.5:8020>,<dfs.client.failover.proxy.provider.TDH51A=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider>,<oozie.base.url.TDH51A=http://39.1.152.2:11000/oozie>'
)
DO IMPORT WITH SYSLIB.LOAD_FROM_HIVE_HDP2_3_0
,DO EXPORT WITH SYSLIB.LOAD_TO_HIVE_HDP2_3_0 ;

References

[1] https://archive.org/details/nycTaxiTripData2013

[2] https://github.com/Teradata/teradataR and https://developer.teradata.com/applications/articles/in-database-analytics-with-teradata-r

[3]  https://developer.teradata.com/extensibility/articles/implementing-a-multiple-input-stream-teradata-...