Unverified Commit e9b7f777 authored by crazycs, committed by GitHub

topsql: add more test to check resource tag for each RPC request (#33623)

close pingcap/tidb#33208
parent 8b5150e5
Showing with 314 additions and 41 deletions
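For context, TopSQL attributes each TiKV RPC to a SQL statement through a resource group tag: a tikvrpc.ResourceGroupTagger callback stamps every outgoing request with an encoded digest, and the tests added in this PR assert that such a tag is present for each request type. The DDL changes below thread a JobContext through the reorg paths so that background DDL requests get tagged as well. A minimal sketch of the tagging mechanism itself, using only client-go types that appear in this diff (the tag value here is a stand-in, not TiDB's real digest encoding):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func main() {
	// A tagger is just a callback that stamps a request before it is sent.
	var tagger tikvrpc.ResourceGroupTagger = func(req *tikvrpc.Request) {
		req.ResourceGroupTag = []byte("example-digest-tag") // stand-in for the encoded SQL digest
	}

	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k")})
	tagger(req)

	// The unistore test hook added below fails any request that arrives without such a tag.
	fmt.Println(len(req.GetResourceGroupTag()) > 0) // true
}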
......@@ -709,7 +709,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
func iterateSnapshotRows(ctx *jobContext, store kv.Storage, priority int, t table.Table, version uint64,
func iterateSnapshotRows(ctx *JobContext, store kv.Storage, priority int, t table.Table, version uint64,
startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
var firstKey kv.Key
if startKey == nil {
......
......@@ -1043,7 +1043,7 @@ func (w *worker) doModifyColumnTypeWithData(
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
......@@ -1266,7 +1266,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
if err != nil {
return errors.Trace(err)
}
originalStartHandle, originalEndHandle, err := getTableRange(w.jobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
originalStartHandle, originalEndHandle, err := getTableRange(w.JobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
}
......@@ -1373,7 +1373,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0)
......
......@@ -173,7 +173,7 @@ type DDL interface {
// GetID gets the ddl ID.
GetID() string
// GetTableMaxHandle gets the max row ID of a normal table or a partition.
GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(*pumpcli.PumpsClient)
// GetHook gets the hook. It's exported for testing.
......
......@@ -97,11 +97,11 @@ type worker struct {
lockSeqNum bool
*ddlCtx
*jobContext
*JobContext
}
// jobContext is the ddl job execution context.
type jobContext struct {
// JobContext is the ddl job execution context.
type JobContext struct {
// below fields are cache for top sql
ddlJobCtx context.Context
cacheSQL string
......@@ -109,18 +109,23 @@ type jobContext struct {
cacheDigest *parser.Digest
}
// NewJobContext returns a new ddl job context.
func NewJobContext() *JobContext {
return &JobContext{
ddlJobCtx: context.Background(),
cacheSQL: "",
cacheNormalizedSQL: "",
cacheDigest: nil,
}
}
func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx) *worker {
worker := &worker{
id: atomic.AddInt32(&ddlWorkerID, 1),
tp: tp,
ddlJobCh: make(chan struct{}, 1),
ctx: ctx,
jobContext: &jobContext{
ddlJobCtx: context.Background(),
cacheSQL: "",
cacheNormalizedSQL: "",
cacheDigest: nil,
},
id: atomic.AddInt32(&ddlWorkerID, 1),
tp: tp,
ddlJobCh: make(chan struct{}, 1),
ctx: ctx,
JobContext: NewJobContext(),
ddlCtx: dCtx,
reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
sessPool: sessPool,
......@@ -475,7 +480,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
updateRawArgs = false
}
w.writeDDLSeqNum(job)
w.jobContext.resetWhenJobFinish()
w.JobContext.resetWhenJobFinish()
err = t.AddHistoryDDLJob(job, updateRawArgs)
return errors.Trace(err)
}
......@@ -528,7 +533,7 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
return meta.NewMeta(txn)
}
func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
if !topsqlstate.TopSQLEnabled() || job == nil {
return
}
......@@ -542,7 +547,7 @@ func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
}
}
func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
return nil
}
......@@ -555,7 +560,7 @@ func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagg
return tagger
}
func (w *jobContext) resetWhenJobFinish() {
func (w *JobContext) resetWhenJobFinish() {
w.ddlJobCtx = context.Background()
w.cacheSQL = ""
w.cacheDigest = nil
......
......@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"go.uber.org/zap"
)
......@@ -179,6 +180,10 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(context.Background(), dr.store, false, func(ctx context.Context, txn kv.Transaction) error {
if topsqlstate.TopSQLEnabled() {
// Only when TiDB runs without PD (using unistore as the test storage) will it reach here, so just set a mock internal resource tagger.
txn.SetOption(kv.ResourceGroupTagger, util.GetInternalResourceGroupTaggerForTopSQL())
}
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
......
......@@ -587,7 +587,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
......@@ -1151,7 +1151,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
// taskDone means that the reorged handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0)
......@@ -1420,7 +1420,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
if err != nil {
return false, errors.Trace(err)
}
start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
......@@ -1604,7 +1604,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r
if err != nil {
return false, errors.Trace(err)
}
start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
......
......@@ -1124,7 +1124,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
}
}
reorgInfo, err := getReorgInfoFromPartitions(w.jobContext, d, t, job, tbl, physicalTableIDs, elements)
reorgInfo, err := getReorgInfoFromPartitions(w.JobContext, d, t, job, tbl, physicalTableIDs, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
......
......@@ -40,7 +40,7 @@ import (
func getTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage) (kv.Handle, bool) {
ver, err := store.CurrentVersion(kv.GlobalTxnScope)
require.NoError(t, err)
maxHandle, emptyTable, err := d.GetTableMaxHandle(ver.Ver, tbl.(table.PhysicalTable))
maxHandle, emptyTable, err := d.GetTableMaxHandle(ddl.NewJobContext(), ver.Ver, tbl.(table.PhysicalTable))
require.NoError(t, err)
return maxHandle, emptyTable
}
......
......@@ -438,7 +438,7 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
}
// buildDescTableScan builds a desc table scan upon tblInfo.
func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable,
func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable,
handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
sctx := newContext(dc.store)
dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit)
......@@ -459,6 +459,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
SetKeepOrder(true).
SetConcurrency(1).SetDesc(true)
builder.Request.ResourceGroupTagger = ctx.getResourceGroupTaggerForTopSQL()
builder.Request.NotFillCache = true
builder.Request.Priority = kv.PriorityLow
......@@ -467,7 +468,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
return nil, errors.Trace(err)
}
result, err := distsql.Select(ctx, sctx, kvReq, getColumnsTypes(handleCols), statistics.NewQueryFeedback(0, nil, 0, false))
result, err := distsql.Select(ctx.ddlJobCtx, sctx, kvReq, getColumnsTypes(handleCols), statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return nil, errors.Trace(err)
}
......@@ -475,7 +476,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
}
// GetTableMaxHandle gets the max handle of a PhysicalTable.
func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
func (dc *ddlCtx) GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
var handleCols []*model.ColumnInfo
var pkIdx *model.IndexInfo
tblInfo := tbl.Meta()
......@@ -497,7 +498,6 @@ func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (ma
handleCols = []*model.ColumnInfo{model.NewExtraHandleColInfo()}
}
ctx := context.Background()
// Build a desc scan of tblInfo with limit 1; it can be used to retrieve the last handle of the table.
result, err := dc.buildDescTableScan(ctx, startTS, tbl, handleCols, 1)
if err != nil {
......@@ -506,7 +506,7 @@ func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (ma
defer terror.Call(result.Close)
chk := chunk.New(getColumnsTypes(handleCols), 1, 1)
err = result.Next(ctx, chk)
err = result.Next(ctx.ddlJobCtx, chk)
if err != nil {
return nil, false, errors.Trace(err)
}
......@@ -542,7 +542,7 @@ func buildCommonHandleFromChunkRow(sctx *stmtctx.StatementContext, tblInfo *mode
}
// getTableRange gets the start and end handle of a table (or partition).
func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
func getTableRange(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
// Get the start handle of this partition.
err = iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
......@@ -552,7 +552,7 @@ func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshot
if err != nil {
return startHandleKey, endHandleKey, errors.Trace(err)
}
maxHandle, isEmptyTable, err := d.GetTableMaxHandle(snapshotVer, tbl)
maxHandle, isEmptyTable, err := d.GetTableMaxHandle(ctx, snapshotVer, tbl)
if err != nil {
return startHandleKey, nil, errors.Trace(err)
}
......@@ -579,7 +579,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil
}
func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
......@@ -671,7 +671,7 @@ func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
return &info, nil
}
func getReorgInfoFromPartitions(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
......
......@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
require.NoError(t, err)
m = meta.NewMeta(txn)
info, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, nil)
info, err1 := getReorgInfo(NewJobContext(), d.ddlCtx, m, job, mockTbl, nil)
require.NoError(t, err1)
require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
require.Equal(t, info.currElement, e)
......@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
_, err1 = getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
_, err1 = getReorgInfo(NewJobContext(), d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
require.Equal(t, job.SnapshotVer, uint64(0))
return nil
......@@ -185,7 +185,7 @@ func TestReorg(t *testing.T) {
require.NoError(t, err)
err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
info1, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
info1, err1 := getReorgInfo(NewJobContext(), d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
require.NoError(t, err1)
require.Equal(t, info1.currElement, info.currElement)
require.Equal(t, info1.StartKey, info.StartKey)
......
......@@ -15,6 +15,7 @@
package util
import (
"bytes"
"context"
"encoding/hex"
"strings"
......@@ -27,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikvrpc"
atomicutil "go.uber.org/atomic"
)
......@@ -234,3 +236,18 @@ func EmulatorGCDisable() {
func IsEmulatorGCEnable() bool {
return emulatorGCEnable.Load() == 1
}
var internalResourceGroupTag = []byte{0}
// GetInternalResourceGroupTaggerForTopSQL is only used for testing.
func GetInternalResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
tagger := func(req *tikvrpc.Request) {
req.ResourceGroupTag = internalResourceGroupTag
}
return tagger
}
// IsInternalResourceGroupTaggerForTopSQL is only used for testing.
func IsInternalResourceGroupTaggerForTopSQL(tag []byte) bool {
return bytes.Equal(tag, internalResourceGroupTag)
}
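These two helpers form a matched pair: the tagger stamps internal background requests (the unistore-only delete-range path above) with a one-byte sentinel, and the predicate lets the server test hook below recognize and skip them. A small illustration of how they compose, as a sketch rather than code from this PR:

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	ddlutil "github.com/pingcap/tidb/ddl/util"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func main() {
	req := tikvrpc.NewRequest(tikvrpc.CmdScan, &kvrpcpb.ScanRequest{StartKey: []byte("s")})

	// Tag the request the same way the delete-range path does when running on unistore.
	ddlutil.GetInternalResourceGroupTaggerForTopSQL()(req)

	// Test hooks can then filter out these internal background requests.
	fmt.Println(ddlutil.IsInternalResourceGroupTaggerForTopSQL(req.GetResourceGroupTag())) // true
}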
......@@ -1064,6 +1064,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err != nil {
return err
}
sessVars := e.ctx.GetSessionVars()
setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn)
setRPCInterceptorOfExecCounterForTxn(sessVars, txn)
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
......
......@@ -23,7 +23,9 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/util/testbridge"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/goleak"
)
......@@ -32,6 +34,10 @@ func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
RunInGoTest = true // flag for NewServer to know it is running in a test environment
// Enable TopSQL for all tests, and check the resource tag for each RPC request.
// This is used to detect which code paths are not tracked by TopSQL.
topsqlstate.EnableTopSQL()
unistore.CheckResourceTagForTopSQLInGoTest = true
// AsyncCommit will make DDL wait 2.5s before changing to the next state.
// Set schema lease to avoid it from making CI slow.
......
......@@ -39,6 +39,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
......@@ -1911,6 +1912,9 @@ func (c *resourceTagChecker) checkExist(t *testing.T, digest stmtstats.BinaryDig
}
func (c *resourceTagChecker) checkReqExist(t *testing.T, digest stmtstats.BinaryDigest, sqlStr string, reqs ...tikvrpc.CmdType) {
if len(reqs) == 0 {
return
}
c.Lock()
defer c.Unlock()
reqMap, ok := c.sqlDigest2Reqs[digest]
......@@ -1971,7 +1975,8 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S
}
unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) {
tag := req.GetResourceGroupTag()
if len(tag) == 0 {
if len(tag) == 0 || ddlutil.IsInternalResourceGroupTaggerForTopSQL(tag) {
// Ignore internal background requests.
return
}
sqlDigest, err := resourcegrouptag.DecodeResourceGroupTag(tag)
......@@ -2265,6 +2270,112 @@ func TestTopSQLStatementStats4(t *testing.T) {
}
}
func TestTopSQLResourceTag(t *testing.T) {
ts, _, tagChecker, _, cleanFn := setupForTestTopSQLStatementStats(t)
defer func() {
topsqlstate.DisableTopSQL()
cleanFn()
}()
loadDataFile, err := os.CreateTemp("", "load_data_test0.csv")
require.NoError(t, err)
defer func() {
path := loadDataFile.Name()
err = loadDataFile.Close()
require.NoError(t, err)
err = os.Remove(path)
require.NoError(t, err)
}()
_, err = loadDataFile.WriteString(
"31 31\n" +
"32 32\n" +
"33 33\n")
require.NoError(t, err)
// Test cases for various statements.
cases := []struct {
sql string
isQuery bool
reqs []tikvrpc.CmdType
}{
// Test for CRUD.
{"insert into t values (1,1), (3,3)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"insert into t values (1,2) on duplicate key update a = 2", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}},
{"update t set b=b+1 where a=3", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdGet}},
{"update t set b=b+1 where a>1", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdCop}},
{"delete from t where a=3", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdGet}},
{"delete from t where a>1", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdCop}},
{"insert ignore into t values (2,2), (3,3)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}},
{"select * from t where a in (1,2,3,4)", true, []tikvrpc.CmdType{tikvrpc.CmdBatchGet}},
{"select * from t where a = 1", true, []tikvrpc.CmdType{tikvrpc.CmdGet}},
{"select * from t where b > 0", true, []tikvrpc.CmdType{tikvrpc.CmdCop}},
{"replace into t values (2,2), (4,4)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}},
// Test for DDL
{"create database test_db0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"create table test_db0.test_t0 (a int, b int, index idx(a))", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"create table test_db0.test_t1 (a int, b int, index idx(a))", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"alter table test_db0.test_t0 add column c int", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"drop table test_db0.test_t0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"drop database test_db0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
{"alter table t modify column b double", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan, tikvrpc.CmdCop}},
{"alter table t add index idx2 (b,a)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan, tikvrpc.CmdCop}},
{"alter table t drop index idx2", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
// Test for transaction
{"begin", false, nil},
{"insert into t2 values (10,10), (11,11)", false, nil},
{"insert ignore into t2 values (20,20), (21,21)", false, []tikvrpc.CmdType{tikvrpc.CmdBatchGet}},
{"commit", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}},
// Test for other statements.
{"set @@global.tidb_enable_1pc = 1", false, nil},
{fmt.Sprintf("load data local infile %q into table t2", loadDataFile.Name()), false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}},
}
internalCases := []struct {
sql string
reqs []tikvrpc.CmdType
}{
{"replace into mysql.global_variables (variable_name,variable_value) values ('tidb_enable_1pc', '1')", []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}},
}
executeCaseFn := func(execFn func(db *sql.DB)) {
dsn := ts.getDSN(func(config *mysql.Config) {
config.AllowAllFiles = true
config.Params["sql_mode"] = "''"
})
db, err := sql.Open("mysql", dsn)
require.NoError(t, err)
dbt := testkit.NewDBTestKit(t, db)
dbt.MustExec("use stmtstats;")
require.NoError(t, err)
execFn(db)
err = db.Close()
require.NoError(t, err)
}
execFn := func(db *sql.DB) {
dbt := testkit.NewDBTestKit(t, db)
for _, ca := range cases {
if ca.isQuery {
mustQuery(t, dbt, ca.sql)
} else {
dbt.MustExec(ca.sql)
}
}
}
executeCaseFn(execFn)
for _, ca := range cases {
_, digest := parser.NormalizeDigest(ca.sql)
tagChecker.checkReqExist(t, stmtstats.BinaryDigest(digest.Bytes()), ca.sql, ca.reqs...)
}
for _, ca := range internalCases {
_, digest := parser.NormalizeDigest(ca.sql)
tagChecker.checkReqExist(t, stmtstats.BinaryDigest(digest.Bytes()), ca.sql, ca.reqs...)
}
}
func (ts *tidbTestTopSQLSuite) loopExec(ctx context.Context, t *testing.T, fn func(db *sql.DB)) {
db, err := sql.Open("mysql", ts.getDSN())
require.NoError(t, err, "Error connecting")
......
......@@ -53,6 +53,9 @@ type RPCClient struct {
closed int32
}
// CheckResourceTagForTopSQLInGoTest indicates whether to check the resource tag for TopSQL in Go tests.
var CheckResourceTagForTopSQLInGoTest bool
// UnistoreRPCClientSendHook is exported for test.
var UnistoreRPCClientSendHook func(*tikvrpc.Request)
......@@ -96,6 +99,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return nil, err
}
if CheckResourceTagForTopSQLInGoTest {
err = checkResourceTagForTopSQL(req)
if err != nil {
return nil, err
}
}
resp := &tikvrpc.Response{}
switch req.Type {
case tikvrpc.CmdGet:
......
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unistore
import (
"errors"
"fmt"
"runtime"
"github.com/pingcap/tidb/tablecodec"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
)
func checkResourceTagForTopSQL(req *tikvrpc.Request) error {
if !topsqlstate.TopSQLEnabled() {
return nil
}
tag := req.GetResourceGroupTag()
if len(tag) > 0 {
return nil
}
startKey, err := getReqStartKey(req)
if err != nil {
return err
}
var tid int64
if tablecodec.IsRecordKey(startKey) {
tid, _, _ = tablecodec.DecodeRecordKey(startKey)
}
if tablecodec.IsIndexKey(startKey) {
tid, _, _, _ = tablecodec.DecodeIndexKey(startKey)
}
// If decoding fails (e.g. with "invalid record key"), tid stays 0 and the resource tag check is skipped for this request.
if tid > 0 {
stack := getStack()
return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v",
req.Type.String(), tid, string(stack))
}
return nil
}
func getReqStartKey(req *tikvrpc.Request) ([]byte, error) {
switch req.Type {
case tikvrpc.CmdGet:
request := req.Get()
return request.Key, nil
case tikvrpc.CmdScan:
request := req.Scan()
return request.StartKey, nil
case tikvrpc.CmdPrewrite:
request := req.Prewrite()
return request.Mutations[0].Key, nil
case tikvrpc.CmdCommit:
request := req.Commit()
return request.Keys[0], nil
case tikvrpc.CmdCleanup:
request := req.Cleanup()
return request.Key, nil
case tikvrpc.CmdBatchGet:
request := req.BatchGet()
return request.Keys[0], nil
case tikvrpc.CmdBatchRollback:
request := req.BatchRollback()
return request.Keys[0], nil
case tikvrpc.CmdScanLock:
request := req.ScanLock()
return request.StartKey, nil
case tikvrpc.CmdPessimisticLock:
request := req.PessimisticLock()
return request.PrimaryLock, nil
case tikvrpc.CmdCheckSecondaryLocks:
request := req.CheckSecondaryLocks()
return request.Keys[0], nil
case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
request := req.Cop()
return request.Ranges[0].Start, nil
case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet,
tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange,
tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver,
tikvrpc.CmdCheckLockObserver, tikvrpc.CmdRemoveLockObserver, tikvrpc.CmdPhysicalScanLock, tikvrpc.CmdStoreSafeTS,
tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion,
tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty:
// Ignore these requests for now, since they are unrelated to TopSQL.
return nil, nil
case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive:
// Ignore mpp requests.
return nil, nil
case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback:
// TODO: add resource tags for these requests. https://github.com/pingcap/tidb/issues/33621
return nil, nil
default:
return nil, errors.New("unknown request, check the new type RPC request here")
}
}
func getStack() []byte {
const size = 1024 * 64
buf := make([]byte, size)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
return buf
}
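A quick way to exercise the new check is an in-package test that builds an untagged point get on a real record key and expects an error. This is only a sketch, assuming checkResourceTagForTopSQL stays unexported in package unistore; the test name and table ID are illustrative:

package unistore

import (
	"testing"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/tablecodec"
	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func TestCheckResourceTagSketch(t *testing.T) {
	topsqlstate.EnableTopSQL()
	defer topsqlstate.DisableTopSQL()

	// An untagged request whose start key decodes to a user table must be rejected.
	key := tablecodec.EncodeRowKeyWithHandle(100, kv.IntHandle(1))
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key})
	require.Error(t, checkResourceTagForTopSQL(req))

	// Once any resource group tag is attached, the same request passes the check.
	req.ResourceGroupTag = []byte("x")
	require.NoError(t, checkResourceTagForTopSQL(req))
}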