Skip to content

Commit

Permalink
bulkio: Fix transaction semantics in job scheduler.
Browse files Browse the repository at this point in the history
Use a transaction when querying for the schedules to run.
In addition, ensure that a single bad schedule does not cause
all of the previous work to be wasted by using transaction savepoints.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Aug 19, 2020
1 parent 935ae2e commit 4f77599
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 6 deletions.
20 changes: 17 additions & 3 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,10 @@ func (s *jobScheduler) executeSchedules(
defer stats.updateMetrics(&s.metrics)

findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules)
rows, cols, err := s.InternalExecutor.QueryWithCols(ctx, "find-scheduled-jobs", nil,
sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
rows, cols, err := s.InternalExecutor.QueryWithCols(
ctx, "find-scheduled-jobs",
txn,
sqlbase.InternalExecutorSessionDataOverride{User: security.NodeUser},
findSchedulesStmt)

if err != nil {
Expand All @@ -252,8 +254,20 @@ func (s *jobScheduler) executeSchedules(
continue
}

sp, err := txn.CreateSavepoint(ctx)
if err != nil {
return err
}

if err := s.processSchedule(ctx, schedule, numRunning, stats, txn); err != nil {
// Log the failure and roll back only this schedule's work to the savepoint,
// so that one bad schedule does not discard the work already done in txn.
log.Errorf(ctx, "error processing schedule %d: %+v", schedule.ScheduleID(), err)

if err := txn.RollbackToSavepoint(ctx, sp); err != nil {
return errors.Wrapf(err, "failed to rollback savepoint for schedule %d", schedule.ScheduleID())
}
}

if err := txn.ReleaseSavepoint(ctx, sp); err != nil {
return err
}
}
Expand Down
61 changes: 58 additions & 3 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -43,8 +42,6 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 52959)

h, cleanup := newTestHelper(t)
defer cleanup()

Expand Down Expand Up @@ -425,6 +422,64 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
stopper.Stop(ctx)
}

// returnErrorExecutor is a test executor that counts the number of times
// its ExecuteJob method is called, and always returns an error from it.
type returnErrorExecutor struct {
	// numCalls is incremented once per ExecuteJob invocation.
	numCalls int
}

// ExecuteJob records that it was invoked and then fails unconditionally,
// returning an error that names the schedule it was asked to run. All other
// arguments are ignored.
func (e *returnErrorExecutor) ExecuteJob(
	_ context.Context,
	_ *scheduledjobs.JobExecutionConfig,
	_ scheduledjobs.JobSchedulerEnv,
	schedule *ScheduledJob,
	_ *kv.Txn,
) error {
	// Count the call before touching the schedule so every invocation is
	// recorded.
	e.numCalls++

	scheduleID := schedule.ScheduleID()
	return errors.Newf("error for schedule %d", scheduleID)
}

// NotifyJobTermination is a no-op for this executor: it ignores all of its
// arguments and always reports success.
func (e *returnErrorExecutor) NotifyJobTermination(
	_ context.Context,
	_ int64,
	_ Status,
	_ *ScheduledJob,
	_ *kv.Txn,
) error {
	return nil
}

// Metrics returns nil; this executor does not expose any metrics.
func (e *returnErrorExecutor) Metrics() metric.Struct {
	return nil
}

// Compile-time check that *returnErrorExecutor satisfies ScheduledJobExecutor.
var _ ScheduledJobExecutor = (*returnErrorExecutor)(nil)

// TestJobSchedulerToleratesBadSchedules creates several schedules whose
// executor always fails, runs executeSchedules once inside a transaction, and
// checks that the call succeeds and that the executor was invoked once per
// schedule.
func TestJobSchedulerToleratesBadSchedules(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	h, cleanup := newTestHelper(t)
	defer cleanup()

	ctx := context.Background()

	const (
		executorName = "return_error"
		numJobs      = 5
	)

	// Register the always-failing executor for the duration of the test.
	ex := &returnErrorExecutor{}
	defer registerScopedScheduledJobExecutor(executorName, ex)()

	// Create a few one-off schedules, all due at the same time.
	runAt := h.env.Now().Add(time.Hour)
	for n := 0; n < numJobs; n++ {
		sj := h.newScheduledJobForExecutor("schedule", executorName, nil)
		sj.SetNextRun(runAt)
		require.NoError(t, sj.Create(ctx, h.cfg.InternalExecutor, nil))
	}

	// Move the test clock one second past the scheduled run time.
	h.env.SetTime(runAt.Add(time.Second))

	scheduler := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
	runOnce := func(ctx context.Context, txn *kv.Txn) error {
		return scheduler.executeSchedules(ctx, numJobs, txn)
	}

	// The executor errors for every schedule, yet the scheduler pass itself
	// must not fail, and every schedule must have been attempted.
	require.NoError(t, h.cfg.DB.Txn(ctx, runOnce))
	require.Equal(t, numJobs, ex.numCalls)
}

func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 4f77599

Please sign in to comment.