Teradata Query Grid and Machine Learning in Hadoop

This article describes how to use Teradata Query Grid to execute a Mahout machine learning algorithm on a Hadoop cluster using data sourced from the Teradata Integrated Data Warehouse. Specifically, the Mahout K-means cluster analysis algorithm is demonstrated. K-means is a computationally expensive algorithm that, under certain conditions, is advantageous to execute on the Hadoop cluster. Query Grid is an enabling technology for the Teradata Unified Data Architecture (UDA). The UDA is a logical and physical architecture that adds a discovery platform and a data platform to complement the Teradata Integrated Data Warehouse. In the Teradata advocated solution, the discovery platform is Teradata Aster, while the data platform can be either Hadoop or a Teradata Integrated Big Data Platform optimized for the storage and processing of big data. Query Grid is an orchestration mechanism that supports seamless integration of multiple types of purpose-built analytic engines.

The article describes several techniques:

  • How to use Query Grid to bidirectionally transfer data between Teradata and Hadoop
  • How to convert Teradata exported data to a format consumable by the Mahout K-means algorithm
  • How to use Query Grid to execute a Mahout analytic on the Hadoop cluster
  • How to convert the Mahout K-means algorithm output to a format consumable by Teradata

System Configuration

Teradata Version: 15.0 installed on a 4 Node 2700

Hadoop Version: Hadoop 2.4.0.2.1.5.0-695 installed on 1 name node and 8 data nodes.

Mahout Version: 0.9.0.2.1.5.0-695, installed on the Hadoop cluster using "sudo yum install mahout".

Hive Version: 0.13.0

Query Grid Foreign Server Definition:

CREATE FOREIGN SERVER TD_SERVER_DB.tdsqlhhive USING 
hosttype  ('hadoop')
port  ('9083')
hiveport  ('10000')
username  ('hive')
server  ('39.16.72.2')
hiveserver  ('39.16.72.2')
DO IMPORT WITH SYSLIB.LOAD_FROM_HCATALOG USING
transformformatting  ('true')
,DO EXPORT WITH SYSLIB.LOAD_TO_HCATALOG USING
hbasepath  ('/apps/hive/warehouse/')
export_file_format  ('text')
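
Once the foreign server is defined, the remote Hive catalog can be browsed directly from Teradata. The statements below are a sketch based on the Teradata 15.0 HELP FOREIGN syntax; "default" is simply Hive's built-in database, and the exact output depends on the QueryGrid connector version.

HELP FOREIGN SERVER tdsqlhhive;
HELP FOREIGN DATABASE default@tdsqlhhive;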

HDFS Disk Space Usage

/apps/hive/warehouse/temp:   Hive temporary work space for query grid input and output data

/tmp/mahout:  HDFS temporary workspace for the Mahout K-means algorithm and the MapReduce conversion jobs

Step 1:

Export the Teradata table to a Hive table. The assumption is that the input K-means Teradata table row layout is a single BIGINT key column followed by N floating point variables. In a specific test case the DDL is as follows:

Teradata table definition.

CREATE SET TABLE kmdata     (
      pkey BIGINT GENERATED ALWAYS AS IDENTITY  (START WITH 1 INCREMENT BY 1 MINVALUE -2147483647 MAXVALUE 2147483647  NO CYCLE),
      v1 FLOAT,
      v2 FLOAT,
      v3 FLOAT)
PRIMARY INDEX ( pkey );

Query Grid SQL commands to create a Hive table and export a Teradata table.

CALL SYSLIB.HCTAS('kmdata',null,null,'tdsqlhhive','temp') ;
INSERT INTO temp.kmdata@tdsqlhhive SELECT * FROM kmdata;
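
A quick sanity check after the export is to compare row counts on both sides of the grid; the second query below runs remotely against the Hive table through the same foreign table reference used above.

SELECT COUNT(*) FROM kmdata;
SELECT COUNT(*) FROM temp.kmdata@tdsqlhhive;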

Hive table Definition:

describe kmdata;
pkey                    bigint
v1                      double
v2                      double
v3                      double

Note: if the data to be exported to Hive is "small", you may want to use the foreign server option merge_hdfs_files('value'), because by default Query Grid creates at least one HDFS block per AMP. The option indicates that files under the same partition should be merged whenever possible; a value of TRUE means files will be merged, FALSE (the default) means they will not. The option can be added with the following Teradata DDL command:

ALTER FOREIGN SERVER tdsqlhhive ADD merge_hdfs_files('true') ;

Step 2:

Convert the Hive data to a format consumable by Mahout K-means. Mahout K-means requires the input data to be a sequence file of (Key, VectorWritable) pairs, where VectorWritable is a vector containing the variables that define the records to be clustered. In addition, if you want the key values associated with each clustered record, you need to create NamedVector input vectors where the name contains the key value. A more seamless approach to this use case would have been to use a Hive UDF, specifically a GenericUDF, to handle the data format conversions. Unfortunately, two issues were encountered with Hive UDFs:

  • The external Mahout JARs could not be associated with a Hive 13 UDF; "ADD JAR" did not resolve the resulting Mahout NoClassDefFoundError.
  • The Hive sequence file SerDe does not provide the key values to the UDF, or to any other reader. The key value contains the K-means cluster assignment, which is the main reason for running K-means.

Because of the Hive UDF issues, the following Java MapReduce mapper code was used to do the conversion; Mahout classes were used to construct the input vectors. In addition, by default Query Grid exports data to Hive as text files using the default field delimiter '\u0001', and the delimiter value is passed to the mapper in the Configuration object. The MapReduce job creates an appropriately formatted HDFS file in the HDFS directory /tmp/mahout/input.

MapReduce job code to convert a delimited text file to a vector sequence file:

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
import org.apache.mahout.math.*;

public class CsvToVec extends Configured implements Tool {

public static class MapCsvVec extends Mapper<LongWritable, Text, Text, VectorWritable> {
@Override
   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       Configuration conf = context.getConfiguration();
       String delim = conf.get("delim"); // field delimiter passed in from the driver
       String line = value.toString();
       String[] c = line.split(delim);
       int vlen = c.length - 1; // first field is the key, remaining fields are variables
       double[] d = new double[vlen];
       String pkey = c[0]; // access key in field 0
       Text opkey = new Text(pkey);

       for (int i = 0; i < vlen; i++) // create array of variables
          d[i] = Double.parseDouble(c[i+1]);

       // the NamedVector name carries the Teradata primary key through the clustering run
       NamedVector vec = new NamedVector(new DenseVector(d), pkey);
       VectorWritable writable = new VectorWritable();
       writable.set(vec);
       context.write(opkey, writable);
   }
}

  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(super.getConf());

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(VectorWritable.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);

    job.setMapperClass(MapCsvVec.class);
    job.setNumReduceTasks(0);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setJarByClass(CsvToVec.class);

    job.waitForCompletion(true);
    return 0;
    }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    conf.set("delim",otherArgs[2]);
    int res = ToolRunner.run(conf, new CsvToVec(), args);
    System.exit(res);
 }
}

Compile command for CsvToVec.java:

export HADOOP_HOME=/usr/lib/hadoop
export HADOOP_VERSION=2.4.0.2.1.5.0-695
export JAVA_HOME=/usr/jdk64/jdk1.7.0_45/bin
mkdir csvtovec_classes
$JAVA_HOME/javac -Xlint -classpath ${HADOOP_HOME}/hadoop-common.jar:/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-core.jar:/usr/lib/mahout/mahout-math-0.9.0.2.1.5.0-695.jar:/usr/lib/mahout/mahout-core-0.9.0.2.1.5.0-695-job.jar:/usr/lib/mahout/mahout-integration-0.9.0.2.1.5.0-695.jar -d csvtovec_classes CsvToVec.java
$JAVA_HOME/jar -cvf CsvToVec.jar -C csvtovec_classes/ .

MapReduce invocation bash shell script (assumed to be run as root):

MAHOUT_VERSION=0.9.0.2.1.5.0-695
MAHOUT_HOME=/usr/lib/mahout
HIVEPATH=/apps/hive/warehouse/temp.db
LIBJARS=$MAHOUT_HOME/mahout-math-$MAHOUT_VERSION.jar,$MAHOUT_HOME/mahout-core-$MAHOUT_VERSION-job.jar,$MAHOUT_HOME/mahout-integration-$MAHOUT_VERSION.jar
export HADOOP_CLASSPATH=$MAHOUT_HOME/mahout-math-$MAHOUT_VERSION.jar:$MAHOUT_HOME/mahout-core-$MAHOUT_VERSION-job.jar:$MAHOUT_HOME/mahout-integration-$MAHOUT_VERSION.jar
infile=/tmp/mahout/input
delim='\u0001'
#clean work directory
hadoop fs -rm -r /tmp/mahout
hadoop jar CsvToVec.jar org.myorg.CsvToVec -libjars ${LIBJARS} $HIVEPATH/kmdata $infile $delim
# The Hive TRANSFORM function will invoke Mahout as the yarn user, so make the work directory accessible to yarn
hadoop fs -chmod 777 /tmp/mahout

Note on using libjars when invoking the MapReduce job: the Job must be built from the Configuration object that ToolRunner passes to the run method of your MapReduce driver (Job.getInstance(super.getConf()) in the code above). Otherwise the job won't be correctly configured and the Mahout JARs won't be accessible in the mapper JVMs.

Step 3:

Execute Mahout K-means to cluster the input data based on the N input variables, in this example v1, v2, and v3. The Hive TRANSFORM function is used to execute K-means. TRANSFORM can be used within the Query Grid FOREIGN TABLE syntax to execute a "script" on the Hadoop cluster; it allows users to plug custom mappers and reducers into the Hive query processing data stream by implementing them as scripts. The specific usage here is to have TRANSFORM execute a bash shell script that invokes Mahout K-means and processes the data stream external to Hive. For more details on TRANSFORM see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform

Because the FOREIGN TABLE syntax does not support the ADD FILE command, the TRANSFORM script that runs K-means must be installed on every node of the Hadoop cluster (Hive ADD FILE is the typical mechanism for distributing TRANSFORM scripts). Also, a one-row, one-integer-column Hive table named temp.onerow is used to satisfy TRANSFORM's input data requirements; one way to create it is sketched after this paragraph.
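
One way to create temp.onerow is to reuse the Step 1 export pattern. This is a minimal sketch that assumes a Teradata-side helper table, also named onerow, in the current database; any equivalent Hive-side DDL would work just as well.

CREATE TABLE onerow (c1 INTEGER);
INSERT INTO onerow VALUES (1);
CALL SYSLIB.HCTAS('onerow',null,null,'tdsqlhhive','temp') ;
INSERT INTO temp.onerow@tdsqlhhive SELECT * FROM onerow;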

Query Grid SQL used to invoke K-means:

SELECT trim(BOTH FROM t1.x) FROM FOREIGN TABLE (
 SELECT
  TRANSFORM(c1,'/tmp/mahout/input','/tmp/mahout/kmeans/output','/tmp/mahout/kmeans/clusters-0-final',10,10,0.5) USING 'bash /tmp/runmh' AS (oc1)
  FROM temp.onerow
)@tdsqlhhive t1 (x) ;

The arguments to TRANSFORM are passed to the bash shell script as a single tab-delimited record on STDIN. After the first column (c1 from temp.onerow), the arguments are: input directory, output directory, initial cluster definitions work directory, maximum number of iterations, K value, and convergence tolerance. See https://mahout.apache.org/users/clustering/k-means-commandline.html for the full set of Mahout K-means command line options. The USING clause names the script to execute, in this example the bash shell script "/tmp/runmh".

Bash shell script runmh, installed in the /tmp directory on all Hadoop data nodes:

#!/bin/bash
#
export JAVA_HOME=/usr/jdk64/jdk1.7.0_45/bin/java

logfile=/tmp/log.txt
# read TRANSFORM inputs from STDIN
read col1value infile outfile workclusters iter K tolerance

/usr/bin/mahout kmeans --input $infile  --output $outfile  --numClusters $K   --clusters $workclusters --maxIter $iter  --method mapreduce --clustering -ow -cl -cd $tolerance > $logfile

echo "Initiated on cluster node $(uname -n)"
res=$(<$logfile)
echo "$res"
rm $logfile

Step 4:

Convert the K-means output to a format consumable by Teradata and Hive. The Mahout clustered points output is a sequence file of (IntWritable, WeightedPropertyVectorWritable) pairs, where the key is the cluster assignment and the wrapped vector is a NamedVector whose name holds the primary key and whose elements are v1, v2, ... vN. Because of the Hive UDF issues, and because the Hive sequence file SerDe does not return the key, the following Java MapReduce code was used to do the conversion. The job writes CSV text files to the /tmp/mahout/kmout directory, over which an external Hive table named kmout is then created.

MapReduce job code to convert the clustered points sequence file to a CSV text file:

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
import org.apache.mahout.math.*;
import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
import org.apache.mahout.utils.vectors.VectorHelper;

public class VecToCsv extends Configured implements Tool {

public static class MapVecCsv extends Mapper<IntWritable, WeightedPropertyVectorWritable, NullWritable, Text> {
@Override
   public void map(IntWritable key, WeightedPropertyVectorWritable value, Context context) throws IOException, InterruptedException {
       String skey = key.toString(); // key is the cluster assignment
       NamedVector vector = (NamedVector) value.getVector(); // vector name carries the Teradata primary key
       String resStr = skey + "," + vector.getName() + "," + VectorHelper.vectorToCSVString(vector, false);
       resStr = resStr.replaceAll("(\r|\n)", ""); // strip any line terminators added by vectorToCSVString
       Text outvalue = new Text(resStr);

       if (resStr.length() > 0) {
          context.write(NullWritable.get(), outvalue);
       }
   }
}

  public int run(String[] args) throws Exception {

    Job job = Job.getInstance(super.getConf());

    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(MapVecCsv.class);
    job.setNumReduceTasks(0);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setJarByClass(VecToCsv.class);

    job.waitForCompletion(true);
    return 0;
    }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    int res = ToolRunner.run(conf, new VecToCsv(), args);
    System.exit(res);
 }
}

Compile command for VecToCsv.java:

export HADOOP_HOME=/usr/lib/hadoop
export HADOOP_VERSION=2.4.0.2.1.5.0-695
export JAVA_HOME=/usr/jdk64/jdk1.7.0_45/bin
mkdir vectocsv_classes
$JAVA_HOME/javac -Xlint -classpath ${HADOOP_HOME}/hadoop-common.jar:/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-core.jar:/usr/lib/mahout/mahout-math-0.9.0.2.1.5.0-695.jar:/usr/lib/mahout/mahout-core-0.9.0.2.1.5.0-695-job.jar:/usr/lib/mahout/mahout-integration-0.9.0.2.1.5.0-695.jar -d vectocsv_classes VecToCsv.java
$JAVA_HOME/jar -cvf ./VecToCsv.jar -C vectocsv_classes/ .

MapReduce invocation bash shell script (assumed to be run as root):

MAHOUT_VERSION=0.9.0.2.1.5.0-695
MAHOUT_HOME=/usr/lib/mahout
LIBJARS=$MAHOUT_HOME/mahout-math-$MAHOUT_VERSION.jar,$MAHOUT_HOME/mahout-core-$MAHOUT_VERSION-job.jar,$MAHOUT_HOME/mahout-integration-$MAHOUT_VERSION.jar
export HADOOP_CLASSPATH=$MAHOUT_HOME/mahout-math-$MAHOUT_VERSION.jar:$MAHOUT_HOME/mahout-core-$MAHOUT_VERSION-job.jar:$MAHOUT_HOME/mahout-integration-$MAHOUT_VERSION.jar
HIVEPATH=/apps/hive/warehouse/temp.db
TEMPPATH=/tmp/mahout
infile=$TEMPPATH/kmeans/output/clusteredPoints
# convert to csv
hadoop jar VecToCsv.jar org.myorg.VecToCsv -libjars ${LIBJARS} $infile  $TEMPPATH/kmout

Create Hive external table to be used for Query Grid import

use temp;
DROP TABLE kmout;
CREATE EXTERNAL TABLE kmout (clusterid INT, pkey BIGINT, v1 FLOAT, v2 FLOAT, v3 FLOAT) ROW FORMAT DELIMITED  FIELDS TERMINATED BY ',' STORED AS TEXTFILE
              LOCATION '/tmp/mahout/kmout';

Step 5:

Import the data into Teradata and clean up the temporary Hive tables:

SELECT * FROM temp.kmout@tdsqlhhive;

CALL SYSLIB.HDROP('temp','kmdata','tdsqlhhive');
CALL SYSLIB.HDROP('temp','onerow','tdsqlhhive');
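
If the cluster assignments should be persisted on the Teradata side rather than just queried through the grid, an ordinary INSERT ... SELECT over the same foreign table reference is one option; the target table kmout_td below is a hypothetical name matching the kmout layout.

CREATE TABLE kmout_td (clusterid INTEGER, pkey BIGINT, v1 FLOAT, v2 FLOAT, v3 FLOAT)
PRIMARY INDEX (pkey);
INSERT INTO kmout_td SELECT * FROM temp.kmout@tdsqlhhive;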

Example Output for 1,000,000 input rows and K=10

SELECT clusterid,COUNT(*),MIN(v1),MAX(v1) FROM temp.kmout@tdsqlhhive GROUP BY 1 ORDER BY 3;

 *** Query completed. 10 rows found. 4 columns returned.
 *** Total elapsed time was 48 seconds.

  clusterid     Count(*)             Minimum(v1)             Maximum(v1)
-----------  -----------  ----------------------  ----------------------
     532949       139506   1.00000000000000E 000   1.39506000000000E 005
     397723       128241   1.39507000000000E 005   2.67747000000000E 005
     495004       109499   2.67748000000000E 005   3.77246000000000E 005
      43426        89500   3.77247000000000E 005   4.66746000000000E 005
     557758        74702   4.66747000000000E 005   5.41448000000000E 005
     235205        69648   5.41449000000000E 005   6.11096000000000E 005
     138421        75580   6.11097000000000E 005   6.86676000000000E 005
     656378        89851   6.86677000000000E 005   7.76527000000000E 005
     150185       106277   7.76528000000000E 005   8.82804000000000E 005
     260781       117196   8.82805000000000E 005   1.00000000000000E 006

Summary:

This article has described using several technologies

  • Teradata Query Grid
  • Hive Transform Function
  • Mahout K-means
  • MapReduce

for performing advanced analytics within the Teradata UDA. It should be noted that additional Mahout algorithms can be invoked in a similar manner, provided their input data format requirements are met. Future articles will discuss executing other Mahout algorithms and replacing the conversion MapReduce jobs with a Hive UDF to streamline the process.

5 REPLIES

Re: Teradata Query Grid and Machine Learning in Hadoop

nice ...

Enthusiast

Re: Teradata Query Grid and Machine Learning in Hadoop

Nice Article, Mike.

Does it allow ANSI SQL access to Hadoop data?

Teradata Employee

Re: Teradata Query Grid and Machine Learning in Hadoop

In general, Query Grid supports executing SQL in the Hive Hadoop engine. So the question becomes: does Hive support the syntax you want to execute on the Hadoop cluster?

Teradata Employee

Re: Teradata Query Grid and Machine Learning in Hadoop

Hi Mike, 

Great article. Would be curious to see how it would perform if we swap Mahout for Spark MLlib.

Also, does Query Grid support Hive on Spark? In theory it's no different, but I wonder if it's been benchmarked on our Hadoop appliance.

Thanks!

Teradata Employee

Re: Teradata Query Grid and Machine Learning in Hadoop

In general, K-means is an analytic that will not benefit as much from in-memory RDDs as, say, logistic regression would. Analytics with a short per-iteration invocation time and a high iteration count would benefit the most. Currently Query Grid only supports Hive, but plans are in place to support Presto.