Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add an upper bound for estimated row count of inner side of index join #41996

Merged
merged 16 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ Projection 10.00 root tpch.lineitem.l_orderkey, Column#35, tpch.orders.o_orderd
│ └─Selection 36870000.00 cop[tikv] lt(tpch.orders.o_orderdate, 1995-03-13 00:00:00.000000)
│ └─TableFullScan 75000000.00 cop[tikv] table:orders keep order:false
└─IndexLookUp(Probe) 91515927.49 root
├─IndexRangeScan(Build) 168388203.74 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
├─IndexRangeScan(Build) 91515927.49 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
└─Selection(Probe) 91515927.49 cop[tikv] gt(tpch.lineitem.l_shipdate, 1995-03-13 00:00:00.000000)
└─TableRowIDScan 168388203.74 cop[tikv] table:lineitem keep order:false
└─TableRowIDScan 91515927.49 cop[tikv] table:lineitem keep order:false
/*
Q4 Order Priority Checking Query
This query determines how well the order priority system is working and gives an assessment of customer satisfaction.
Expand Down Expand Up @@ -298,9 +298,9 @@ Sort 1.00 root tpch.orders.o_orderpriority
│ └─Selection 2925937.50 cop[tikv] ge(tpch.orders.o_orderdate, 1995-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-04-01 00:00:00.000000)
│ └─TableFullScan 75000000.00 cop[tikv] table:orders keep order:false
└─IndexLookUp(Probe) 11851908.75 root
├─IndexRangeScan(Build) 14814885.94 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
├─IndexRangeScan(Build) 11851908.75 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
└─Selection(Probe) 11851908.75 cop[tikv] lt(tpch.lineitem.l_commitdate, tpch.lineitem.l_receiptdate)
└─TableRowIDScan 14814885.94 cop[tikv] table:lineitem keep order:false
└─TableRowIDScan 11851908.75 cop[tikv] table:lineitem keep order:false
/*
Q5 Local Supplier Volume Query
This query lists the revenue volume done through local suppliers.
Expand Down Expand Up @@ -672,9 +672,9 @@ Projection 20.00 root tpch.customer.c_custkey, tpch.customer.c_name, Column#39,
│ └─TableReader(Probe) 7500000.00 root data:TableFullScan
│ └─TableFullScan 7500000.00 cop[tikv] table:customer keep order:false
└─IndexLookUp(Probe) 12222016.17 root
├─IndexRangeScan(Build) 49605980.10 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
├─IndexRangeScan(Build) 12222016.17 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
└─Selection(Probe) 12222016.17 cop[tikv] eq(tpch.lineitem.l_returnflag, "R")
└─TableRowIDScan 49605980.10 cop[tikv] table:lineitem keep order:false
└─TableRowIDScan 12222016.17 cop[tikv] table:lineitem keep order:false
/*
Q11 Important Stock Identification Query
This query finds the most important subset of suppliers' stock in a given nation.
Expand Down Expand Up @@ -1239,9 +1239,9 @@ Projection 100.00 root tpch.supplier.s_name, Column#72
│ ├─IndexRangeScan(Build) 49550432.16 cop[tikv] table:l2, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.lineitem.l_orderkey)], keep order:false
│ └─TableRowIDScan(Probe) 49550432.16 cop[tikv] table:l2 keep order:false
└─IndexLookUp(Probe) 39640345.73 root
├─IndexRangeScan(Build) 49550432.16 cop[tikv] table:l3, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.lineitem.l_orderkey)], keep order:false
├─IndexRangeScan(Build) 39640345.73 cop[tikv] table:l3, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.lineitem.l_orderkey)], keep order:false
└─Selection(Probe) 39640345.73 cop[tikv] gt(tpch.lineitem.l_receiptdate, tpch.lineitem.l_commitdate)
└─TableRowIDScan 49550432.16 cop[tikv] table:l3 keep order:false
└─TableRowIDScan 39640345.73 cop[tikv] table:l3 keep order:false
/*
Q22 Global Sales Opportunity Query
The Global Sales Opportunity Query identifies geographies where there are customers who may be likely to make a
Expand Down
90 changes: 87 additions & 3 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
maxOneRow = ok && (sf.FuncName.L == ast.EQ)
}
}
innerTask := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
innerTask := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, innerJoinKeys, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
if val.(bool) && !p.ctx.GetSessionVars().InRestrictedSQL {
failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager))
Expand All @@ -962,7 +962,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if us == nil {
innerTask2 := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
innerTask2 := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, innerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
if innerTask2 != nil {
joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)...)
}
Expand Down Expand Up @@ -1146,13 +1146,76 @@ func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader Physi
return physicalUnionScan
}

func getColsNDVLowerBoundFromHistColl(cols []*expression.Column, histColl *statistics.HistColl) int64 {
if len(cols) == 0 || histColl == nil {
return -1
}
colUIDs := make([]int64, len(cols))
for i, col := range cols {
colUIDs[i] = col.UniqueID
}

// Note that we don't need to specially handle prefix index in this function, because the NDV of a prefix index is
// equal or less than the corresponding normal index, and that's safe here since we want a lower bound.

// 1. Try to get NDV from column stats if it's a single column.
if len(colUIDs) == 1 && histColl.Columns != nil {
uid := colUIDs[0]
if colStats, ok := histColl.Columns[uid]; ok && colStats != nil {
return colStats.NDV
}
}

slices.Sort(colUIDs)
if histColl.Indices == nil || histColl.Idx2ColumnIDs == nil {
return -1
}

// 2. Try to get NDV from index stats.
for idxID, idxCols := range histColl.Idx2ColumnIDs {
if len(idxCols) != len(colUIDs) {
continue
}
orderedIdxCols := make([]int64, len(idxCols))
copy(orderedIdxCols, idxCols)
slices.Sort(orderedIdxCols)
if !slices.Equal(idxCols, colUIDs) {
continue
}
if idxStats, ok := histColl.Indices[idxID]; ok && idxStats != nil {
return idxStats.NDV
}
}

// TODO: if there's an index that contains the expected columns, we can also make use of its NDV.
// For example, NDV(a,b,c) / NDV(c) is a safe lower bound of NDV(a,b).

// 3. If we still haven't got an NDV, we use the minimal NDV in the column stats as a lower bound.
Copy link
Contributor

@AilinKid AilinKid Mar 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer a more case 2.5, once the index prefix columns satisfied the keys,we can use the index‘s DNV / the abscent column’s DNV (thought they are completely independent distributions, and result after division is quite small than the real value, leading a higher row count estimations(but not that highest from direct dividing lowest col-ndv) is which we wanna get from here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a clever one!
It's not that obvious but indeed a correct lower bound for the NDV. And I think it doesn't need to be prefix columns, it's enough if the index contains the columns we want.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we add this, there needs to be a strategy for choosing from multiple possible indexes, and we need to make sure the result is stable, we also need to care about some edge cases like prefix index and generated columns, and we also need to construct corresponding test cases, that's too much for this sprint.
Besides, this PR would be cherry-picked for a customer, safety is more important, and less change is preferred.
So I tend to add a TODO comment here and leave it to the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a clever one! It's not that obvious but indeed a correct lower bound for the NDV. And I think it doesn't need to be prefix columns, it's enough if the index contains the columns we want.

make sense,subset of index column is enough

// This would happen when len(cols) > 1 and no proper index stats are available.
minNDV := int64(-1)
for _, colStats := range histColl.Columns {
if colStats == nil || colStats.Info == nil {
continue
}
col := colStats.Info
if col.IsGenerated() && !col.GeneratedStored {
continue
}
if (colStats.NDV > 0 && minNDV <= 0) ||
colStats.NDV < minNDV {
minNDV = colStats.NDV
}
}
return minNDV
}

// constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerIndexScanTask(
wrapper *indexJoinInnerChildWrapper,
path *util.AccessPath,
ranges ranger.Ranges,
filterConds []expression.Expression,
_ []*expression.Column,
innerJoinKeys []*expression.Column,
rangeInfo string,
keepOrder bool,
desc bool,
Expand Down Expand Up @@ -1239,6 +1302,21 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
}
is.initSchema(append(path.FullIdxCols, ds.commonHandleCols...), cop.tablePlan != nil)
indexConds, tblConds := ds.splitIndexFilterConditions(filterConds, path.FullIdxCols, path.FullIdxColLens)

// Because we are estimating an average row count of the inner side corresponding to each row from the outer side,
// the estimated row count of the IndexScan should be no larger than (total row count / NDV of join key columns).
// We use it as an upper bound here.
rowCountUpperBound := -1.0
if ds.tableStats != nil {
joinKeyNDV := getColsNDVLowerBoundFromHistColl(innerJoinKeys, ds.tableStats.HistColl)
if joinKeyNDV > 0 {
rowCountUpperBound = ds.tableStats.RowCount / float64(joinKeyNDV)
}
}

if rowCountUpperBound > 0 {
rowCount = math.Min(rowCount, rowCountUpperBound)
}
if maxOneRow {
// Theoretically, this line is unnecessary because row count estimation of join should guarantee rowCount is not larger
// than 1.0; however, there may be rowCount larger than 1.0 in reality, e.g, pseudo statistics cases, which does not reflect
Expand All @@ -1261,6 +1339,9 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
// rowCount is computed from result row count of join, which has already accounted the filters on DataSource,
// i.e, rowCount equals to `countAfterIndex * selectivity`.
cnt := rowCount / selectivity
if rowCountUpperBound > 0 {
cnt = math.Min(cnt, rowCountUpperBound)
}
if maxOneRow {
cnt = math.Min(cnt, 1.0)
}
Expand All @@ -1274,6 +1355,9 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
selectivity = SelectionFactor
}
cnt := tmpPath.CountAfterIndex / selectivity
if rowCountUpperBound > 0 {
cnt = math.Min(cnt, rowCountUpperBound)
}
if maxOneRow {
cnt = math.Min(cnt, 1.0)
}
Expand Down
50 changes: 50 additions & 0 deletions statistics/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testdata"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -743,3 +745,51 @@ func TestUpdateNotLoadIndexFMSketch(t *testing.T) {
require.Nil(t, h.GetPartitionStats(tblInfo, p0.ID).Indices[idxInfo.ID].FMSketch)
require.Nil(t, h.GetPartitionStats(tblInfo, p1.ID).Indices[idxInfo.ID].FMSketch)
}

func TestIndexJoinInnerRowCountUpperBound(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
h := dom.StatsHandle()

testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("create table t(a int, b int, index idx(b))")
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
is := dom.InfoSchema()
tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tblInfo := tb.Meta()

// Mock the stats:
// The two columns are the same.
// From 0 to 499, each value has 1000 rows. Therefore, NDV is 500 and total row count is 500000.
mockStatsTbl := mockStatsTable(tblInfo, 500000)
colValues, err := generateIntDatum(1, 500)
require.NoError(t, err)
for i := 1; i <= 2; i++ {
mockStatsTbl.Columns[int64(i)] = &statistics.Column{
Count: 500000,
Histogram: *mockStatsHistogram(int64(i), colValues, 1000, types.NewFieldType(mysql.TypeLonglong)),
Info: tblInfo.Columns[i-1],
StatsLoadedStatus: statistics.NewStatsFullLoadStatus(),
StatsVer: 2,
}
}
generateMapsForMockStatsTbl(mockStatsTbl)
stat := h.GetTableStats(tblInfo)
stat.HistColl = mockStatsTbl.HistColl

testKit.MustQuery("explain format = 'brief' " +
"select /*+ inl_join(t2) */ * from (select * from t where t.a < 1) as t1 join t t2 where t2.a = 0 and t1.a = t2.b").
Check(testkit.Rows(
"IndexJoin 1000000.00 root inner join, inner:IndexLookUp, outer key:test.t.a, inner key:test.t.b, equal cond:eq(test.t.a, test.t.b)",
"├─TableReader(Build) 1000.00 root data:Selection",
"│ └─Selection 1000.00 cop[tikv] lt(test.t.a, 1), not(isnull(test.t.a))",
"│ └─TableFullScan 500000.00 cop[tikv] table:t keep order:false, stats:pseudo",
"└─IndexLookUp(Probe) 1000000.00 root ",
" ├─Selection(Build) 1000000.00 cop[tikv] not(isnull(test.t.b))",
" │ └─IndexRangeScan 1000000.00 cop[tikv] table:t2, index:idx(b) range: decided by [eq(test.t.b, test.t.a)], keep order:false, stats:pseudo",
" └─Selection(Probe) 1000000.00 cop[tikv] eq(test.t.a, 0)",
" └─TableRowIDScan 1000000.00 cop[tikv] table:t2 keep order:false, stats:pseudo",
))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a comparison, on current master branch, the execution plan is:

IndexJoin 1000000.00 root  inner join, inner:IndexLookUp, outer key:test.t.a, inner key:test.t.b, equal cond:eq(test.t.a, test.t.b)
├─TableReader(Build) 1000.00 root  data:Selection
│ └─Selection 1000.00 cop[tikv]  lt(test.t.a, 1), not(isnull(test.t.a))
│   └─TableFullScan 500000.00 cop[tikv] table:t keep order:false, stats:pseudo
└─IndexLookUp(Probe) 1000000.00 root  
  ├─Selection(Build) 500000000.00 cop[tikv]  not(isnull(test.t.b))
  │ └─IndexRangeScan 500000000.00 cop[tikv] table:t2, index:idx(b) range: decided by [eq(test.t.b, test.t.a)], keep order:false, stats:pseudo
  └─Selection(Probe) 1000000.00 cop[tikv]  eq(test.t.a, 0)
    └─TableRowIDScan 500000000.00 cop[tikv] table:t2 keep order:false, stats:pseudo

}