package main import ( "context" "database/sql" "errors" "fmt" "math/rand" "sync" _ "github.com/go-sql-driver/mysql" perrors "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" ) type AppendWorkload struct { DB *sql.DB Concurrency int Tables int PadLength int } // MustExec must execute sql or fatal func MustExec(DB *sql.DB, query string, args ...interface{}) sql.Result { r, err := DB.Exec(query, args...) if err != nil { log.Fatal("Exec query err.", zap.String("query", query), zap.Error(err)) } return r } func (c *AppendWorkload) Prepare() error { // Use 32 threads to create Tables. var wg sync.WaitGroup create_table_sql := "create table write_stress%d(col bigint" for i := 0; i < 99; i ++ { create_table_sql += fmt.Sprintf(", col%d bigint", i) } create_table_sql += ")" log.Info("preparing data") for i := 0; i < 32; i++ { wg.Add(1) go func(tid int) { defer wg.Done() for j := 0; j < c.Tables; j++ { if j%32 == tid { sql := fmt.Sprintf("drop table if exists write_stress%d", j+1) MustExec(c.DB, sql) sql = fmt.Sprintf(create_table_sql, j+1) MustExec(c.DB, sql) } } }(i) } wg.Wait() log.Info("Prepare ok.") return nil } func (c *AppendWorkload) Run(ctx context.Context) error { log.Info("AppendWorkload start") var wg sync.WaitGroup for i := 0; i < c.Concurrency; i++ { wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Done(): log.Info("Context is done.") return default: } err := c.runClient(ctx) if !errors.Is(err, context.DeadlineExceeded) { log.Warn("Append row failed", zap.Error(err)) } } }() } wg.Wait() log.Info("Run ok!") return nil } func (c *AppendWorkload) runClient(ctx context.Context) error { rng := rand.New(rand.NewSource(rand.Int63())) columnCnt := 100 values := "(?" for i := 0; i < columnCnt- 1; i++ { values += ",?" } values += ")," for { fullValues := "" tupleCnt := rng.Int63n(500) for i := int64(0); i < tupleCnt; i++ { fullValues += values } sql := "insert into write_stress%d values" sql += fullValues[0:len(fullValues)-1] var args []any for i := 0; i < columnCnt*tupleCnt; i++ { args = append(args, rng.Int63()) } tid := rng.Int()%c.Tables + 1 sql := fmt.Sprintf(sql, tid) _, err := c.DB.ExecContext(ctx, sql, args...) if err != nil { return perrors.Trace(err) } } return nil } func main() { db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/test") if err != nil { log.Fatal("error open db", zap.Error(err)) } tableNum := 32 w := AppendWorkload{ DB: db, Concurrency: tableNum, Tables: tableNum, PadLength: 4000000, } if err := w.Prepare(); err != nil { log.Fatal("w.Prepare fail") } w.Run(context.Background()) }