Skip to content

Commit

Permalink
reduce pointers
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
  • Loading branch information
zhongzc committed Dec 16, 2021
1 parent 1f95cff commit a73010d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
2 changes: 1 addition & 1 deletion util/topsql/reporter/datasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type DataSink interface {
// TrySend pushes a report data into the sink, which will later be sent to a target by the sink. A deadline can be
// specified to control how late it should be sent. If the sink is kept full and cannot schedule a send within
// the specified deadline, or the sink is closed, an error will be returned.
TrySend(data ReportData, deadline time.Time) error
TrySend(data *ReportData, deadline time.Time) error

// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.
Expand Down
26 changes: 13 additions & 13 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ type collectedData struct {
// ReportData contains data that reporter sends to the agent
type ReportData struct {
// CPUTimeRecords contains the topN collected records and the `others` record which aggregation all records that is out of Top N.
CPUTimeRecords []*tipb.CPUTimeRecord
SQLMetas []*tipb.SQLMeta
PlanMetas []*tipb.PlanMeta
CPUTimeRecords []tipb.CPUTimeRecord
SQLMetas []tipb.SQLMeta
PlanMetas []tipb.PlanMeta
}

func (d *ReportData) hasData() bool {
Expand Down Expand Up @@ -487,7 +487,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {

// getReportData gets ReportData from the collectedData.
// This function will calculate the topN collected records and the `others` record which aggregation all records that is out of Top N.
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData {
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) *ReportData {
records := getTopNFromCollected(collected)
return tsr.buildReportData(records, collected.normalizedSQLMap, collected.normalizedPlanMap)
}
Expand Down Expand Up @@ -526,15 +526,15 @@ func getTopNFromCollected(collected collectedData) (records []*dataPoints) {
//
// Attention, caller should guarantee no more reader or writer access `sqlMap` and `planMap`, because buildReportData
// will do heavy jobs in sync.Map.Range and it may block other readers and writers.
func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) ReportData {
res := ReportData{
CPUTimeRecords: make([]*tipb.CPUTimeRecord, 0, len(records)),
SQLMetas: make([]*tipb.SQLMeta, 0, len(records)),
PlanMetas: make([]*tipb.PlanMeta, 0, len(records)),
func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) *ReportData {
res := &ReportData{
CPUTimeRecords: make([]tipb.CPUTimeRecord, 0, len(records)),
SQLMetas: make([]tipb.SQLMeta, 0, len(records)),
PlanMetas: make([]tipb.PlanMeta, 0, len(records)),
}

for _, record := range records {
res.CPUTimeRecords = append(res.CPUTimeRecords, &tipb.CPUTimeRecord{
res.CPUTimeRecords = append(res.CPUTimeRecords, tipb.CPUTimeRecord{
RecordListTimestampSec: record.TimestampList,
RecordListCpuTimeMs: record.CPUTimeMsList,
SqlDigest: record.SQLDigest,
Expand All @@ -544,7 +544,7 @@ func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *

sqlMap.Range(func(key, value interface{}) bool {
meta := value.(SQLMeta)
res.SQLMetas = append(res.SQLMetas, &tipb.SQLMeta{
res.SQLMetas = append(res.SQLMetas, tipb.SQLMeta{
SqlDigest: []byte(key.(string)),
NormalizedSql: meta.normalizedSQL,
IsInternalSql: meta.isInternal,
Expand All @@ -558,7 +558,7 @@ func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
return true
}
res.PlanMetas = append(res.PlanMetas, &tipb.PlanMeta{
res.PlanMetas = append(res.PlanMetas, tipb.PlanMeta{
PlanDigest: []byte(key.(string)),
NormalizedPlan: planDecoded,
})
Expand All @@ -568,7 +568,7 @@ func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *
return res
}

func (tsr *RemoteTopSQLReporter) doReport(data ReportData) {
func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
defer util.Recover("top-sql", "doReport", nil, false)

if !data.hasData() {
Expand Down
24 changes: 12 additions & 12 deletions util/topsql/reporter/single_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var _ DataSink = &SingleTargetDataSink{}
// TrySend implements the DataSink interface.
// Currently the implementation will establish a new connection every time,
// which is suitable for a per-minute sending period
func (ds *SingleTargetDataSink) TrySend(data ReportData, deadline time.Time) error {
func (ds *SingleTargetDataSink) TrySend(data *ReportData, deadline time.Time) error {
select {
case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}:
return nil
Expand Down Expand Up @@ -142,7 +142,7 @@ func (ds *SingleTargetDataSink) Close() {
ds.conn = nil
}

func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data ReportData) error {
func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data *ReportData) error {
err := ds.tryEstablishConnection(ctx, addr)
if err != nil {
return err
Expand Down Expand Up @@ -175,7 +175,7 @@ func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data Re
}

// sendBatchCPUTimeRecord sends a batch of TopSQL records by stream.
func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []*tipb.CPUTimeRecord) (err error) {
func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []tipb.CPUTimeRecord) (err error) {
if len(records) == 0 {
return nil
}
Expand All @@ -195,8 +195,8 @@ func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, reco
if err != nil {
return err
}
for _, record := range records {
if err = stream.Send(record); err != nil {
for i := range records {
if err = stream.Send(&records[i]); err != nil {
return
}
sentCount += 1
Expand All @@ -208,7 +208,7 @@ func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, reco
}

// sendBatchSQLMeta sends a batch of SQL metas by stream.
func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas []*tipb.SQLMeta) (err error) {
func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) {
if len(sqlMetas) == 0 {
return
}
Expand All @@ -230,8 +230,8 @@ func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas [
return err
}

for _, meta := range sqlMetas {
if err = stream.Send(meta); err != nil {
for i := range sqlMetas {
if err = stream.Send(&sqlMetas[i]); err != nil {
return
}
sentCount += 1
Expand All @@ -243,7 +243,7 @@ func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas [
}

// sendBatchPlanMeta sends a batch of SQL metas by stream.
func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas []*tipb.PlanMeta) (err error) {
func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) {
if len(planMetas) == 0 {
return nil
}
Expand All @@ -264,8 +264,8 @@ func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas
return err
}

for _, meta := range planMetas {
if err = stream.Send(meta); err != nil {
for i := range planMetas {
if err = stream.Send(&planMetas[i]); err != nil {
return err
}
sentCount += 1
Expand Down Expand Up @@ -321,6 +321,6 @@ func (ds *SingleTargetDataSink) dial(ctx context.Context, targetRPCAddr string)
}

type sendTask struct {
data ReportData
data *ReportData
deadline time.Time
}

0 comments on commit a73010d

Please sign in to comment.