Performance analysis of Query Grid queries using Teradata DBQL

Introduction

The following article describes how to use the Teradata DBQL tables to analyze the performance of Teradata-to-Hadoop (T2H) queries that transfer data from Hadoop to Teradata. The article focuses on the steps that are part of the interaction between Teradata and Hadoop; be aware that a query can also have multiple other pre- and post-processing Teradata steps.

T2H has two processing methods:

1) Direct HDFS Read (DHR): The Teradata database reads directly from the HDFS file system using a Hadoop remote data access API. This approach uses few Hadoop resources and does not interact with Hive or YARN. It accesses HDFS directly and returns data in any of the supported data formats: text, ORC, etc.

2) Hive Query Execution (HQE): This access method is based on the FOREIGN TABLE syntax or the "use native query" approach. It issues a SQL statement to Hive that builds a temporary table in the text HDFS format, which is then read using the DHR approach. This approach can consume significant Hadoop resources and is under the control of YARN resource management.

Consider the following two queries, which will be profiled in this article.

/* Query 1 */ SELECT TOP 20 * FROM mww.store_sales_1@hdp_test WHERE ss_sold_date_sk < 2451722 AND partkey = '1'

/* Query 2 */ SELECT TOP 20 * FROM FOREIGN TABLE (SELECT * FROM mww.store_sales_1 WHERE ss_sold_date_sk < 2451722 AND partkey = '1')@ hdp_test AS d

Query 1 uses the DHR method and Query 2 uses the HQE method.

The first item to consider is that a QueryGrid query executes in three phases, as described below.

Phase 1: Hadoop query execution and data mapping

If this is an HQE type of access, a query is issued to Hive that builds a TEXT-formatted result table in HDFS. The lowest AMP in the system initiates the Hive query, and that AMP defines the HDFS data to AMP mapping assignments. This step writes the data mapping assignment information to the output spool, where the spool row key field is set so that the designated AMP will read and process each row.

This phase is invoked from an all-AMP RET step with the following explain description:

"all-AMPs RETRIEVE step executing table operator SYSLIB.load_from_hcatalog .."

In all cases the AMP step communicates with a Teradata node-resident JVM using a local inter-process communication (IPC) mechanism. The JVM communicates with the Hadoop cluster using a remote IPC mechanism, typically across the BYNET. The Teradata node-resident JVM is the same JVM that is used to execute Java UDFs.

Phase 2: Data mapping redistribution

This phase redistributes the data mappings to all AMPs. It is invoked from an all-AMP RET step which reads from the spool built in the first phase and redistributes the rows by a hash key to all AMPs.

Phase 3: Data transfer and spool

This phase reads the data from HDFS using a native Hadoop API. Its tasks are:

JVM: Read data buffers from the network
JVM: Convert the data into Teradata indicdata format
JVM: Transfer the data to the AMP
AMP: Build the Teradata internal row
AMP: Apply any constant conditions
AMP: Write the rows to spool

This phase is invoked from an all-AMP RET step which reads the data mappings from the spool built in phase 2, with the following explain description:

"We do an all-AMPs RETRIEVE step from Spool 3 (Last Use) by way of an all-rows scan executing table operator SYSLIB.load_from_hcatalog with a condition"
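
In DBQL, phase 3 is the one to watch: as the step tables below show, it is the step that populates the ServerByteCount field. Here is a minimal sketch for listing QueryGrid data transfer steps, using only DBC.DBQLStepTbl columns that also appear in the DBQL query in the Additional Information section:

SELECT st.queryid
    ,st.steplev1num
    ,st.stepname
    ,(st.stepstoptime - st.stepstarttime) DAY(4) TO SECOND AS stepelapsed
    ,st.serverbytecount                  /* non-NULL only for the data transfer step */
FROM dbc.dbqlsteptbl st
WHERE st.serverbytecount IS NOT NULL     /* keep only phase 3 (data transfer) steps  */
ORDER BY st.queryid, st.steplev1num;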

DBQL Analysis

Now let's review the DBQL output for the two queries. You can use DBQL to obtain some of the key metrics for Teradata-to-Hadoop queries. The key metrics, which the sketch after this list derives from the DBQL step table, are:

  1. Row count transferred between Hadoop and the Teradata JVM
  2. Data volume transferred from the Teradata JVM to the AMP
  3. Data volume transferred from the AMP to spool
  4. Data transfer step skew
  5. CPU consumed to transform the data from HDFS format to Teradata format and write it to spool
  6. Logical and physical IO consumed
  7. Elapsed time of the Hadoop processing
  8. Elapsed time of the data transfer processing
  9. Total elapsed time
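
The following sketch derives most of these metrics per step for a single request. It uses only columns that appear in the DBQL query in the Additional Information section; the QueryID value is a placeholder, and the skew factor assumes HASHAMP()+1 gives the number of AMPs.

SELECT st.steplev1num
    ,st.stepname
    ,(st.stepstoptime - st.stepstarttime) DAY(4) TO SECOND AS stepelapsed   /* elapsed time per step                  */
    ,st.rowcount                                                            /* rows transferred (data transfer step)  */
    ,st.serverbytecount / 1000000000.0 AS gb_jvm_to_amp                     /* data volume JVM -> AMP                 */
    ,st.spoolusage / 1000000000.0 AS gb_in_spool                            /* data volume AMP -> spool               */
    ,st.cputime                                                             /* CPU to convert and spool the data      */
    ,st.iocount AS logicalio
    ,st.physio
    ,st.maxampcputime * (HASHAMP() + 1) / NULLIFZERO(st.cputime) AS cpu_skew_factor
FROM dbc.dbqlsteptbl st
WHERE st.queryid = 123456789012345678   /* placeholder QueryID */
ORDER BY st.steplev1num;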

Step-level DBQL for Query 1 (the DBQL query syntax is in the Additional Information section at the end of the article):

StepElapsed        Level1  StepName  CPUTime   RowCount       LogicalIO  PhysIO    MaxAmpCPU  MinAmpCPU  SpoolUsage         ServerByteCount
0 00:00:01.560000  1       RET       0.66      374.00         1,133.00   0.00      0.45       0.00       1,855,488.00       ?
0 00:00:00.010000  2       RET       0.16      374.00         2,410.00   0.00      0.01       0.00       2,842,624.00       ?
0 00:01:55.740000  3       RET       3,485.59  93,954,141.00  23,206.00  160.00    34.20      17.88      16,917,512,192.00  50,955,355,270.00
0 00:00:00.480000  4       RET       53.69     93,954,141.00  59,840.00  1,544.00  34.53      18.10      16,165,400,576.00  ?
0 00:00:00.010000  5       STATFN    0.12      20.00          2,122.00   0.00      0.01       0.00       16,165,457,920.00  ?
?                  6       STATFN    ?         ?              ?          0.00      ?          ?          0.00               ?
0 00:00:00.000000  7       Edt       0.09      160.00         0.00       0.00      0.00       0.00       16,165,457,920.00  ?

Explanation of the step-level DBQL query results:

Step 1 (RET), phase 1: Elapsed time of less than 2 seconds; consumes very little Teradata or Hadoop resources. This step writes 374 rows to spool containing the AMP to HDFS data mappings.

Step 2 (RET), phase 2: Elapsed time of less than 1 second; consumes very little resources. This step redistributes the mappings to their designated AMPs.

Step 3 (RET), phase 3: The data transfer step, identified by the non-NULL DBQL field ServerByteCount. Approximately 50GB are written to the AMPs from the JVM that is executing the table operator. Of that 50GB, the AMPs write only 17GB to spool as 93,954,141 rows. This is because the "<" condition cannot be applied directly to the HDFS file; only partition elimination is pushed to Hadoop with the DHR approach, and the condition is applied by the AMP before writing to spool (this is only true for literal conditions). This step consumed 3,485 CPU seconds to process the data. There is minimal physical IO because the data fits entirely in the FSG cache. The per-AMP CPU consumption varies between 17.88 and 34.20 CPU seconds. There are 160 AMPs in this system and 374 splits as defined by the explain, so 54 AMPs process 3 splits and 106 AMPs process 2 (the arithmetic is sketched after this table). Note that this partition is about 47GB and the HDFS block size is 128MB, so there are roughly 47,000/128 or 367 data blocks. You can browse the HDFS structures using the URL http://10.25.11.24:50070/explorer.html#/apps/hive/warehouse/mww.db/store_sales_1/partkey=1, where the IP address is that of the name node and the path is the HDFS path to the table / partition.

Step 4 (RET): Retrieve step used to organize the spool for the TOP statistical function.

Step 5 (STATFN): TOP execution.

Step 6 (STATFN): TOP execution.

Step 7 (EDT): End of transaction.
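
A quick arithmetic check of the split-to-AMP distribution described for step 3 above (374 Hadoop splits spread across 160 AMPs), written as a Teradata SELECT for convenience:

SELECT 374 / 160          AS splits_per_amp_floor  /* every AMP reads at least 2 splits */
    ,374 MOD 160          AS amps_with_one_extra   /*  54 AMPs read 3 splits            */
    ,160 - (374 MOD 160)  AS amps_with_two_only;   /* 106 AMPs read 2 splits            */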

Notes: The amount of data transferred between Hadoop and the Teradata JVM can vary based on file type and compression; for text it is usually close to the ServerByteCount. If you want to monitor the "on the wire" rate you need to use an operating system level tool such as "sar -n DEV". Also note that this data transfer activity is not logged in the ResUsage BYNET fields.
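
DBQL does, however, let you approximate the JVM-to-AMP transfer rate of the data transfer step. For Query 1's step 3, using the ServerByteCount and step elapsed time from the table above (roughly 50GB in 115.74 seconds):

SELECT CAST(50955355270 AS FLOAT) / 115.74 / 1000000 AS mb_per_sec;   /* roughly 440 MB per second from the JVM to the AMPs */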

Step-level DBQL for Query 2:

StepElapsed        Level1  StepName  CPUTime   RowCount       LogicalIO  PhysIO    MaxAmpCPU  MinAmpCPU  SpoolUsage         ServerByteCount
0 00:01:00.050000  1       RET       0.69      374.00         1,133.00   0.00      0.45       0.00       1,867,776.00       ?
0 00:00:00.000000  2       RET       0.16      374.00         2,410.00   0.00      0.00       0.00       2,842,624.00       ?
0 00:00:32.380000  3       RET       981.71    93,954,141.00  23,206.00  160.00    9.89       5.07       16,917,512,192.00  13,852,524,148.00
0 00:00:00.010000  4       STATFN    0.20      20.00          2,122.00   0.00      0.01       0.00       16,917,573,632.00  ?
?                  5       STATFN    ?         ?              ?          0.00      ?          ?          0.00               ?
0 00:00:00.000000  6       Edt       0.04      160.00         0.00       0.00      0.00       0.00       16,917,573,632.00  ?

Explanation of the step-level DBQL query results:

Step 1 (RET), phase 1: Elapsed time of 60 seconds; consumes very little Teradata resources. This elapsed time is the time for Hive to execute the foreign table SQL and build a temporary table containing the defined result. This step writes 374 rows to spool containing the AMP to HDFS data mappings.

Step 2 (RET), phase 2: Short elapsed time; consumes very little resources. This step redistributes the mappings to their designated AMPs.

Step 3 (RET), phase 3: The data transfer step, identified by the non-NULL DBQL field ServerByteCount. 13.8GB are written to the AMPs from the JVM that is executing the table operator. In this case the intermediate table is always in text format, so the amount of data transferred between the JVM and Hadoop is close to the "on the wire" volume. The AMPs then write 16.9GB to spool as 93,954,141 rows; the growth comes from the extra bytes required for Teradata row overhead (about 15 bytes per row) and from data type and UTF-8 to UTF-16 string conversions (a quick check of the per-row expansion follows this table). This step consumed 981 CPU seconds to process the data, significantly less than Query 1 because more of the row filtering is done on the Hadoop side. There is minimal physical IO because the data fits entirely in the FSG cache. The per-AMP CPU consumption varies between 5.07 and 9.89 CPU seconds.

Step 4 (STATFN): TOP execution.

Step 5 (STATFN): TOP execution.

Step 6 (EDT): End of transaction.
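
A quick check of the per-row expansion described for step 3 above, using the spool size, transferred bytes, and row count from the table:

SELECT CAST(16917512192 - 13852524148 AS FLOAT) / 93954141 AS extra_bytes_per_row;   /* about 33 bytes per row of overhead and conversion expansion */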

Summary

This article described how to obtain some of the key metrics for Teradata-to-Hadoop query execution. It is important to understand these factors when performing performance analysis or capacity planning.

Additional Information

Test system

UDA: 4 Teradata 2800 nodes, 1 Hadoop name node, and 8 Hadoop data nodes

Teradata release: 15.00.02.08

Hadoop Release: HDP 2.1

DBQL Query

/* Step-level DBQL detail for each request. As written this returns every
   request logged in DBQL; add predicates (for example on db.databasename,
   d.starttime, or s.sqltextinfo) to narrow it to the queries of interest. */
SELECT 
    s.queryid
    ,(st.stepstoptime - st.stepstarttime) DAY(4) TO SECOND as stepelapsed
    ,st.steplev1num as level1
    ,st.steplev2num as level2
    ,st.stepname
    ,cast(st.cputime as decimal (18,2)) as cputime
    ,st.rowcount
    ,st.iocount as LogicalIO
    ,st.physio as PhysicalIO
    ,st.MaxAmpCPUTime
    ,st.MinAmpCPUTime
    ,st.SpoolUsage
    ,st.ServerByteCount
FROM dbc.dbqlogtbl d, dbc.dbase db, dbc.dbqlsqltbl s, dbc.dbqlsteptbl st
WHERE d.userid = db.databaseid
AND d.procid=s.procid
AND d.queryid=s.queryid
AND s.queryid=st.queryid
AND s.procid=st.procid
ORDER BY 1,3,4
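
A companion roll-up of the same step table can help spot the requests that move the most data over QueryGrid. This is a sketch using the same columns as the query above:

SELECT st.queryid
    ,SUM(st.serverbytecount) AS total_server_bytes        /* bytes moved from the JVM to the AMPs */
    ,SUM(CASE WHEN st.serverbytecount IS NOT NULL
              THEN st.cputime END) AS transfer_step_cpu   /* CPU consumed by the transfer steps   */
FROM dbc.dbqlsteptbl st
GROUP BY st.queryid
HAVING SUM(st.serverbytecount) > 0
ORDER BY 2 DESC;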

Explain Query 1

EXPLAIN SELECT TOP 20 * FROM mww.store_sales_1@hdp_test WHERE ss_sold_date_
sk < 2451722 AND partkey = '1';

 *** Help information returned. 72 rows.
 *** Total elapsed time was 1 second.

Explanation
---------------------------------------------------------------------------
  1) First, we do an all-AMPs RETRIEVE step executing table operator
     SYSLIB.load_from_hcatalog with a condition of ("(1=1)").  The size
     of Spool 2 is estimated with low confidence to be 160 rows (
     3,426,240 bytes).  The estimated time for this step is 0.02
     seconds.
  2) Next, we do an all-AMPs RETRIEVE step from Spool 2 (Last Use) by
     way of an all-rows scan into Spool 3 (used to materialize view,
     derived table, table function or table operator TblOpInputSpool)
     (all_amps), which is redistributed by hash code to all AMPs.  The
     size of Spool 3 is estimated with low confidence to be 160 rows (
     3,426,240 bytes).  The estimated time for this step is 0.03
     seconds.
  3) We do an all-AMPs RETRIEVE step from Spool 3 (Last Use) by way of
     an all-rows scan executing table operator
     SYSLIB.load_from_hcatalog with a condition of (
     "(store_sales_1.SS_SOLD_DATE_SK < 2451722) AND
     (store_sales_1.PARTKEY = '1')") into Spool 4 (used to materialize
     view, derived table, table function or table operator
     store_sales_1) (all_amps), which is built locally on the AMPs.
     < BEGIN EXPLAIN FOR REMOTE QUERY -->
     We use tdsqlh_td 15.00.02.01DR175750 to connect to tdsqlh_hdp
     02.01.02.01 Hive Metastore server(39.0.80.3) on port 9083, then we
     retrieve and process 374 hadoop splits for partitions partkey =
     "1" that is about 50163076361 bytes worth of rowdata from remote
     table mww.store_sales_1 for the qualifying columns
     (ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_cdemo_sk
     ,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_promo_sk,ss_ticket_number,ss_qu
     antity,ss_wholesale_cost,ss_list_price,ss_sales_price,ss_ext_discount_
     amt,ss_ext_sales_price,ss_ext_wholesale_cost,ss_ext_list_price,ss_ext_
     tax,ss_coupon_amt,ss_net_paid,ss_net_paid_inc_tax,ss_net_profit,partke
     y) and map them to the following Teradata output
     columns. ss_sold_date_sk INT => INTEGER_DT, ss_sold_time_sk INT =>
     INTEGER_DT, ss_item_sk INT => INTEGER_DT, ss_customer_sk INT =>
     INTEGER_DT, ss_cdemo_sk INT => INTEGER_DT, ss_hdemo_sk INT =>
     INTEGER_DT, ss_addr_sk INT => INTEGER_DT, ss_store_sk INT =>
     INTEGER_DT, ss_promo_sk INT => INTEGER_DT, ss_ticket_number BIGINT
     => BIGINT_DT, ss_quantity INT => INTEGER_DT, ss_wholesale_cost
     DOUBLE => REAL_DT, ss_list_price DOUBLE => REAL_DT, ss_sales_price
     DOUBLE => REAL_DT, ss_ext_discount_amt DOUBLE => REAL_DT,
     ss_ext_sales_price DOUBLE =>REAL_DT, ss_ext_wholesale_cost DOUBLE
     => REAL_DT, ss_ext_list_price DOUBLE => REAL_DT, ss_ext_tax DOUBLE
     => REAL_DT, ss_coupon_amt DOUBLE => REAL_DT, ss_net_paid DOUBLE =>
     REAL_DT, ss_net_paid_inc_tax DOUBLE => REAL_DT, ss_net_profit
     DOUBLE => REAL_DT, partkey STRING => VARCHAR_DT
     <-- END EXPLAIN FOR REMOTE QUERY >
     The size of Spool 4 is estimated with low confidence to be 160
     rows (245,120 bytes).  The estimated time for this step is 0.02
     seconds.
  4) We do an all-AMPs RETRIEVE step from Spool 4 (Last Use) by way of
     an all-rows scan with a condition of ("(store_sales_1.PARTKEY =
     '1') AND (store_sales_1.SS_SOLD_DATE_SK < 2451722)") into Spool 6
     (all_amps), which is built locally on the AMPs.  The size of Spool
     6 is estimated with low confidence to be 160 rows (244,160 bytes).
     The estimated time for this step is 0.02 seconds.
  5) We do an all-AMPs STAT FUNCTION step from Spool 6 by way of an
     all-rows scan into Spool 9, which is redistributed by hash code to
     all AMPs.  The result rows are put into Spool 5 (group_amps),
     which is built locally on the AMPs.  This step is used to retrieve
     the TOP 20 rows.  Load distribution optimization is used.
     If this step retrieves less than 20 rows, then execute step 6.
     The size is estimated with low confidence to be 20 rows (49,260
     bytes).
  6) We do an all-AMPs STAT FUNCTION step from Spool 6 (Last Use) by
     way of an all-rows scan into Spool 9 (Last Use), which is
     redistributed by hash code to all AMPs.  The result rows are put
     into Spool 5 (group_amps), which is built locally on the AMPs.
     This step is used to retrieve the TOP 20 rows.  The size is
     estimated with low confidence to be 20 rows (49,260 bytes).
  7) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 5 are sent back to the user as the result of
     statement 1.

Explain Query 2

EXPLAIN SELECT TOP 20 * FROM FOREIGN TABLE (SELECT  * FROM mww.store_sales_
1 WHERE ss_sold_date_sk < 2451722 AND partkey = '1')@ hdp_test AS d;

 *** Help information returned. 39 rows.
 *** Total elapsed time was 1 second.

Explanation
---------------------------------------------------------------------------
  1) First, we do an all-AMPs RETRIEVE step executing table operator
     SYSLIB.load_from_hcatalog with a condition of ("(1=1)").  The size
     of Spool 2 is estimated with low confidence to be 160 rows (
     3,426,240 bytes).  The estimated time for this step is 0.02
     seconds.
  2) Next, we do an all-AMPs RETRIEVE step from Spool 2 (Last Use) by
     way of an all-rows scan into Spool 3 (used to materialize view,
     derived table, table function or table operator TblOpInputSpool)
     (all_amps), which is redistributed by hash code to all AMPs.  The
     size of Spool 3 is estimated with low confidence to be 160 rows (
     3,426,240 bytes).  The estimated time for this step is 0.03
     seconds.
  3) We do an all-AMPs RETRIEVE step from Spool 3 (Last Use) by way of
     an all-rows scan executing table operator
     SYSLIB.load_from_hcatalog with a condition of ("(1=1)") into Spool
     4 (used to materialize view, derived table, table function or
     table operator d) (all_amps), which is built locally on the AMPs.
     The size of Spool 4 is estimated with low confidence to be 160
     rows (245,120 bytes).  The estimated time for this step is 0.02
     seconds.
  4) We do an all-AMPs STAT FUNCTION step from Spool 4 by way of an
     all-rows scan into Spool 9, which is redistributed by hash code to
     all AMPs.  The result rows are put into Spool 5 (group_amps),
     which is built locally on the AMPs.  This step is used to retrieve
     the TOP 20 rows.  Load distribution optimization is used.
     If this step retrieves less than 20 rows, then execute step 5.
     The size is estimated with low confidence to be 20 rows (49,260
     bytes).  The estimated time for this step is 0.02 seconds.
  5) We do an all-AMPs STAT FUNCTION step from Spool 4 (Last Use) by
     way of an all-rows scan into Spool 9 (Last Use), which is
     redistributed by hash code to all AMPs.  The result rows are put
     into Spool 5 (group_amps), which is built locally on the AMPs.
     This step is used to retrieve the TOP 20 rows.  The size is
     estimated with low confidence to be 20 rows (49,260 bytes).  The
     estimated time for this step is 0.02 seconds.
  6) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 5 are sent back to the user as the result of
     statement 1.  The total estimated time is 0.12 seconds.

Hive DDL (describe extended) for the store_sales table from the TPC-DS benchmark. The table row count is 28,569,573,605 rows.

Detailed Table Information      
Table(tableName:store_sales_1, dbName:mww, owner:hive, createTime:1436456841, lastAccessTime:0, retention:0, sd:StorageDescriptor(
cols:[FieldSchema(name:ss_sold_date_sk, type:int, comment:null), FieldSchema(name:ss_sold_time_sk, type:int, comment:null), FieldSchema(name:ss_item_sk,
type:int, comment:null), FieldSchema(name:ss_customer_sk, type:int, comment:null), FieldSchema(name:ss_cdemo_sk, type:int, comment:null),
FieldSchema(name:ss_hdemo_sk, type:int, comment:null), FieldSchema(name:ss_addr_sk, type:int, comment:null), FieldSchema(name:ss_store_sk,
type:int, comment:null), FieldSchema(name:ss_promo_sk, type:int, comment:null), FieldSchema(name:ss_ticket_number, type:bigint, comment:null),
FieldSchema(name:ss_quantity, type:int, comment:null), FieldSchema(name:ss_wholesale_cost, type:double, comment:null), FieldSchema(name:ss_list_price,
type:double, comment:null), FieldSchema(name:ss_sales_price, type:double, comment:null), FieldSchema(name:ss_ext_discount_amt, type:double, comment:null),
FieldSchema(name:ss_ext_sales_price, type:double, comment:null), FieldSchema(name:ss_ext_wholesale_cost, type:double, comment:null),
FieldSchema(name:ss_ext_list_price, type:double, comment:null), FieldSchema(name:ss_ext_tax, type:double, comment:null), FieldSchema(name:ss_coupon_amt,
type:double, comment:null), FieldSchema(name:ss_net_paid, type:double, comment:null), FieldSchema(name:ss_net_paid_inc_tax, type:double, comment:null),
FieldSchema(name:ss_net_profit, type:double, comment:null), FieldSchema(name:partkey, type:string, comment:null)],
location:hdfs://hdp010-3:8020/apps/hive/warehouse/mww.db/store_sales_1, inputFormat:org.apache.hadoop.mapred.TextInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{},
skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false),
partitionKeys:[FieldSchema(name:partkey, type:string, comment:null)], parameters:{transient_lastDdlTime=1436456841}, viewOriginalText:null, vi
ewExpandedText:null, tableType:MANAGED_TABLE)

hive> ANALYZE TABLE mww.store_sales_1 PARTITION(partkey) COMPUTE STATISTICS noscan;

Partition mww.store_sales_1{partkey=1} stats: [numFiles=1, numRows=345602094, totalSize=50163076361, rawDataSize=49817474267]