Push the runtime filter from HashJoin down to SeqScan or AM. #724

Open · wants to merge 10 commits into base: main

Conversation

Contributor

@zhangyue-hashdata commented Nov 21, 2024

+----------+  AttrFilter   +------+  ScanKey   +------------+
| HashJoin | ------------> | Hash | ---------> | SeqScan/AM |
+----------+               +------+            +------------+

If "gp_enable_runtime_filter_pushdown" is on, three steps will be run:

Step 1. In ExecInitHashJoin(), try to find the mapper between the var in
hashclauses and the var in SeqScan. If found we will save the mapper in
AttrFilter and push them to Hash node;

Step 2. We will create the range/bloom filters in AttrFilter during building
hash table, and these filters will be converted to the list of ScanKey
and pushed down to Seqscan when the building finishes;

Step 3. If AM support SCAN_SUPPORT_RUNTIME_FILTER, these ScanKeys will be pushed
down to the AM module further, otherwise will be used to filter slot in
Seqscan;
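To make the Step 3 fallback concrete, a slot-level check driven by the pushed-down ScanKeys could look roughly like the sketch below. This is an illustrative sketch only: the function name is hypothetical, the PR's actual code also includes a dedicated Bloom-filter check, and the filters are assumed to be kept as a List of ScanKeys on the SeqScanState.

#include "postgres.h"
#include "access/skey.h"
#include "executor/tuptable.h"
#include "fmgr.h"
#include "nodes/pg_list.h"

/*
 * Hypothetical sketch of the Step 3 fallback: when the AM does not advertise
 * SCAN_SUPPORT_RUNTIME_FILTER, apply the pushed-down ScanKeys to every slot
 * returned by the table AM and drop slots that cannot possibly join.
 */
static bool
SlotPassesRuntimeFilters(TupleTableSlot *slot, List *filters)
{
	ListCell   *lc;

	foreach(lc, filters)
	{
		ScanKey		sk = (ScanKey) lfirst(lc);
		bool		isnull;
		Datum		val = slot_getattr(slot, sk->sk_attno, &isnull);

		if (isnull)
			return false;	/* a NULL join key can never match an equality hash clause */

		/* evaluate the comparison stored in the ScanKey (e.g. >= min, <= max) */
		if (!DatumGetBool(FunctionCall2Coll(&sk->sk_func, sk->sk_collation,
											val, sk->sk_argument)))
			return false;
	}

	return true;
}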

Performance:
CPU: E5-2680 v2, 10 cores; memory: 32 GB; 3 segments

  1. tpcds 10s: off 865 s, on 716 s (~17% improvement)
  2. tpcds 100s: off 4592 s, on 3751 s (~18% improvement)

tpcds 10s details

No.    off (ms)    on (ms)
1 3010.442 1505.904
2 4590.019 2886.249
3 10139.056 8662.587
4 1536.426 1571.688
5-1 17449.665 14285.31
5-2 17309.173 14364.56
6 5655.546 4526.129
7 3544.227 1905.936
8 3537.828 1991.268
9 5648.002 3332.254
10 27294.172 19219.05
11 2807.175 1551.244
12 2411.667 1531.898
13 1206.672 683.703
14 11665.143 6650.221
15 3466.404 2245.559
16 5470.071 2956.460
17 2845.955 1940.403
18 2011.526 830.537
19 3919.305 2178.245
20 12328.441 11188.26
21 7325.944 7589.761
22 5732.785 2776.143
23 5883.509 4125.208
24 2127.196 2243.762
25 5175.339 3148.642
26 6964.247 3149.069
27 3954.195 2143.255
28 6469.966 3063.967
29 5556.811 3247.220
30 15339.387 16605.79
31 8874.316 5753.976
32 3457.843 1840.951
33 2593.154 2581.501
34 10363.949 8788.906
35 53733.608 50989.11
36 3221.735 3248.548
37 1622.209 984.430
38 10090.589 8616.105
39 3371.617 1961.635
40 1642.191 776.952
41 8706.150 6689.429
42 9732.914 10688.31
43 40751.892 39953.03
44 1649.785 1055.199
45 3017.519 1338.320
46 19065.850 17877.98
47 8222.628 4597.950
48 4280.849 3348.528
49 5328.509 5402.897
50 5485.369 4545.250
51 20117.770 19872.07
52 102.495 89.684
53 8643.800 6744.744
54 8253.421 8583.968
55 36436.196 33946.69
56 4441.651 2868.195
57 2105.832 1354.731
58 3758.967 1729.755
59 3145.889 1330.409
60 3897.952 1802.964
61 3142.506 1373.074
62 195.975 199.306
63 603.724 601.162
64 1551.600 565.874
65 1627.829 1032.944
66 8174.817 5436.670
67 9236.159 6004.216
68-1 32361.201 33408.38
68-2 33742.304 33370.19
69-1 18798.420 16717.96
69-2 16326.340 14353.27
70 6570.589 5913.169
71 15146.116 11911.68
72 5233.319 2976.308
73 3556.395 2224.097
74 5783.208 5771.482
75 971.755 688.099
76 16137.706 15918.47
77 9652.487 8718.570
78 5418.827 2852.876
79 3932.542 2046.743
80 1499.008 416.289
81 6994.725 5654.408
82 2892.431 1220.703
83 4490.649 2765.032
84 7648.792 7543.295
85 2824.855 1585.448
86 2519.700 1862.130
87 43705.181 41615.89
88 3406.351 1791.792
89 5802.109 4454.484
90 5539.748 2536.063
91 5410.310 5009.545
92-1 6148.298 4211.587
92-2 7342.791 4105.239
93 28426.593 28242.50
94 4050.031 4061.338
95 4918.673 3057.766
96 2109.959 1150.658
97 5618.986 3110.099
98 5632.959 4541.739
99 1755.003 2069.720
total: off 865 s, on 716 s (~17%)




/* append new runtime filters to target node */
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
sss->filters = list_concat(sss->filters, scankeys);
Member

Can we merge filters here on the same attno?

Contributor Author

@zhangyue-hashdata Nov 25, 2024

  1. Combining Bloom filters results in a higher false positive rate (FPR) than checking each of the individual Bloom filters separately (a tuple only has to pass the merged filter instead of passing every filter), so it is not recommended (see the estimate below);
  2. Combining range filters has the same problem as combining Bloom filters;
  3. In many cases there is only one Bloom filter and one range filter on the same attribute anyway;
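For intuition, the usual false-positive estimate for a Bloom filter with m bits, k hash functions and n inserted keys is

p \approx \left(1 - e^{-kn/m}\right)^{k}

so merging two filters means inserting n1 + n2 keys into the same m bits and getting a strictly larger p, whereas applying the filters one after another lets a tuple that belongs to neither key set slip through only with probability about p1 * p2.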

Member

@yjhjstz Nov 25, 2024

create table t1(a int, b int) with(parallel_workers=2);
create table rt1(a int, b int) with(parallel_workers=2);
create table rt2(a int, b int);
create table rt3(a int, b int);
insert into t1 select i, i from generate_series(1, 100000) i;
insert into t1 select i, i+1 from generate_series(1, 10) i;
insert into rt1 select i, i+1 from generate_series(1, 10) i;
insert into rt2 select i, i+1 from generate_series(1, 10000) i;
insert into rt3 select i, i+1 from generate_series(1, 10) i;
analyze t1;
analyze rt1;
analyze rt2;
analyze rt3;

explain analyze select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;

postgres=# explain select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
                                   QUERY PLAN                                   
--------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=2.45..428.51 rows=17 width=24)
   ->  Hash Join  (cost=2.45..428.29 rows=6 width=24)
         Hash Cond: (t1.b = rt1.a)
         ->  Hash Join  (cost=1.23..427.00 rows=6 width=16)
               Hash Cond: (t1.b = rt3.a)
               ->  Seq Scan on t1  (cost=0.00..342.37 rows=33337 width=8)
               ->  Hash  (cost=1.10..1.10 rows=10 width=8)
                     ->  Seq Scan on rt3  (cost=0.00..1.10 rows=10 width=8)
         ->  Hash  (cost=1.10..1.10 rows=10 width=8)
               ->  Seq Scan on rt1  (cost=0.00..1.10 rows=10 width=8)
 Optimizer: Postgres query optimizer
(11 rows)

You can try this case; you will get two range filters.

Contributor Author

got it

continue;

val = slot_getattr(slot, sk->sk_attno, &isnull);
if (isnull)
Member

CREATE TABLE distinct_1(a int);
CREATE TABLE distinct_2(a int);
INSERT INTO distinct_1 VALUES(1),(2),(NULL);
INSERT INTO distinct_2 VALUES(1),(NULL);
SELECT * FROM distinct_1, distinct_2 WHERE distinct_1.a IS NOT DISTINCT FROM distinct_2.a;

This test gets a wrong result.

Contributor Author

I will fix it.

return slot;

if (node->filter_in_seqscan && node->filters &&
!PassByBloomFilter(node, slot))
Member

For TPC-DS 1 TB, will the Bloom filter lose efficacy or fail to be created because of the large number of rows?

Contributor Author

Yes. When creating the Bloom filter, the system evaluates the estimated number of rows this hash join will process (from execution-plan generation) and the amount of available memory, and decides based on that whether a Bloom filter would be effective for filtering data. If it is assessed that the Bloom filter would not sufficiently improve performance, the Bloom filter is not created.

Contributor

It determines whether using a Bloom filter for filtering data would be effective based on this evaluation

That makes sense, but where is the related code? I just didn't see it in this PR.
Does it compare the number of rows in the hash table with the number of rows in the probe table, and use the runtime filter only when the hash table is far smaller than the probe side?

{
match = false;

if (!IsA(lfirst(lc), Var))
Contributor

Could it support other expressions, where one argument is the column attribute and the other is a const?

Contributor Author

I think that expressions like t1.c1 = 5 should be pushed down by the optimizer to operators such as SeqScan for early processing. Therefore, this feature does not handle expressions of the form t1.c1 = 5.

Contributor

Sorry, I didn't make it clear. I don't mean a predicate on the var; see the SQL below.

 EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = (t2.c2 + 10);
                                        QUERY PLAN
-------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
   ->  Hash Join (actual rows=0 loops=1)
         Hash Cond: (t1.c2 = (t2.c2 + 10))
         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
         ->  Seq Scan on t1 (actual rows=128 loops=1)
         ->  Hash (actual rows=32 loops=1)
               Buckets: 524288  Batches: 1  Memory Usage: 4098kB
               ->  Seq Scan on t2 (actual rows=32 loops=1)
 Optimizer: Postgres query optimizer
(9 rows)

As t2.c2 + 10 is not a Var but a T_OpExpr, the runtime filter cannot handle it.
Could we just iterate the expression tree and check whether it contains only Vars and Consts?
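For reference, a check of that kind could be sketched with expression_tree_walker as below. This is a hypothetical illustration; the function name and the accepted node set are assumptions, not code from this PR.

#include "postgres.h"
#include "nodes/nodeFuncs.h"

/*
 * Return true as soon as a disqualifying node is found; the hash clause
 * argument qualifies for the runtime filter only if the walk returns false.
 */
static bool
runtime_filter_unsafe_walker(Node *node, void *context)
{
	if (node == NULL)
		return false;
	if (IsA(node, Var) || IsA(node, Const))
		return false;		/* plain column references and constants are fine */
	if (IsA(node, OpExpr) || IsA(node, FuncExpr) || IsA(node, RelabelType))
		return expression_tree_walker(node, runtime_filter_unsafe_walker, context);
	return true;			/* anything else (subqueries, aggregates, ...) disqualifies */
}

A caller would then treat the expression as usable when !runtime_filter_unsafe_walker((Node *) expr, NULL) holds.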

@fanfuxiaoran
Contributor

Looks interesting. And I have some questions to discuss.

  • Besides the seqscan, can the runtime filter apply to other types of scan, such as an index scan?
  • It looks like the runtime filter can only be used when the hashjoin node and the seqscan node run in the same process, which means the tables should have the same distribution policy on the join columns or one of the tables is replicated.

* result (hash filter)
* seqscan on t1, t1 is replicated table
*/
if (!IsA(child, HashJoinState) && !IsA(child, ResultState))
Member

Hash Join  (cost=0.00..4019.55 rows=37 width=9) (actual time=3203.012..9927.435 rows=1399 loops=1)
  Hash Cond: (web_sales_1_prt_2.ws_item_sk = item.i_item_sk)
  Join Filter: (web_sales_1_prt_2.ws_ext_discount_amt > ((1.3 * avg(web_sales_1_prt_2_1.ws_ext_discount_amt))))
  Rows Removed by Join Filter: 4763
  Extra Text: (seg2)   Hash chain length 1.0 avg, 1 max, using 198 of 2097152 buckets.
  ->  Append  (cost=0.00..676.44 rows=2399189 width=13) (actual time=16.899..5572.473 rows=3090021 loops=1)
        ->  Seq Scan on web_sales_1_prt_2  (cost=0.00..676.44 rows=2399189 width=13) (actual time=16.895..1138.267 rows=662149 loops=1)
        ->  Seq Scan on web_sales_1_prt_3  (cost=0.00..676.44 rows=2399189 width=13) (actual time=8.947..1102.409 rows=662136 loops=1)
        ->  Seq Scan on web_sales_1_prt_4  (cost=0.00..676.44 rows=2399189 width=13) (actual time=8.822..1100.839 rows=662148 loops=1)
        ->  Seq Scan on web_sales_1_prt_5  (cost=0.00..676.44 rows=2399189 width=13) (actual time=11.391..1083.785 rows=662179 loops=1)
        ->  Seq Scan on web_sales_1_prt_6  (cost=0.00..676.44 rows=2399189 width=13) (actual time=13.030..649.141 rows=441409 loops=1)
        ->  Seq Scan on web_sales_1_prt_7  (cost=0.00..676.44 rows=2399189 width=13) (never executed)
        ->  Seq Scan on web_sales_1_prt_others  (cost=0.00..676.44 rows=2399189 width=13) (actual time=1.213..3.203 rows=1788 loops=1)
  ->  Hash  (cost=2432.09..2432.09 rows=109 width=12) (actual time=3177.768..3177.770 rows=198 loops=1)
        Buckets: 2097152  Batches: 1  Memory Usage: 16392kB
        ->  Broadcast Motion 3:3  (slice3; segments: 3)  (cost=

We need to consider partitioned tables.

Contributor Author

I will try.

Contributor Author

Partitioned tables are supported in c701092.

Comment on lines 4285 to 4286
attr_filter->min = LLONG_MAX;
attr_filter->max = LLONG_MIN;
Contributor

LLONG_MAX and LLONG_MIN are platform-specific values, i.e. the bounds of long long, which may not be exactly the same width as Datum. For safety, a static assert could be considered.

Contributor Author

I see StaticAssertDecl(SIZEOF_DATUM == 8, "sizeof datum is not 8"); in postgres.h, so it's better to use INT64_MAX/INT64_MIN here.
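For illustration, the change being discussed might look like the following; this is a hypothetical sketch, and the actual fix (99eabb2) may use different constants:

/* Assumed sketch, not the PR's final code: pin the width assumption and use
 * fixed-width bounds instead of the platform-dependent LLONG_* macros. */
StaticAssertStmt(SIZEOF_DATUM == 8, "runtime filter range bounds assume an 8-byte Datum");

attr_filter->min = PG_INT64_MAX;	/* start with an empty range: min > max */
attr_filter->max = PG_INT64_MIN;	/* widen it as build-side values arrive */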

Member

Use LONG_MAX, LONG_MIN instead?

Contributor Author

fix with 99eabb2

Comment on lines 2194 to 2196
/*
* Only applicable for inner, right and semi join.
*/
Contributor

Could you explain a little more about why these join types are supported and others are not?

Contributor Author

Added more comments to explain why only inner, right and semi joins are allowed with the runtime filter.
Fixed in 98dac6d

Comment on lines 2283 to 2284
if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
return false;
Contributor

These 2 lines duplicate the following if-else-if-else code and could be deleted.

Contributor Author

fix it

Contributor Author

fix with 99eabb2

Comment on lines 2302 to 2315
break;

var = lfirst(lc);
if (var->varno == INNER_VAR)
*rattno = var->varattno;
else if (var->varno == OUTER_VAR)
*lattno = var->varattno;
else
break;

match = true;
}

return match;
Contributor

The match flag (modified in several places) makes the code hard to read. The break statements could be replaced by return false;, and if the foreach loop completes, all conditions matched, so return true.

Contributor Author

Refactored the code in a more intuitive way, like below:

/* check the first arg */
...

/* check the second arg */
...

return true;

Contributor Author

fix with 99eabb2

Comment on lines 106 to 107
if (TupIsNull(slot))
return slot;
Contributor

Will it be true?

Contributor Author

It seems that slot is never NULL here, so Assert(!TupIsNull(slot)); would be better, or the check could be removed.

Contributor Author

fix with 99eabb2

/*
* SK_EMPYT means the end of the array of the ScanKey
*/
sk[*num].sk_flags = SK_EMPYT;
Contributor

How is the boundary of the ScanKey array checked in rescan? In a normal rescan, the number of ScanKeys is the same as in begin_scan. If the number of ScanKeys is larger in rescan than in begin_scan, the boundary value might be invalid and dangerous to access.

@avamingli
Contributor

There is code changed in MultiExecParallelHash; please add some parallel tests with the runtime filter.

@zhangyue-hashdata
Contributor Author

There is code changed in MultiExecParallelHash; please add some parallel tests with the runtime filter.

got it.

@zhangyue-hashdata
Contributor Author

Looks interesting. And I have some questions to discuss.

  • Besides the seqscan, can the runtime filter apply to other types of scan, such as an index scan?
  • It looks like the runtime filter can only be used when the hashjoin node and the seqscan node run in the same process, which means the tables should have the same distribution policy on the join columns or one of the tables is replicated.
  • Besides the seqscan, can the runtime filter apply to other types of scan, such as an index scan?

Theoretically, it is feasible to apply runtime filters to operators such as Index Scan. However, because Index Scan already reduces data volume by leveraging an optimized storage structure, the performance gains from applying runtime filters to Index Scan would likely be minimal. Thus, I think that applying runtime filters to Index Scan would not yield significant performance benefits.

In subsequent work, when we discover that other scan operators can achieve notable performance improvements from pushdown runtime filters, we will support these operators. Our focus will be on operators where runtime filters can substantially decrease the amount of data processed early in the query execution, leading to more pronounced performance enhancements.

  • It looks like the runtime filter can only be used when the hashjoin node and the seqscan node run in the same process, which means the tables should have the same distribution policy on the join columns or one of the tables is replicated.

Yes, the current pushdown runtime filter only supports in-process pushdown, which means that the Hash Join and SeqScan need to be within the same process. The design and implementation of cross-process pushdown runtime filters are much more complex.

This limitation arises because coordinating and sharing data structures like Bloom filters or other runtime filters across different processes involves additional challenges such as inter-process communication (IPC), synchronization, and ensuring consistency and efficiency of the filters across process boundaries. Addressing these issues requires a more sophisticated design that can handle the complexities of distributed computing environments.

@avamingli
Contributor

Hi, with gp_enable_runtime_filter_pushdown = on, executing the SQL below gets a crash:

gpadmin=# show gp_enable_runtime_filter_pushdown;
 gp_enable_runtime_filter_pushdown
-----------------------------------
 on
(1 row)
CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
-- use fillfactor so we don't have to load too much data to get multiple pages

-- Changed the column length in order to match the expected results based on relation's blocksz
INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
EXPLAIN (COSTS OFF)
  SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
FATAL:  Unexpected internal error (assert.c:48)
DETAIL:  FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
The connection to the server was lost. Attempting reset: Succeeded.
psql (14.4, server 14.4)

@zhangyue-hashdata
Contributor Author


Thanks, I'll reproduce the issue and fix it.

@fanfuxiaoran
Contributor

Thanks for your detailed explanation.

Looks interesting. And I have some questions to discuss.

  • Besides the seqscan, can the runtime filter apply to other types of scan, such as an index scan?
  • It looks like the runtime filter can only be used when the hashjoin node and the seqscan node run in the same process, which means the tables should have the same distribution policy on the join columns or one of the tables is replicated.
  • Besides the seqscan, can the runtime filter apply to other types of scan, such as an index scan?

Theoretically, it is feasible to apply runtime filters to operators such as Index Scan. However, because Index Scan already reduces data volume by leveraging an optimized storage structure, the performance gains from applying runtime filters to Index Scan would likely be minimal. Thus, I think that applying runtime filters to Index Scan would not yield significant performance benefits.

Makes sense. When doing a hash join, an index scan or index-only scan is often not used on the probe side anyway.

In subsequent work, when we discover that other scan operators can achieve notable performance improvements from pushdown runtime filters, we will support these operators. Our focus will be on operators where runtime filters can substantially decrease the amount of data processed early in the query execution, leading to more pronounced performance enhancements.

  • It looks like the runtime filter can only be used when the hashjoin node and the seqscan node run in the same process, which means the tables should have the same distribution policy on the join columns or one of the tables is replicated.

Yes, the current pushdown runtime filter only supports in-process pushdown, which means that the Hash Join and SeqScan need to be within the same process. The design and implementation of cross-process pushdown runtime filters are much more complex.

This limitation arises because coordinating and sharing data structures like Bloom filters or other runtime filters across different processes involves additional challenges such as inter-process communication (IPC), synchronization, and ensuring consistency and efficiency of the filters across process boundaries. Addressing these issues requires a more sophisticated design that can handle the complexities of distributed computing environments.

Exactly, and if any lock is used to solve the problem, it may even lead to bad performance.

@fanfuxiaoran
Contributor

 explain analyze
SELECT count(t1.c3) FROM t1, t3 WHERE t1.c1 = t3.c1 ;
                                                              QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=1700.07..1700.08 rows=1 width=8) (actual time=32119.566..32119.571 rows=1 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=1700.02..1700.07 rows=3 width=8) (actual time=30.967..32119.550 rows=3 loops=1)
         ->  Partial Aggregate  (cost=1700.02..1700.03 rows=1 width=8) (actual time=32119.131..32119.135 rows=1 loops=1)
               ->  Hash Join  (cost=771.01..1616.68 rows=33334 width=4) (actual time=14.059..32116.962 rows=33462 loops=1)
                     Hash Cond: (t3.c1 = t1.c1)
                     Extra Text: (seg0)   Hash chain length 1.0 avg, 3 max, using 32439 of 524288 buckets.
                     ->  Seq Scan on t3  (cost=0.00..387.34 rows=33334 width=4) (actual time=0.028..32089.490 rows=33462 loops=1)
                     ->  Hash  (cost=354.34..354.34 rows=33334 width=8) (actual time=13.257..13.259 rows=33462 loops=1)
                           Buckets: 524288  Batches: 1  Memory Usage: 5404kB
                           ->  Seq Scan on t1  (cost=0.00..354.34 rows=33334 width=8) (actual time=0.180..4.877 rows=33462 loops=1)
 Planning Time: 0.227 ms

The runtime filter has been pushed down to the seqscan on t3, but 'explain analyze' does not print it.

\d t1
                 Table "public.t1"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 c1     | integer |           |          |
 c2     | integer |           |          |
 c3     | integer |           |          |
 c4     | integer |           |          |
 c5     | integer |           |          |
Checksum: t
Indexes:
    "t1_c2" btree (c2)
Distributed by: (c1)
 \d t3
                 Table "public.t3"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 c1     | integer |           |          |
 c2     | integer |           |          |
 c3     | integer |           |          |
 c4     | integer |           |          |
 c5     | integer |           |          |
Distributed by: (c1)

@zhangyue-hashdata
Contributor Author

Thanks for your test case. Based on it, I rewrote the code so that the debug info is always displayed even when the number of filtered rows is zero, and added the test case to gp_runtime_filter.sql as well.
Fixed in 98dac6d

@zhangyue-hashdata
Contributor Author

zhangyue-hashdata commented Dec 5, 2024


Thanks for your test case. I fixed it in 98dac6d and added the test case to gp_runtime_filter.sql as well.

@Smyatkin-Maxim

Hi @zhangyue-hashdata
I see that the previous runtime filter implementation relies on a cost model in try_runtime_filter(). Do I understand correctly that this PR does not do any cost evaluation?
Also, for TPC-H/TPC-DS, can you provide results for each query separately?

Asking mostly out of curiosity; I see there are quite a few reviewers here already :)

@zhangyue-hashdata
Contributor Author


Basically, you're correct. Our goal is to filter out as much data as possible right at the point of data generation; a full cost model would lead to very complex evaluations, so we only make a simple estimation based on rows and work memory when creating the Bloom filter (see the sketch below).
Furthermore, I have placed the detailed test results for TPC-DS 10s in the PR description.
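For illustration only, that kind of simple estimation might look like the sketch below; the function name, the sizing constant and the threshold are assumptions, not taken from this PR:

#include "postgres.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"

/*
 * Hypothetical sizing check: only build a Bloom filter when the planner's
 * row estimate for the build side fits a reasonably sized filter in work_mem.
 */
static bool
runtime_filter_worth_building(HashState *hashNode)
{
	double		build_rows = hashNode->ps.plan->plan_rows;	/* planner estimate */
	Size		bloom_bytes = (Size) (build_rows * 2);		/* roughly 16 bits per key */

	/* skip the Bloom filter if it would not fit comfortably in work_mem (kB) */
	if (bloom_bytes > (Size) work_mem * 1024)
		return false;

	return true;
}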

@zhangyue-hashdata
Contributor Author

There is code changed in MultiExecParallelHash; please add some parallel tests with the runtime filter.

fix it in 7ab040a

if (table_scan_getnextslot(scandesc, direction, slot))
while (table_scan_getnextslot(scandesc, direction, slot))
{
if (node->filter_in_seqscan && node->filters &&
Member

if (!node->filter_in_seqscan ||  !node->filters)
{
     if (table_scan_getnextslot(scandesc, direction, slot))
         return slot;
}
else 
{
     while (table_scan_getnextslot(scandesc, direction, slot))
     {
            .....
     }
}

Does this make the original path more efficient and more readable?

Contributor Author

Good idea! Fixed in bcf93e6

@yjhjstz
Member

yjhjstz commented Dec 20, 2024

From the tpcds 10s details table, there are some bad cases.
