Skip to content

Commit

Permalink
enhance(k8s): use the buildCtx/logCtx to create informerCtx
Browse files Browse the repository at this point in the history
  • Loading branch information
cognifloyd committed Apr 28, 2022
1 parent 844be2d commit fbd0612
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (w *Worker) exec(index int) error {
return nil
}

// log streaming uses buildCtx so that it is not subject to the timeout.
// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
Expand Down
12 changes: 12 additions & 0 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,18 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Info("all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
streams.Go(func() error {
// If needed, the runtime should handle synchronizing with
// AssembleBuild which runs concurrently with StreamBuild.
err := c.Runtime.StreamBuild(streamCtx, c.pipeline)
if err != nil {
return err
}

return nil
})

for {
select {
case req := <-c.streamRequests:
Expand Down
12 changes: 12 additions & 0 deletions executor/local/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,18 @@ func (c *client) StreamBuild(ctx context.Context) error {
fmt.Fprintln(os.Stdout, "all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
streams.Go(func() error {
// If needed, the runtime should handle synchronizing with
// AssembleBuild which runs concurrently with StreamBuild.
err := c.Runtime.StreamBuild(streamCtx, c.pipeline)
if err != nil {
return err
}

return nil
})

for {
select {
case req := <-c.streamRequests:
Expand Down
8 changes: 8 additions & 0 deletions runtime/docker/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
return nil
}

// StreamBuild initializes log/event streaming for build.
// This is a no-op for docker.
func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("no-op: streaming build %s", b.ID)

return nil
}

// AssembleBuild finalizes pipeline build setup.
// This is a no-op for docker.
func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
Expand Down
4 changes: 4 additions & 0 deletions runtime/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Engine interface {
// SetupBuild defines a function that
// prepares the pipeline build.
SetupBuild(context.Context, *pipeline.Build) error
// StreamBuild defines a function that initializes
// log/event streaming if the runtime needs it.
// StreamBuild and AssembleBuild run concurrently.
StreamBuild(context.Context, *pipeline.Build) error
// AssembleBuild defines a function that
// finalizes pipeline build setup.
AssembleBuild(context.Context, *pipeline.Build) error
Expand Down
24 changes: 22 additions & 2 deletions runtime/kubernetes/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
return nil
}

// StreamBuild initializes log/event streaming for build.
func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("streaming build %s", b.ID)

select {
case <-ctx.Done():
// bail out, as build timed out or was canceled.
return nil
case <-c.PodTracker.Ready:
// AssembleBuild signaled that the PodTracker is ready.
break
}

// Populate the PodTracker caches before creating the pipeline pod
c.PodTracker.Start(ctx)

return nil
}

// AssembleBuild finalizes the pipeline build setup.
// This creates the pod in kubernetes for the pipeline build.
// After creation, image is the only container field we can edit in kubernetes.
Expand Down Expand Up @@ -196,9 +215,10 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
// setup containerTeachers now that all containers are defined.
c.PodTracker.TrackContainers(c.Pod.Spec.Containers)

// Populate the PodTracker caches before creating the pipeline pod
c.PodTracker.Start(ctx)
// send signal to StreamBuild now that PodTracker is ready to be started.
close(c.PodTracker.Ready)

// wait for the PodTracker caches to populate before creating the pipeline pod.
if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
Expand Down
9 changes: 9 additions & 0 deletions runtime/kubernetes/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,15 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

// StreamBuild and AssembleBuild coordinate their work, so, emulate
// executor.StreamBuild which calls runtime.StreamBuild concurrently.
go func() {
err := _engine.StreamBuild(context.Background(), test.pipeline)
if err != nil {
t.Errorf("unable to start PodTracker via StreamBuild")
}
}()

err = _engine.AssembleBuild(context.Background(), test.pipeline)

if test.failure {
Expand Down
4 changes: 4 additions & 0 deletions runtime/kubernetes/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type podTracker struct {

// Containers maps the container name to a containerTracker
Containers map[string]*containerTracker

// Ready signals when the PodTracker is done with setup and ready to Start.
Ready chan struct{}
}

// HandlePodAdd is an AddFunc for cache.ResourceEventHandlerFuncs for Pods.
Expand Down Expand Up @@ -219,6 +222,7 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po
podInformer: podInformer,
PodLister: podInformer.Lister(),
PodSynced: podInformer.Informer().HasSynced,
Ready: make(chan struct{}),
}

// register event handler funcs in podInformer
Expand Down

0 comments on commit fbd0612

Please sign in to comment.