Implementing a multiple input stream Teradata 15.0 Table Operator for K-means clustering


Background

This article is a follow-on to [1], which discussed implementing K-means using a Teradata Release 14.10 table operator. The main contributions of this article are a discussion of how to use the new Teradata 15.0 multiple input stream feature and a short note on a gcc compiler performance optimization.

For background on K-means and more details on the overall table operator K-means implementation, see [1]. This article focuses on the changes required to take advantage of the multiple input stream feature. This new feature allows the programmer to read from multiple tables (input streams) inside a table operator by using multiple ON clauses, so that table operators can be applied to related groups of information derived from different data sets. The order of the ON clauses determines the stream number in the table operator implementation. The maximum number of input streams for a table operator is 16.
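For reference, the stream numbering can be captured as constants. This is a sketch only: the names below match the constants used by the execution function later in this article, and their actual definitions are assumed to live in td_kmeans_1_0.h.

```c
#include <assert.h>

/* Sketch only: stream numbers follow ON clause order, starting at zero.
 * These names match the constants used by the execution function shown
 * later; their real definitions are assumed to be in td_kmeans_1_0.h. */
#define DATASTREAM     0   /* first ON clause: data rows to be clustered */
#define CLUSTERSTREAM  1   /* second ON clause: DIMENSION cluster table  */
#define MAXSTREAMS    16   /* documented maximum number of input streams */
```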

The attached zip file contains all of the source code referenced in this article.

K-means process flow changes

The attached SPL stored procedure run_kmeans_1_0 implements the modified K-means process flow. The key change is in the generation of the invocation of the map/reduce style table operator. An example of the generated map/reduce SQL statement for 3 variables, K=3, and phase=1 is:

CREATE MULTISET VOLATILE TABLE outc_work AS (
   SELECT clusterid
          ,NULLIFZERO(SUM(apkey)) AS cnt
          ,SUM(v1) / cnt AS averagev1
          ,SUM(v2) / cnt AS averagev2
          ,SUM(v3) / cnt AS averagev3
  FROM td_kmeans_1_0 (
                 ON (SELECT
                        CAST(-1 as INTEGER) as clusterid                     
                       ,CAST(apkey AS BIGINT) AS apkey
                       ,CAST(v1 AS FLOAT) AS v1
                       ,CAST(v2 AS FLOAT) AS v2
                       ,CAST(v3 AS FLOAT) AS v3 
                     FROM indata AS t
                    )   
                ON (SELECT * FROM outc ) DIMENSION ORDER BY clusterid                
                 USING K(3)  phase(1)                       
           ) AS d1              
   GROUP BY 1           
) WITH DATA PRIMARY INDEX (clusterid) ON COMMIT PRESERVE ROWS;

Some items to note on the SQL syntax:

    • The first ON clause (stream 0 in the implementation) selects the input data rows to be clustered. The input row format is as follows: an INTEGER cluster identifier column, a BIGINT unique key identifier column, and N FLOAT variable columns. The constant “-1” clusterid column exists for programming simplicity: it allows the input and output row schemas to be the same.
    • The second ON clause (stream 1 in the implementation) selects the input cluster definition table. In the first iteration this table is initially generated using the SAMPLE clause, selecting from the input data table. The DIMENSION clause specifies that a duplicate copy of the dimension table is created for every AMP (or partition, if PARTITION BY is specified) on which the function operates; in other words, the table's rows are duplicated to all the AMPs. The ORDER BY clause enforces ordering on the cluster rows, allowing them to be passed to the function in clusterid order.
    • There are two USING clause key-value pairs. The first key, "K", is the K value in the K-means process. The second key, "phase", determines whether the function outputs the cluster definitions (1) or the observations with the appended cluster identifier (2).
    • The reduce phase is implemented using the SQL SUM function rather than a REDUCE table operator. Note that the cluster count is returned in the BIGINT key column position. The main motivation was to show how the map/reduce functionality can be intermixed with standard SQL. There are also potential performance benefits, because the Teradata optimizer and execution engine have robust mechanisms for local/global aggregate processing in a parallel environment.
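The reduce phase above is plain SQL, but its effect can be sketched in C. This is an illustration, not the article's code: it assumes the map phase emits per-AMP partial rows of (clusterid, count carried in the key column, per-variable sums), which the SUM/GROUP BY folds into K centroids.

```c
#include <assert.h>
#include <stddef.h>

#define NVARS 3   /* matches the 3-variable example SQL above */

typedef struct {
    int    clusterid;
    long   cnt;          /* partial observation count (key column position) */
    double sum[NVARS];   /* partial per-variable sums */
} partial_row;

typedef struct {
    long   cnt;
    double mean[NVARS];
} centroid;

/* Fold n partial rows into K centroids, then divide sums by counts,
 * mirroring SUM(...) / cnt with the NULLIFZERO guard. */
void reduce_centroids(const partial_row *rows, size_t n, centroid *out, int K)
{
    for (int k = 0; k < K; k++) {
        out[k].cnt = 0;
        for (int v = 0; v < NVARS; v++) out[k].mean[v] = 0.0;
    }
    for (size_t i = 0; i < n; i++) {
        centroid *c = &out[rows[i].clusterid];
        c->cnt += rows[i].cnt;
        for (int v = 0; v < NVARS; v++) c->mean[v] += rows[i].sum[v];
    }
    for (int k = 0; k < K; k++)
        if (out[k].cnt > 0)                     /* NULLIFZERO analogue */
            for (int v = 0; v < NVARS; v++) out[k].mean[v] /= out[k].cnt;
}
```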

Programming changes

First, the K-means implementation from [1] was reorganized into three source files to ease future changes to the function.

    • td_kmeans_1_0.h: include header file containing all dependent include file definitions and any global structures and constants.
    • lib_td_kmeans_1_0.c:  source file containing common support routines.
    • td_kmeans_1_0.c:  source file containing the table operator contract and execution function implementations.

The changes to support multiple input streams are isolated to “td_kmeans_1_0.c”. Specifically, the execution function td_kmeans_1_0 adds some additional error checking and contains the changes to support the two streams. The function contains two while loops: the first reads the cluster definitions from the second input stream and stores them in the current cluster definitions array; the second reads the data rows from the first input stream and assigns each row to the new cluster definition array based on its minimum distance from the clusters in the current cluster definition array.

While loop to process clusters:

// read the clusters from stream 1; the stream number is passed to FNC_TblOpOpen
ctx->Handle = (FNC_TblOpHandle_t *)FNC_TblOpOpen(CLUSTERSTREAM, 'r', NOOPTIONS);

while (FNC_TblOpRead(ctx->Handle) == TBLOP_SUCCESS)
{
    /* Access clusterid column, offset 0 */
    FNC_TblOpGetAttributeByNdx(
        ctx->Handle, 0, (void **)&ctx->clusterid, &null, &length);
    /* Access key column, offset 1 */
    FNC_TblOpGetAttributeByNdx(
        ctx->Handle, 1, (void **)&ctx->pkey, &null, &length);

    if (*ctx->clusterid > ctx->K || ctx->rowcount > ctx->K)
    {
        sprintf(ctx->buf, "Too many clusters =%d", ctx->K);
        FNC_TblOpSetError("U0001", ctx->buf);
        cleanup_1_0(ctx);
        return;
    }
    if (add_cluster_1_0(ctx) == ERROR)   /* add row to cluster heap */
        return;
    ctx->rowcount++;
}

While loop to process data rows:

// Main loop: read the data rows from stream 0; also open the output stream
ctx->Handle    = (FNC_TblOpHandle_t *)FNC_TblOpOpen(DATASTREAM, 'r', NOOPTIONS);
ctx->OutHandle = (FNC_TblOpHandle_t *)FNC_TblOpOpen(DATASTREAM, 'w', NOOPTIONS);

while (FNC_TblOpRead(ctx->Handle) == TBLOP_SUCCESS)
{
    /* Access key column, offset 1 */
    FNC_TblOpGetAttributeByNdx(
        ctx->Handle, 1, (void **)&ctx->pkey, &null, &length);

    if (add_row_1_0(ctx) == ERROR)       /* add row to new cluster heap */
        return;
}
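The per-row assignment logic itself lives in add_row_1_0 (in lib_td_kmeans_1_0.c), whose body is not shown here. As a hypothetical sketch of its core step, each data row is assigned to the current cluster at minimum squared Euclidean distance:

```c
#include <assert.h>
#include <float.h>

/* Hypothetical sketch of the assignment step performed per data row
 * (the real logic is in add_row_1_0, not reproduced in this article):
 * pick the current cluster with minimum squared Euclidean distance.
 * centers is a K x nvars row-major array of cluster centroids. */
int nearest_cluster(const double *row, const double *centers, int K, int nvars)
{
    int    best  = -1;
    double bestd = DBL_MAX;
    for (int k = 0; k < K; k++) {
        double d = 0.0;
        for (int v = 0; v < nvars; v++) {
            double diff = row[v] - centers[k * nvars + v];
            d += diff * diff;        /* sqrt unnecessary for comparison */
        }
        if (d < bestd) { bestd = d; best = k; }
    }
    return best;
}
```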

Performance

The following example shows how to use the -O3 gcc compiler option to improve the performance of K-means when K times the number of variables V is large; large in this context is roughly in the tens to hundreds or more. It should be noted that this compile option is beneficial for many compute-intensive algorithms. A brief description of the -O3 optimization from the gcc man page:

Turning on optimization flags makes the compiler attempt to improve the performance and/or code size at the expense of compilation time and possibly the ability to debug the program.

-O3 turns on all optimizations specified by -O2 and also turns on the -finline-functions, -funswitch-loops, -fpredictive-commoning, -fgcse-after-reload, -ftree-vectorize and -fipa-cp-clone options.
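To see why K*V matters, note that assigning one input row requires scanning K centroids of V variables each, i.e. a K*V-element inner loop per row. A minimal kernel of the kind -O3 helps (an illustration, not the article's code) looks like this; with K=100 and V=20 this loop body runs 2,000 times per input row, making it a good candidate for -finline-functions and -ftree-vectorize:

```c
#include <assert.h>

/* Illustrative hot kernel: squared-difference accumulation over n doubles.
 * With K=100 clusters and V=20 variables, each input row drives a
 * 2,000-element scan of this shape, which -O3 can inline and vectorize. */
double squared_distance(const double *a, const double *b, int n)
{
    double acc = 0.0;
    for (int i = 0; i < n; i++) {
        double d = a[i] - b[i];
        acc += d * d;               /* SIMD vectorization candidate */
    }
    return acc;
}
```

Compiling the same source with and without -O3 (e.g. `gcc -O3 -c kernel.c` versus `gcc -c kernel.c`) and comparing CPU time is a quick way to measure the benefit for a given kernel.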

The test example executes the K-means process on a 999,999,801 row table with 20 variables and K=100; in this example K*V is 2,000. The test system was a 4-node 2700 with 36 AMPs per node.

Input data table DDL:

CREATE SET TABLE in1    (
      uid INTEGER,
      v1 INTEGER,
      v2 INTEGER,
      v3 INTEGER,
      v4 INTEGER,
      v5 INTEGER,
      v6 INTEGER,
      v7 INTEGER,
      v8 INTEGER,
      v9 INTEGER,
      v10 INTEGER,
      v11 INTEGER,
      v12 INTEGER,
      v13 INTEGER,
      v14 INTEGER,
      v15 INTEGER,
      v16 INTEGER,
      v17 INTEGER,
      v18 INTEGER,
      v19 INTEGER,
      v20 INTEGER)
PRIMARY INDEX ( uid ,v1 )

Test SQL invocation script (K=100, 1 iteration):

BEGIN QUERY LOGGING WITH ALL ON mww;
call run_kmeans_1_0('mww','in1','outc', '', 100, 1, 0.5, rvalue);
END QUERY LOGGING WITH ALL ON mww;

Test case 1: function created from source, no compiler optimizations

CREATE FUNCTION td_kmeans_1_0()
 RETURNS TABLE VARYING USING FUNCTION td_kmeans_contract_1_0
 SPECIFIC td_kmeans_1_0
 LANGUAGE C 
 NO SQL 
 NO EXTERNAL DATA 
 PARAMETER STYLE  SQLTable 
 NOT DETERMINISTIC 
 CALLED ON NULL INPUT 
 EXTERNAL NAME 'CI!td_kmeans_1_0!td_kmeans_1_0.h!CS!lib_td_kmeans!lib_td_kmeans_1_0.c!CS!td_kmeans_1_0!td_kmeans_1_0.c'

DBQL output for one K-means iteration:

      elapsed          sl1  sl2  StepName       CPUTime         IOcount         PhysIO              RowCount
  1   00:00:00.000000   1    0   MLK           0.000000         0.000000        0.000000            1.000000
  2   00:00:00.000000   2    0   MLK           0.016000         0.000000        0.000000          144.000000
  3   00:00:00.010000   3    0   Ctb           0.024000     1,296.000000        0.000000          144.000000
  4   00:00:09.350000   4    1   RET         813.064000   460,152.000000   15,553.000000  999,999,801.000000
  5   00:00:00.010000   4    2   RET           0.000000         0.000000        0.000000       14,400.000000
  6   00:00:45.470000   5    0   RET       5,856.268000   464,502.000000   17,569.000000       14,400.000000
  7   00:00:00.010000   6    0   SUM           0.156000     3,148.000000       68.000000          100.000000
  8   00:00:00.010000   7    0   RET           0.232000     3,364.000000        0.000000          100.000000
  9   00:00:00.010000   8    0   MRG           0.072000     2,480.000000        0.000000          100.000000
 10   00:00:00.000000   9    0   Edt           0.000000        21.000000        0.000000            1.000000

Test case 2: function created from object files compiled using the -O3 compile option

Compile command:

gcc -fpic -I /usr/tdbms/etc -g -O3 -c td_kmeans_1_0.c lib_td_kmeans_1_0.c

CREATE FUNCTION td_kmeans_1_0()
 RETURNS TABLE VARYING USING FUNCTION td_kmeans_contract_1_0
 SPECIFIC td_kmeans_1_0
 LANGUAGE C
 NO SQL
 NO EXTERNAL DATA
 PARAMETER STYLE  SQLTable
 NOT DETERMINISTIC
 CALLED ON NULL INPUT
 EXTERNAL NAME 'CO!lib_td_kmeans!lib_td_kmeans_1_0.o!CO!td_kmeans_1_0!td_kmeans_1_0.o'

DBQL output for one K-means iteration:

      elapsed          sl1  sl2  StepName       CPUTime         IOcount         PhysIO              RowCount
  1   00:00:00.000000   1    0   MLK           0.000000         0.000000        0.000000            1.000000
  2   00:00:00.010000   2    0   MLK           0.020000         0.000000        0.000000          144.000000
  3   00:00:00.000000   3    0   Ctb           0.028000     1,296.000000        0.000000          144.000000
  4   00:00:09.470000   4    1   RET         812.804000   463,131.000000   15,553.000000  999,999,801.000000
  5   00:00:00.010000   4    2   RET           0.000000         0.000000        0.000000       14,400.000000
  6   00:00:25.960000   5    0   RET       3,213.388000   464,502.000000   17,569.000000       14,400.000000
  7   00:00:00.010000   6    0   SUM           0.228000     3,148.000000       68.000000          100.000000
  8   00:00:00.000000   7    0   RET           0.148000     3,364.000000        0.000000          100.000000
  9   00:00:00.010000   8    0   MRG           0.080000     2,480.000000        0.000000          100.000000
 10   00:00:00.010000   9    0   Edt           0.000000        21.000000        0.000000            1.000000

Analysis of steps in the DBQL output of test case 1 and test case 2:

    • Step line 4: In both test cases this step spools the input table in1. Spooling the table is not strictly necessary, but it is always done when there are multiple input streams.
    • Step line 5: In both test cases this step duplicates the cluster definitions DIMENSION table. Observe that the resulting spool row count is 14,400 (144 AMPs * K=100).
    • Step line 6: In both test cases this is the execution of the map phase table operator td_kmeans_1_0. Observe that there are 999,999,801 input rows and 14,400 output rows. Also observe that the compiler optimization reduced the CPU time from 5,856 seconds in test case 1 to 3,213 seconds in test case 2, a 45% reduction.
    • Step line 7: In both test cases this is the reduce phase SUM operation. There are 14,400 input rows and 100 output rows (the 100 cluster definitions).
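The arithmetic behind two of these observations can be checked in a few lines of C (an illustration only, using the figures from the tables above):

```c
#include <assert.h>

/* DIMENSION spool rows: one copy of the K cluster rows per AMP. */
long dimension_rows(int amps, int k)
{
    return (long)amps * k;
}

/* Fractional CPU-time reduction between two runs. */
double cpu_reduction(double before, double after)
{
    return (before - after) / before;
}
```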

Reference

[1] http://developer.teradata.com/extensibility/articles/k-means-clustering-and-teradata-14-10-table-ope...