Skip to content

Commit

Permalink
sql: release table name as soon as it is no longer referenced
Browse files Browse the repository at this point in the history
When dropping a table, the table name would be released only after
the table was truncated. Table truncation can take a long time and
it doesn't make for a good user experience when a user expects a
name for a dropped table to be available almost immediately.

fixes cockroachdb#7348
  • Loading branch information
vivekmenezes committed Mar 7, 2017
1 parent e0f3b01 commit 5ae895e
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 24 deletions.
32 changes: 29 additions & 3 deletions pkg/sql/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,19 +965,45 @@ func (p *planner) dropViewImpl(
// can even eliminate the need to use a transaction for each chunk at a later
// stage if it proves inefficient).
func truncateAndDropTable(
ctx context.Context, tableDesc *sqlbase.TableDescriptor, db *client.DB,
ctx context.Context,
tableDesc *sqlbase.TableDescriptor,
db *client.DB,
testingKnobs SchemaChangerTestingKnobs,
) error {
zoneKey, nameKey, descKey := GetKeysForTableDescriptor(tableDesc)
// The table name is no longer in use across the entire cluster.
// Delete the namekey so that it can be used by another table.
// We do this before truncating the table because the table truncation
// takes too much time.
if err := db.Txn(ctx, func(txn *client.Txn) error {
b := &client.Batch{}
// Use CPut because we want to remove a specific name -> id map.
b.CPut(nameKey, nil, tableDesc.ID)
txn.SetSystemConfigTrigger()
err := txn.Run(b)
if _, ok := err.(*roachpb.ConditionFailedError); ok {
return nil
}
return err
}); err != nil {
return err
}

if cb := testingKnobs.RunAfterTableNameDropped; cb != nil {
if err := cb(); err != nil {
return err
}
}

if err := truncateTableInChunks(ctx, tableDesc, db); err != nil {
return err
}

// Finished deleting all the table data, now delete the table meta data.
return db.Txn(ctx, func(txn *client.Txn) error {
zoneKey, nameKey, descKey := GetKeysForTableDescriptor(tableDesc)
// Delete table descriptor
b := &client.Batch{}
b.Del(descKey)
b.Del(nameKey)
// Delete the zone config entry for this table.
b.Del(zoneKey)
txn.SetSystemConfigTrigger()
Expand Down
80 changes: 60 additions & 20 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
)

func TestDropDatabase(t *testing.T) {
Expand Down Expand Up @@ -173,30 +174,29 @@ func checkKeyCount(t *testing.T, kvDB *client.DB, prefix roachpb.Key, numKeys in
}
}

func createKVTable(t *testing.T, sqlDB *gosql.DB, numRows int) {
func createKVTable(sqlDB *gosql.DB, numRows int) error {
// Fix the column families so the key counts don't change if the family
// heuristics are updated.
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE DATABASE IF NOT EXISTS t;
CREATE TABLE t.kv (k INT PRIMARY KEY, v INT, FAMILY (k), FAMILY (v));
CREATE INDEX foo on t.kv (v);
`); err != nil {
t.Fatal(err)
return err
}

// Bulk insert.
var insert bytes.Buffer
if _, err := insert.WriteString(fmt.Sprintf(`INSERT INTO t.kv VALUES (%d, %d)`, 0, numRows-1)); err != nil {
t.Fatal(err)
return err
}
for i := 1; i < numRows; i++ {
if _, err := insert.WriteString(fmt.Sprintf(` ,(%d, %d)`, i, numRows-i)); err != nil {
t.Fatal(err)
return err
}
}
if _, err := sqlDB.Exec(insert.String()); err != nil {
t.Fatal(err)
}
_, err := sqlDB.Exec(insert.String())
return err
}

func TestDropIndex(t *testing.T) {
Expand All @@ -212,8 +212,9 @@ func TestDropIndex(t *testing.T) {
defer s.Stopper().Stop()

numRows := 2*chunkSize + 1
createKVTable(t, sqlDB, numRows)

if err := createKVTable(sqlDB, numRows); err != nil {
t.Fatal(err)
}
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")

status, i, err := tableDesc.FindIndexByName("foo")
Expand Down Expand Up @@ -295,6 +296,7 @@ func TestDropIndexInterleaved(t *testing.T) {
tablePrefix := roachpb.Key(keys.MakeTablePrefix(uint32(tableDesc.ID)))

checkKeyCount(t, kvDB, tablePrefix, 3*numRows)

if _, err := sqlDB.Exec(`DROP INDEX t.intlv@intlv_idx`); err != nil {
t.Fatal(err)
}
Expand All @@ -310,12 +312,35 @@ func TestDropIndexInterleaved(t *testing.T) {
func TestDropTable(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := createTestServerParams()
var runAfterTableNameDropped func() error
errChan := make(chan error)
var numDropTable int
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunAfterTableNameDropped: func() error {
numDropTable++
runFunc := runAfterTableNameDropped
runAfterTableNameDropped = nil
if runFunc != nil {
go func() {
errChan <- runFunc()
}()
// Return an error so that the DROP TABLE is retried.
// This tests the idempotency of DROP TABLE.
return errors.Errorf("after name is dropped")
}
return nil
},
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop()
ctx := context.TODO()

numRows := 2*sql.TableTruncateChunkSize + 1
createKVTable(t, sqlDB, numRows)
if err := createKVTable(sqlDB, numRows); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")
nameKey := sqlbase.MakeNameMetadataKey(keys.MaxReservedDescID+1, "kv")
Expand Down Expand Up @@ -350,16 +375,33 @@ func TestDropTable(t *testing.T) {

tablePrefix := roachpb.Key(keys.MakeTablePrefix(uint32(tableDesc.ID)))

runAfterTableNameDropped = func() error {
// Test that deleted table cannot be used. This prevents
// regressions where name -> descriptor ID caches might make
// this statement erronously work.
if _, err := sqlDB.Exec(
`SELECT * FROM t.kv`,
); !testutils.IsError(err, `table "t.kv" does not exist`) {
return errors.Wrap(err, "different error than expected")
}

if gr, err := kvDB.Get(ctx, nameKey); err != nil {
return err
} else if gr.Exists() {
return errors.Errorf("table namekey still exists")
}

return createKVTable(sqlDB, numRows)
}

checkKeyCount(t, kvDB, tablePrefix, 3*numRows)
if _, err := sqlDB.Exec(`DROP TABLE t.kv`); err != nil {
t.Fatal(err)
}
checkKeyCount(t, kvDB, tablePrefix, 0)

// Test that deleted table cannot be used. This prevents regressions where
// name -> descriptor ID caches might make this statement erronously work.
if _, err := sqlDB.Exec(`SELECT * FROM t.kv`); !testutils.IsError(err, `table "t.kv" does not exist`) {
t.Fatalf("different error than expected: %v", err)
if numDropTable != 2 {
t.Fatalf("numDropTable=%d, expected=2", numDropTable)
}

if gr, err := kvDB.Get(ctx, descKey); err != nil {
Expand All @@ -368,16 +410,14 @@ func TestDropTable(t *testing.T) {
t.Fatalf("table descriptor still exists after the table is dropped")
}

if gr, err := kvDB.Get(ctx, nameKey); err != nil {
if gr, err := kvDB.Get(ctx, zoneKey); err != nil {
t.Fatal(err)
} else if gr.Exists() {
t.Fatalf("table namekey still exists after the table is dropped")
t.Fatalf("zone config entry still exists after the table is dropped")
}

if gr, err := kvDB.Get(ctx, zoneKey); err != nil {
if err := <-errChan; err != nil {
t.Fatal(err)
} else if gr.Exists() {
t.Fatalf("zone config entry still exists after the table is dropped")
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (sc *SchemaChanger) truncateAndDropTable(
if err := sc.ExtendLease(lease); err != nil {
return err
}
return truncateAndDropTable(ctx, tableDesc, &sc.db)
return truncateAndDropTable(ctx, tableDesc, &sc.db, *sc.testingKnobs)
}

// NewSchemaChangerForTesting only for tests.
Expand Down Expand Up @@ -664,6 +664,11 @@ type SchemaChangerTestingKnobs struct {

// BackfillChunkSize is to be used for all backfill chunked operations.
BackfillChunkSize int64

// RunAfterTableNameDropped is called when a table is being dropped.
// It is called as soon as the table name is released and before the
// table is truncated.
RunAfterTableNameDropped func() error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 5ae895e

Please sign in to comment.