Skip to content

Commit

Permalink
chore: address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrockopp committed Jul 6, 2023
1 parent 225b54f commit c7faed2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
52 changes: 29 additions & 23 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/adhocore/gronx"
"github.com/go-vela/server/api/build"
"github.com/go-vela/server/compiler"
Expand All @@ -20,13 +22,15 @@ import (
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/util/wait"
)

const baseErr = "unable to schedule build"
const (
scheduleErr = "unable to trigger build for schedule"

scheduleWait = "waiting to trigger build for schedule"
)

func processSchedules(interval time.Duration, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
Expand All @@ -37,41 +41,51 @@ func processSchedules(interval time.Duration, compiler compiler.Engine, database

// iterate through the list of active schedules
for _, s := range schedules {
// sleep for 1s - 2s before processing the active schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 1.0.
time.Sleep(wait.Jitter(time.Second, 1.0))

// send API call to capture the schedule
//
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// ignore triggering a build if the schedule is no longer active
if !schedule.GetActive() {
logrus.Tracef("ignoring to schedule build for %s", s.GetName())
logrus.Tracef("skipping to trigger build for inactive schedule %s", schedule.GetName())

continue
}

// capture the previous occurrence of the entry for the schedule rounded to the nearest whole interval
// capture the last time a build was triggered for the schedule in UTC
scheduled := time.Unix(schedule.GetScheduledAt(), 0).UTC()

// capture the previous occurrence of the entry rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:00
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, s.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// capture the next occurrence of the entry after the last schedule rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:05
nextTime, err := gronx.NextTickAfter(s.GetEntry(), time.Unix(s.GetScheduledAt(), 0).UTC(), true)
nextTime, err := gronx.NextTickAfter(schedule.GetEntry(), scheduled, true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, s.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
Expand All @@ -80,25 +94,24 @@ func processSchedules(interval time.Duration, compiler compiler.Engine, database
//
// The current time must be after the next occurrence of the schedule.
if !time.Now().After(nextTime) {
logrus.Tracef("waiting to schedule build for %s", s.GetName())
logrus.Tracef("%s %s: current time not past next occurrence", scheduleWait, schedule.GetName())

continue
}

// check if we should wait to trigger a build for the schedule
//
// The interval for the schedule (multiplied by 2 as a buffer) subtracted from
// the current time must be after the previous occurrence of the schedule.
if !prevTime.After(time.Now().Add(-(2 * interval))) {
logrus.Tracef("waiting to schedule build for %s", s.GetName())
// The previous occurrence of the schedule must be after the starting time of processing schedules.
if !prevTime.After(start) {
logrus.Tracef("%s %s: previous occurence not after starting point", scheduleWait, schedule.GetName())

continue
}

// process the schedule and trigger a new build
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
Expand All @@ -109,13 +122,6 @@ func processSchedules(interval time.Duration, compiler compiler.Engine, database

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// sleep for 1s - 3s before processing the schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 2.0.
time.Sleep(wait.Jitter(time.Second, 2.0))

// send API call to capture the repo for the schedule
r, err := database.GetRepo(s.GetRepoID())
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ func server(c *cli.Context) error {
g.Go(func() error {
logrus.Info("starting scheduler")
for {
// track the starting time for when the server begins processing schedules
//
// This will be used to control which schedules will have a build triggered based
// off the configured entry and last time a build was triggered for the schedule.
start := time.Now().UTC()

// capture the interval of time to wait before processing schedules
//
// We need to sleep for some amount of time before we attempt to process schedules
// setup in the database. Since the schedule interval is configurable, we use that
// as the base duration to determine how long to sleep for.
Expand All @@ -191,7 +199,7 @@ func server(c *cli.Context) error {
// sleep for a duration of time before processing schedules
time.Sleep(jitter)

err = processSchedules(interval, compiler, database, metadata, queue, scm)
err = processSchedules(start, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warn("unable to process schedules")
} else {
Expand Down

0 comments on commit c7faed2

Please sign in to comment.