Presto Output very slow

Teradata Employee

Hi,

We are running a PoC of Presto on a Hadoop cluster and are seeing very strange behavior: when we run a query against a table of 29 columns (full table scan), it starts returning rows immediately, but the return rate is extremely slow (6 to 10 rows). Only 360 rows are returned in total (the search is on a mobile number) from a data set covering 3 months, roughly 300 billion records and around 1.5 TB of data (ORC, Zlib compressed), partitioned on date. The system has 9 DataNodes, the memory allocated to Presto is 128 GB per node, and the Presto version is 157t. Output starts immediately, but the complete query takes around 3 minutes to finish.

 

Any advice or recommended settings to get the rows back faster? We are unable to understand how Presto is handling this data, given that memory is limited and the data size is huge. We have not enabled the spill-to-disk property; we tried it before, but it made no difference.

 

Thanks in advance.

Affan


Teradata Employee

Re: Presto Output very slow

Here are some more details on the issue.

 

EXPLAIN and EXPLAIN ANALYZE results:

 

presto> explain select *

     -> FROM hive3.dp_xdr_archive.na_alshamel_gp MCH

     -> where mediated_call_originating_num='504179523'

     -> or  mediated_call_terminating_num='504179523'

     -> ;

 

-------------------------------------------------------------------------------------------------------------------------------------------------------------

- Output[network_activity_id, call_start_dt, call_start_tm, call_type_cd, mediated_call_terminating_num, mediated_call_duration_tm, call_special_service_typ

     - RemoteExchange[GATHER] => network_activity_id:decimal(18,0), call_start_dt:date, call_start_tm:varchar, call_type_cd:varchar, mediated_call_terminatin

         - ScanFilter[table = hive3:hive3:dp_xdr_archive:na_alshamel_gp, originalConstraint = (("mediated_call_originating_num" = '504179523') OR ("mediated_

                 LAYOUT: hive3

                 network_activity_id := HiveColumnHandle{clientId=hive3, name=network_activity_id, hiveType=decimal(18,0), hiveColumnIndex=0, columnType=REGU

                 call_start_dt := HiveColumnHandle{clientId=hive3, name=call_start_dt, hiveType=date, hiveColumnIndex=1, columnType=REGULAR}

                 call_start_tm := HiveColumnHandle{clientId=hive3, name=call_start_tm, hiveType=string, hiveColumnIndex=2, columnType=REGULAR}

                 call_type_cd := HiveColumnHandle{clientId=hive3, name=call_type_cd, hiveType=string, hiveColumnIndex=3, columnType=REGULAR}

                 mediated_call_terminating_num := HiveColumnHandle{clientId=hive3, name=mediated_call_terminating_num, hiveType=string, hiveColumnIndex=4, co

                 mediated_call_duration_tm := HiveColumnHandle{clientId=hive3, name=mediated_call_duration_tm, hiveType=int, hiveColumnIndex=5, columnType=RE

                 call_special_service_type_cd := HiveColumnHandle{clientId=hive3, name=call_special_service_type_cd, hiveType=string, hiveColumnIndex=6, colu

                 called_to_country_cd := HiveColumnHandle{clientId=hive3, name=called_to_country_cd, hiveType=string, hiveColumnIndex=7, columnType=REGULAR}

                 called_from_country_cd := HiveColumnHandle{clientId=hive3, name=called_from_country_cd, hiveType=string, hiveColumnIndex=8, columnType=REGUL

                 call_service_detail_type_cd := HiveColumnHandle{clientId=hive3, name=call_service_detail_type_cd, hiveType=string, hiveColumnIndex=9, column

                 mediated_call_originating_num := HiveColumnHandle{clientId=hive3, name=mediated_call_originating_num, hiveType=string, hiveColumnIndex=10, c

                 mediated_call_dialed_num := HiveColumnHandle{clientId=hive3, name=mediated_call_dialed_num, hiveType=string, hiveColumnIndex=11, columnType=

                 call_service_type_cd := HiveColumnHandle{clientId=hive3, name=call_service_type_cd, hiveType=string, hiveColumnIndex=12, columnType=REGULAR}

                 mediated_call_domestic_ind := HiveColumnHandle{clientId=hive3, name=mediated_call_domestic_ind, hiveType=string, hiveColumnIndex=13, columnT

                 switch_instance_id := HiveColumnHandle{clientId=hive3, name=switch_instance_id, hiveType=string, hiveColumnIndex=14, columnType=REGULAR}

                 orig_switch_instance_id := HiveColumnHandle{clientId=hive3, name=orig_switch_instance_id, hiveType=string, hiveColumnIndex=15, columnType=RE

                 term_switch_instance_id := HiveColumnHandle{clientId=hive3, name=term_switch_instance_id, hiveType=string, hiveColumnIndex=16, columnType=RE

                 mediated_called_imei_num := HiveColumnHandle{clientId=hive3, name=mediated_called_imei_num, hiveType=string, hiveColumnIndex=17, columnType=

                 mediated_calling_imei_num := HiveColumnHandle{clientId=hive3, name=mediated_calling_imei_num, hiveType=string, hiveColumnIndex=18, columnTyp

                 mediated_calling_imsi_num := HiveColumnHandle{clientId=hive3, name=mediated_calling_imsi_num, hiveType=string, hiveColumnIndex=19, columnTyp

                 originating_cell_site_id := HiveColumnHandle{clientId=hive3, name=originating_cell_site_id, hiveType=string, hiveColumnIndex=20, columnType=

                 terminating_cell_site_id := HiveColumnHandle{clientId=hive3, name=terminating_cell_site_id, hiveType=string, hiveColumnIndex=21, columnType=

                 cell_site_meas := HiveColumnHandle{clientId=hive3, name=cell_site_meas, hiveType=string, hiveColumnIndex=22, columnType=REGULAR}

                 mediated_called_imsi_num := HiveColumnHandle{clientId=hive3, name=mediated_called_imsi_num, hiveType=string, hiveColumnIndex=23, columnType=

                 originating_lac_cd := HiveColumnHandle{clientId=hive3, name=originating_lac_cd, hiveType=string, hiveColumnIndex=24, columnType=REGULAR}

                 mediated_call_redirect_num := HiveColumnHandle{clientId=hive3, name=mediated_call_redirect_num, hiveType=string, hiveColumnIndex=25, columnT

                 call_service_det_type_name := HiveColumnHandle{clientId=hive3, name=call_service_det_type_name, hiveType=string, hiveColumnIndex=26, columnT

                 call_completion_type_cd := HiveColumnHandle{clientId=hive3, name=call_completion_type_cd, hiveType=string, hiveColumnIndex=27, columnType=RE

                 call_ext_type_cd := HiveColumnHandle{clientId=hive3, name=call_ext_type_cd, hiveType=string, hiveColumnIndex=28, columnType=REGULAR}

                 obsdate := HiveColumnHandle{clientId=hive3, name=obsdate, hiveType=string, hiveColumnIndex=-1, columnType=PARTITION_KEY}

                     :: [[2016-09-01, 2016-11-30]]

 

(1 row)

 

Query 20170314_090308_00001_r9cz8, FINISHED, 1 node

Splits: 1 total, 1 done (100.00%)

0:04 [0 rows, 0B] [0 rows/s, 0B/s]

 

presto> EXPLAIN ANALYZE select *

     -> FROM hive3.dp_xdr_archive.na_alshamel_gp MCH

     -> where mediated_call_originating_num='504179523'

     -> or  mediated_call_terminating_num='504179523';

 

-------------------------------------------------------------------------------------------------------------------------------------------------------------

Fragment 1 [SINGLE]

     Cost: CPU 89.63ms, Input: 286 rows (2.51kB), Output: 286 rows (2.51kB)

     Output layout: [network_activity_id]

     Output partitioning: SINGLE []

     - Output[Query Plan] => [network_activity_id:decimal(18,0)] {rows: ?, bytes: ?}

             Cost: 85.11%, Output: 286 rows (2.51kB)

             TaskOutputOperator := Drivers: 1, Input avg.: 286.00 lines, Input std.dev.: 0.00%

             Query Plan := network_activity_id

         - RemoteSource[2] => [network_activity_id:decimal(18,0)] {rows: ?, bytes: ?}

                 Cost: 14.89%, Output: 286 rows (2.51kB)

                 ExchangeOperator := Drivers: 1, Input avg.: 286.00 lines, Input std.dev.: 0.00%

 

Fragment 2 [SOURCE]

     Cost: CPU 2.72h, Input: 286 rows (2.51kB), Output: 286 rows (2.51kB)

     Output layout: [network_activity_id]

     Output partitioning: SINGLE []

     - ScanFilterProject[table = hive3:hive3:dp_xdr_archive:na_alshamel_gp, originalConstraint = (("mediated_call_originating_num" = '504179523') OR ("mediat

             Cost: 100.00%, Input: 33555316096 rows (1.29TB), Output: 286 rows (2.51kB), Filtered: 100.00%

             ScanFilterAndProjectOperator := Drivers: 20927, Input avg.: 1603446.08 lines, Input std.dev.: 35.41%

             TaskOutputOperator := Drivers: 20927, Input avg.: 0.01 lines, Input std.dev.: 973.07%

             LAYOUT: hive3

             network_activity_id := HiveColumnHandle{clientId=hive3, name=network_activity_id, hiveType=decimal(18,0), hiveColumnIndex=0, columnType=REGULAR}

             mediated_call_terminating_num := HiveColumnHandle{clientId=hive3, name=mediated_call_terminating_num, hiveType=string, hiveColumnIndex=4, column

             mediated_call_originating_num := HiveColumnHandle{clientId=hive3, name=mediated_call_originating_num, hiveType=string, hiveColumnIndex=10, colum

             HiveColumnHandle{clientId=hive3, name=obsdate, hiveType=string, hiveColumnIndex=-1, columnType=PARTITION_KEY}

                 :: [[2016-09-01, 2016-11-30]]

 

 

(1 row)

 

Query 20170314_090357_00002_r9cz8, FINISHED, 10 nodes

Splits: 20,929 total, 20,929 done (100.00%)

3:57 [33.6B rows, 1.29TB] [142M rows/s, 5.56GB/s]

 

presto>


Configuration files:

 

 

coordinator

 

presto@c3edge01:~/.prestoadmin/coordinator> more jvm.config

-server

-Xmx124G

-XX:-UseBiasedLocking

-XX:+UseG1GC

-XX:G1HeapRegionSize=32M

-XX:+ExplicitGCInvokesConcurrent

-XX:+HeapDumpOnOutOfMemoryError

-XX:+UseGCOverheadLimit

-XX:OnOutOfMemoryError=kill -9 %p

-XX:ReservedCodeCacheSize=512M

-DHADOOP_USER_NAME=hive

 

 

 

 

presto@c3edge01:~/.prestoadmin/coordinator> more config.properties

query.max-memory=558GB

node-scheduler.include-coordinator=false

discovery.uri=http://c3edge01.stc.corp:8089

discovery-server.enabled=true

http-server.http.port=8089

coordinator=true

query.max-memory-per-node=62GB

#sink.max-buffer-size=96 MB

node-scheduler.network-topology=flat

presto@c3edge01:~/.prestoadmin/coordinator>

 

 

 

 

Worker:

 

presto@c3edge01:~/.prestoadmin/workers> more jvm.config

-server

-Xmx124G

-XX:-UseBiasedLocking

-XX:+UseG1GC

-XX:G1HeapRegionSize=32M

-XX:+ExplicitGCInvokesConcurrent

-XX:+HeapDumpOnOutOfMemoryError

-XX:+UseGCOverheadLimit

-XX:OnOutOfMemoryError=kill -9 %p

-XX:ReservedCodeCacheSize=512M

-DHADOOP_USER_NAME=hive


 

 

Catalog:

 

 

presto@c3edge01:~/.prestoadmin/catalog> more hive3.properties

connector.name=hive-hadoop2

hive.metastore.uri=thrift://c3master01-nn.stc.corp:9083,thrift://c3master02-nn.stc.corp:9083,thrift://c3master03-nn.stc.corp:9083

hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml

hive.force-local-scheduling=true

hive.domain-compaction-threshold=150

hive.orc.max-buffer-size=64 MB

hive.orc.stream-buffer-size=64 MB

hive.max-outstanding-splits=2000

 

Thanks in advance.

Teradata Employee

Re: Presto Output very slow (Accepted Solution)

Currently, predicate pushdown into connectors supports OR only when both predicates are on the same column (see https://github.com/prestodb/presto/issues/5861). So you're limited by disk I/O speed for this query; a query on only one of the predicates may be faster, because the predicate can be pushed down into the ORC reader and Presto doesn't have to read all of the splits.

 

It may help to rewrite this query to:

SELECT *
FROM hive3.dp_xdr_archive.na_alshamel_gp MCH
WHERE mediated_call_originating_num = '504179523'
  AND mediated_call_terminating_num = '504179523'
UNION ALL
SELECT *
FROM hive3.dp_xdr_archive.na_alshamel_gp
WHERE mediated_call_originating_num = '504179523'
  AND mediated_call_terminating_num != '504179523'
UNION ALL
SELECT *
FROM hive3.dp_xdr_archive.na_alshamel_gp
WHERE mediated_call_originating_num != '504179523'
  AND mediated_call_terminating_num = '504179523';

[note: corrected the query to use UNION ALL so that it doesn't remove rows that are legitimately duplicated in the dataset]

 

That way, Presto will be able to use the ORC stats to prune splits, because the single predicate is pushed down.
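To verify that the pushdown actually happens, you can EXPLAIN one branch of the rewrite on its own and check that the single-column constraint appears in the ScanFilter node, just as in the plans you posted. A minimal sketch (the exact plan text varies by version):

EXPLAIN
SELECT *
FROM hive3.dp_xdr_archive.na_alshamel_gp
WHERE mediated_call_originating_num = '504179523';

-- Expect something like
--   originalConstraint = ("mediated_call_originating_num" = '504179523')
-- on the ScanFilter line; a single-column constraint like this is what
-- the Hive connector can hand to the ORC reader for split pruning.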

 

If you can't rewrite the query, monitor the disk I/O during the query (e.g. with a tool from this Unix & Linux Stack Exchange answer: http://unix.stackexchange.com/questions/55212/how-can-i-monitor-disk-io). If disk I/O is at 100%, there's not much more you can do to improve performance on that query.

 

As a side note, spill to disk is currently available only for aggregations, and your query contains no aggregations. Spill to disk is also intended for queries with large intermediate result sets, which is not the case for your query.
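For illustration, the kind of query where spill to disk could eventually matter is a large aggregation whose intermediate state may not fit in memory. A hypothetical sketch against the same table (the grouping column is just an example):

-- The GROUP BY hash table over all distinct numbers is the large
-- intermediate state that spill to disk is designed to handle.
SELECT mediated_call_originating_num, count(*) AS call_cnt
FROM hive3.dp_xdr_archive.na_alshamel_gp
GROUP BY mediated_call_originating_num;

A plain filtered SELECT like yours streams rows through as it scans and has nothing to spill.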

Teradata Employee

Re: Presto Output very slow

Thanks a lot for the prompt reply. We will rewrite the query and see the performance gain.

One more question, related to spill to disk: the data is in fact larger than the memory allocated, so why doesn't the query fail? Is Presto reading the data block by block and searching it as it goes?
Teradata Employee

Re: Presto Output very slow

When doing a table scan, Presto reads the data in increments called splits (in the Hive connector, roughly equivalent to files on HDFS). It passes along the rows that match the filter and discards the rest of each file, so it never needs to hold all of a table's data in memory.

 

In general, the operators you have to worry about running out of memory with are aggregations, joins, ORDER BY, and window functions. The others can safely stream.
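To make the distinction concrete, here is a sketch using the table from this thread (the ORDER BY column is just an example):

-- Streams safely: each split is scanned, filtered, and then discarded.
SELECT *
FROM hive3.dp_xdr_archive.na_alshamel_gp
WHERE mediated_call_originating_num = '504179523';

-- Must hold intermediate state: ORDER BY has to buffer all matching rows
-- before it can emit output (harmless here with ~360 matches, but worth
-- watching on queries that match billions of rows).
SELECT *
FROM hive3.dp_xdr_archive.na_alshamel_gp
WHERE mediated_call_originating_num = '504179523'
ORDER BY call_start_dt;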