Help with avoiding redistribution using a hash index

Hi,

I am working on a query that combines multiple rows with overlapping dates for the same information, using PARTITION BY and RESET WHEN. However, even though the table has a primary index, a hash index with ORDER BY, and collected statistics on the PARTITION BY columns, the data gets redistributed twice across all AMPs and the query takes more than an hour to complete. The table has around 200 million records. The first query sets the start date of all records having the same party_id, Source_Name_Txt and Source_Address_Txt within an unbroken interval (RESET WHEN is used to ensure that) to the MIN(party_xref_start_dt); I would then use an outer query to group and combine all records with the same start date and associated values.

The second query, using TD_NORMALIZE_MEET, achieves the same result and combines the records in a single step, running in less than 15 minutes. However, I cannot use TD_NORMALIZE_MEET because I also need additional attributes such as the minimum ins_audit_id of the combined records, which TD_NORMALIZE_MEET doesn't support. Is there any way to avoid the redistribution in the query below and get the hash index used in the execution plan?

Thanks

Suma

Table cdw_sandbox.suma_xref_prep4 has around 185 million records; after collapsing unbroken intervals with the same data, about 175 million remain.

Query 1: using ordered analytical functions (redistributed twice, takes more than an hour):

SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       COALESCE(MIN(party_xref_start_dt) OVER
                (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                 ORDER BY party_xref_start_dt
                 RESET WHEN party_xref_start_dt > MAX(party_xref_end_dt) OVER
                     (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                      ORDER BY party_xref_start_dt
                      ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
                 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                party_xref_start_dt) AS party_xref_start_dt,
       party_xref_end_dt
FROM cdw_sandbox.suma_xref_prep4
WHERE party_xref_start_dt <> party_xref_end_dt;
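
For reference, the outer grouping step I mentioned above would look roughly like the sketch below (the only change to Query 1 is carrying ins_audit_id through the inner select list):

SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       party_xref_start_dt,
       MAX(party_xref_end_dt) AS party_xref_end_dt,
       MIN(ins_audit_id)      AS ins_audit_id -- the attribute TD_NORMALIZE_MEET can't return
FROM (
       SELECT party_id, Source_Name_Txt, Source_Address_Txt, ins_audit_id,
              COALESCE(MIN(party_xref_start_dt) OVER
                       (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                        ORDER BY party_xref_start_dt
                        RESET WHEN party_xref_start_dt > MAX(party_xref_end_dt) OVER
                            (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                             ORDER BY party_xref_start_dt
                             ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
                        ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                       party_xref_start_dt) AS party_xref_start_dt,
              party_xref_end_dt
       FROM cdw_sandbox.suma_xref_prep4
       WHERE party_xref_start_dt <> party_xref_end_dt
     ) AS a
GROUP BY 1, 2, 3, 4;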

Query 2: same functionality with the TD_NORMALIZE_MEET table function and LOCAL ORDER BY (runs in under 15 minutes):

WITH Prod (party_id, Source_Name_Txt, Source_Address_Txt, Period_Dt) AS
(
  SELECT party_id, Source_Name_Txt, Source_Address_Txt,
         PERIOD(Party_xref_Start_Dt, Party_xref_End_Dt) AS Period_Dt
  FROM cdw_sandbox.suma_xref_prep4
  WHERE Party_xref_Start_Dt < Party_xref_End_Dt
)
SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       Period_Dt,
       Nrm_Count
FROM TABLE (TD_NORMALIZE_MEET
        (NEW VARIANT_TYPE(Prod.party_id, Prod.Source_Name_Txt, Prod.Source_Address_Txt),
         Prod.Period_Dt)
     RETURNS (Party_Id DECIMAL(18,0),
              Source_Name_Txt VARCHAR(500),
              Source_Address_Txt VARCHAR(500),
              Period_Dt PERIOD(DATE),
              Nrm_Count INTEGER)
     HASH BY party_id, Source_Name_Txt, Source_Address_Txt
     LOCAL ORDER BY party_id, Source_Name_Txt, Source_Address_Txt, Period_Dt
     ) A;

Table information:

SHOW TABLE cdw_sandbox.suma_xref_prep4;
CREATE SET TABLE cdw_sandbox.suma_xref_prep4 ,NO FALLBACK ,
NO BEFORE JOURNAL,
NO AFTER JOURNAL,
CHECKSUM = DEFAULT,
DEFAULT MERGEBLOCKRATIO
(
Party_Id DECIMAL(18,0),
Party_Xref_Type_Cd BYTEINT,
Party_Xref_Start_Dt DATE FORMAT 'YY/MM/DD',
Source_Account_Nbr DECIMAL(9,0),
Source_Sub_Account_Nbr DECIMAL(5,0),
Source_Email_Address_Txt VARCHAR(500) CHARACTER SET LATIN NOT CASESPECIFIC,
Source_Email_Key DECIMAL(10,0),
Source_Name_Txt VARCHAR(500) CHARACTER SET LATIN NOT CASESPECIFIC,
Source_Address_Txt VARCHAR(500) CHARACTER SET LATIN NOT CASESPECIFIC,
Source_Mobile_Phone_Num DECIMAL(18,0),
next_row_start_dt DATE FORMAT 'YY/MM/DD',
party_xref_end_dt DATE FORMAT 'YY/MM/DD',
INS_AUDIT_ID INTEGER,
Updt_Audit_Id INTEGER)
PRIMARY INDEX ( Party_Id ,Source_Name_Txt ,Source_Address_Txt );

Hash Index:

CREATE HASH INDEX cdw_sandbox.hash_12
(party_id, Source_Name_Txt, Source_Address_Txt, Party_xref_Start_Dt, Party_xref_End_Dt)
ON cdw_sandbox.suma_xref_prep4
BY (party_id, Source_Name_Txt, Source_Address_Txt)
ORDER BY VALUES (Party_xref_Start_Dt);

Stats:

COLLECT STATISTICS ON cdw_sandbox.suma_xref_prep4 INDEX (party_id, Source_Name_Txt, Source_Address_Txt);

Explain of query 1:

EXPLAIN SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       COALESCE(MIN(party_xref_start_dt) OVER
                (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                 ORDER BY party_xref_start_dt
                 RESET WHEN party_xref_start_dt > MAX(party_xref_end_dt) OVER
                     (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                      ORDER BY party_xref_start_dt
                      ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
                 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                party_xref_start_dt) AS party_xref_start_dt,
       party_xref_end_dt
FROM cdw_sandbox.suma_xref_prep4
WHERE party_xref_start_dt <> party_xref_end_dt;

1) First, we lock a distinct cdw_sandbox."pseudo table" for read on a
RowHash to prevent global deadlock for cdw_sandbox.suma_xref_prep4.
2) Next, we lock cdw_sandbox.suma_xref_prep4 for read.
3) We do an all-AMPs RETRIEVE step from cdw_sandbox.suma_xref_prep4
by way of an all-rows scan with a condition of (
"cdw_sandbox.suma_xref_prep4.Party_Xref_Start_Dt <>
cdw_sandbox.suma_xref_prep4.party_xref_end_dt") into Spool 3
(all_amps), which is built locally on the AMPs. The input table
will not be cached in memory, but it is eligible for synchronized
scanning. The result spool file will not be cached in memory.
The size of Spool 3 is estimated with no confidence to be
167,160,741 rows (61,347,991,947 bytes). The estimated time for
this step is 35.54 seconds.
4) We do an all-AMPs STAT FUNCTION step from Spool 3 (Last Use) by
way of an all-rows scan into Spool 6 (Last Use), which is built
locally on the AMPs. The result rows are put into Spool 4
(all_amps), which is built locally on the AMPs. The size is
estimated with no confidence to be 167,160,741 rows (
232,854,912,213 bytes).
5) We do an all-AMPs RETRIEVE step from Spool 4 (Last Use) by way of
an all-rows scan into Spool 2 (all_amps), which is built locally
on the AMPs. The result spool file will not be cached in memory.
The size of Spool 2 is estimated with no confidence to be
167,160,741 rows (63,353,920,839 bytes). The estimated time for
this step is 26.76 seconds.
6) We do an all-AMPs STAT FUNCTION step from Spool 2 (Last Use) by
way of an all-rows scan into Spool 12 (Last Use), which is
redistributed by hash code to all AMPs. The result rows are put
into Spool 10 (all_amps), which is built locally on the AMPs. The
size is estimated with no confidence to be 167,160,741 rows (
232,854,912,213 bytes).
7) We do an all-AMPs RETRIEVE step from Spool 10 (Last Use) by way of
an all-rows scan into Spool 1 (all_amps), which is built locally
on the AMPs. The result spool file will not be cached in memory.
The size of Spool 1 is estimated with no confidence to be
167,160,741 rows (63,688,242,321 bytes). The estimated time for
this step is 26.76 seconds.
8) We do an all-AMPs STAT FUNCTION step from Spool 1 (Last Use) by
way of an all-rows scan into Spool 19 (Last Use), which is
redistributed by hash code to all AMPs. The result rows are put
into Spool 15 (group_amps), which is built locally on the AMPs.
The size is estimated with no confidence to be 167,160,741 rows (
232,854,912,213 bytes).
9) Finally, we send out an END TRANSACTION step to all AMPs involved
in processing the request.
-> The contents of Spool 15 are sent back to the user as the result
of statement 1.

Junior Contributor

Re: Help with avoiding redistribution using a hash index

You can get a similar result to TD_NORMALIZE_MEET using only two STAT steps instead of three. Is the MIN(ins_audit_id) the value from the starting row?

A Join Index might be better than a Hash Index. 
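
Untested sketch of what I mean, covering the same columns as your hash index (ji_xref_prep4 is just a placeholder name):

CREATE JOIN INDEX cdw_sandbox.ji_xref_prep4 AS
SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       Party_Xref_Start_Dt, party_xref_end_dt
FROM cdw_sandbox.suma_xref_prep4
PRIMARY INDEX (party_id, Source_Name_Txt, Source_Address_Txt)
ORDER BY VALUES (Party_Xref_Start_Dt);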

But the main reason it's slow is probably the VARCHAR(500) columns used in the ORDER BY (they will be expanded to CHAR for sorting). If you know the actual length, you might cast them to shorter VARCHARs.
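
E.g. feed the OLAP step in Query 1 from a derived table that shortens the sort columns (30 and 95 are just guessed lengths, use whatever actually fits your data):

SELECT party_id,
       CAST(Source_Name_Txt AS VARCHAR(30)) AS Source_Name_Txt,   -- assumed max length
       CAST(Source_Address_Txt AS VARCHAR(95)) AS Source_Address_Txt, -- assumed max length
       party_xref_start_dt,
       party_xref_end_dt,
       ins_audit_id
FROM cdw_sandbox.suma_xref_prep4
WHERE party_xref_start_dt <> party_xref_end_dt;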

Re: Help with avoiding redistribution using a hash index

Thank you. I changed the type of Source_Name_Txt to VARCHAR(30) and Source_Address_Txt to VARCHAR(95), and created a join index. The query completed in 1 hour 25 minutes, and the explain plan looks the same.

The min(ins_audit_id) is indeed from the starting row. Can you please elaborate on how to get it done with only two STAT steps instead of three?

Also, can someone explain why the data gets redistributed to all AMPs twice when the table's primary index matches the PARTITION BY columns? Is there any way to avoid this? 60-80% of the query execution time is spent in step 6 of the explain plan.

Junior Contributor

Re: Help with avoiding redistribution using a hash index

What's your TD release? I've seen Explains with multiple nested OLAP functions resulting in STAT steps with local spools...

I did some search & replace on an existing query; this should be correct:

SELECT party_id, Source_Name_Txt, Source_Address_Txt, ins_audit_id,
       party_xref_start_dt,
       COALESCE(MIN(prev_end_time) -- next row's prev_end_time = end date of this interval
                OVER (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                      ORDER BY party_xref_start_dt
                      ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING),
                max_end_time) AS End_time -- the last interval ends at the partition's max end date
FROM
 (
   SELECT party_id, Source_Name_Txt, Source_Address_Txt, ins_audit_id,
          party_xref_start_dt,
          MAX(party_xref_end_dt) -- max end date of all previous rows
          OVER (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                ORDER BY party_xref_start_dt
                ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS prev_end_time,
          MAX(party_xref_end_dt) -- max end date of the whole partition (needed in the next step)
          OVER (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt) AS max_end_time
   FROM suma_xref_prep4
   -- keep only the first row after a gap, or the first row within the partition
   QUALIFY party_xref_start_dt > prev_end_time OR prev_end_time IS NULL
 ) AS dt
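
Because the QUALIFY keeps only the starting row of each unbroken interval, the ins_audit_id in the result is automatically the one from that starting row, i.e. the value you need.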

Re: Help with avoiding redistribution using a hash index

Thank you Dieter. I modified the query as you suggested (clever!), changed the VARCHAR columns to VARCHAR(95) for the address and VARCHAR(25) for the name, created a join index, and collected stats on the index columns. The query now runs in one-third of the original CPU time, uses two STAT steps, and does not redistribute the data. The Teradata version is 14.10.

Current explain plan:

  1) First, we lock a distinct cdw_sandbox."pseudo table" for read on a
RowHash to prevent global deadlock for
cdw_sandbox.suma_xref_prep_1.
2) Next, we lock cdw_sandbox.suma_xref_prep_1 for read.
3) We do an all-AMPs STAT FUNCTION step from
cdw_sandbox.suma_xref_prep_1 by way of an all-rows scan with no
residual conditions into Spool 5 (Last Use), which is assumed to
be redistributed by value to all AMPs. The result rows are put
into Spool 3 (all_amps), which is built locally on the AMPs. The
size is estimated with high confidence to be 731,330,867 rows (
293,263,677,667 bytes).
4) We do an all-AMPs RETRIEVE step from Spool 3 (Last Use) by way of
an all-rows scan with a condition of ("(Party_Xref_Start_Dt >
Field_14) OR (Field_14 IS NULL)") into Spool 1 (used to
materialize view, derived table, table function or table operator
dt) (all_amps), which is built locally on the AMPs. The result
spool file will not be cached in memory. The size of Spool 1 is
estimated with high confidence to be 731,330,867 rows (
107,505,637,449 bytes). The estimated time for this step is 49.20
seconds.
5) We do an all-AMPs STAT FUNCTION step from Spool 1 (Last Use) by
way of an all-rows scan into Spool 12 (Last Use), which is assumed
to be redistributed by value to all AMPs. The result rows are put
into Spool 8 (group_amps), which is built locally on the AMPs.
The size is estimated with high confidence to be 731,330,867 rows
(196,728,003,223 bytes).
6) Finally, we send out an END TRANSACTION step to all AMPs involved
in processing the request.
-> The contents of Spool 8 are sent back to the user as the result of
statement 1.

Appreciate your help on this. 


I looked up the difference between hash and join indexes and understand that a hash index offers only a subset of join index capabilities. However, I don't understand why a join index avoids redistribution in this context while a hash index doesn't. I would appreciate it if anyone can explain the reason for this.

Thank you

Suma