enhance(kubernetes): Add podTracker and containerTracker to use k8s API more like a k8s controller (#302)
cognifloyd authored May 3, 2022
1 parent cd90c6f commit e17c12a
Showing 15 changed files with 1,068 additions and 132 deletions.
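The podTracker and containerTracker implementations live in files that are not rendered on this page; the hunks below only show how they get wired in. For orientation, here is a minimal sketch, in Go with client-go, of what an informer-based pod tracker can look like. Every name in it is invented for illustration; only the overall shape, a SharedInformerFactory scoped to one pod, reflects what the wiring below implies. This is the "use k8s API more like a k8s controller" idea from the commit title: subscribe to watch events backed by a local cache instead of polling the API server for pod state.

package tracker

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// sketchPodTracker watches a single pod through a shared informer,
// the way a controller would, instead of polling the API server.
type sketchPodTracker struct {
	factory   informers.SharedInformerFactory
	PodSynced cache.InformerSynced // reports whether the pod cache is warm
	Ready     chan struct{}        // closed once container tracking is set up
}

func newSketchPodTracker(clientset kubernetes.Interface, pod *v1.Pod, resync time.Duration) *sketchPodTracker {
	// scope the informer to exactly one pod: the build's namespace
	// plus a field selector on the pod name
	factory := informers.NewSharedInformerFactoryWithOptions(
		clientset,
		resync,
		informers.WithNamespace(pod.ObjectMeta.Namespace),
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			opts.FieldSelector = fmt.Sprintf("metadata.name=%s", pod.ObjectMeta.Name)
		}),
	)

	podInformer := factory.Core().V1().Pods()
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			// a full tracker would fan each pod update out to
			// per-container trackers here
		},
	})

	return &sketchPodTracker{
		factory:   factory,
		PodSynced: podInformer.Informer().HasSynced,
		Ready:     make(chan struct{}),
	}
}

// Start launches the informers; it returns immediately while the
// caches fill in the background.
func (t *sketchPodTracker) Start(ctx context.Context) {
	t.factory.Start(ctx.Done())
}

The SetupBuild hunk below constructs the real tracker as newPodTracker(c.Logger, c.Kubernetes, c.Pod, time.Second*30), which matches this shape: a logger, a clientset, the pod to watch, and a resync interval.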
2 changes: 1 addition & 1 deletion cmd/vela-worker/exec.go
@@ -132,7 +132,7 @@ func (w *Worker) exec(index int) error {
return nil
}

- // log streaming uses buildCtx so that it is not subject to the timeout.
+ // log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
7 changes: 7 additions & 0 deletions executor/linux/build.go
@@ -510,6 +510,13 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Info("all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
streams.Go(func() error {
// If needed, the runtime should handle synchronizing with
// AssembleBuild which runs concurrently with StreamBuild.
return c.Runtime.StreamBuild(streamCtx, c.pipeline)
})

for {
select {
case req := <-c.streamRequests:
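(executor/local/build.go receives the identical hunk next.) To make the concurrency explicit, here is a condensed sketch of the errgroup pattern this hunk extends; runtimeStream and requests are stand-ins for the client's Runtime.StreamBuild method and streamRequests channel, and the function name is invented:

package executor

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// streamBuildSketch condenses the executor's StreamBuild loop: one
// errgroup owns the runtime's build-level stream and every per-step
// stream, so a single Wait() collects them all before returning.
func streamBuildSketch(
	ctx context.Context,
	runtimeStream func(context.Context) error,
	requests <-chan func(context.Context) error,
) error {
	streams, streamCtx := errgroup.WithContext(ctx)

	// build-level streaming setup (a no-op for docker, informer
	// startup for kubernetes) joins the same group as the steps
	streams.Go(func() error {
		return runtimeStream(streamCtx)
	})

	for {
		select {
		case req := <-requests:
			// each step's log stream runs in its own goroutine
			streams.Go(func() error {
				return req(streamCtx)
			})
		case <-ctx.Done():
			// the build is done; drain the group before returning
			return streams.Wait()
		}
	}
}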
7 changes: 7 additions & 0 deletions executor/local/build.go
@@ -363,6 +363,13 @@ func (c *client) StreamBuild(ctx context.Context) error {
fmt.Fprintln(os.Stdout, "all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
streams.Go(func() error {
// If needed, the runtime should handle synchronizing with
// AssembleBuild which runs concurrently with StreamBuild.
return c.Runtime.StreamBuild(streamCtx, c.pipeline)
})

for {
select {
case req := <-c.streamRequests:
8 changes: 8 additions & 0 deletions runtime/docker/build.go
@@ -26,6 +26,14 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
return nil
}

// StreamBuild initializes log/event streaming for build.
// This is a no-op for docker.
func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("no-op: streaming build %s", b.ID)

return nil
}

// AssembleBuild finalizes pipeline build setup.
// This is a no-op for docker.
func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
35 changes: 35 additions & 0 deletions runtime/docker/build_test.go
@@ -91,6 +91,41 @@ func TestDocker_SetupBuild(t *testing.T) {
}
}

func TestDocker_StreamBuild(t *testing.T) {
tests := []struct {
name string
failure bool
pipeline *pipeline.Build
}{
{
name: "steps",
failure: false,
pipeline: _pipeline,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := NewMock()
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

err = _engine.StreamBuild(context.Background(), test.pipeline)

if test.failure {
if err == nil {
t.Errorf("StreamBuild should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("StreamBuild returned err: %v", err)
}
})
}
}

func TestDocker_AssembleBuild(t *testing.T) {
// setup tests
tests := []struct {
4 changes: 4 additions & 0 deletions runtime/engine.go
@@ -29,6 +29,10 @@ type Engine interface {
// SetupBuild defines a function that
// prepares the pipeline build.
SetupBuild(context.Context, *pipeline.Build) error
// StreamBuild defines a function that initializes
// log/event streaming if the runtime needs it.
// StreamBuild and AssembleBuild run concurrently.
StreamBuild(context.Context, *pipeline.Build) error
// AssembleBuild defines a function that
// finalizes pipeline build setup.
AssembleBuild(context.Context, *pipeline.Build) error
50 changes: 50 additions & 0 deletions runtime/kubernetes/build.go
@@ -7,12 +7,14 @@ package kubernetes
import (
"context"
"fmt"
"time"

"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/runtime/kubernetes/apis/vela/v1alpha1"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

// The k8s libraries have some quirks around yaml marshaling (see opts.go).
// So, just use the same library for all kubernetes-related YAML.
@@ -79,6 +81,7 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
// https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1?tab=doc#ObjectMeta
c.Pod.ObjectMeta = metav1.ObjectMeta{
Name: b.ID,
Namespace: c.config.Namespace, // this is used by the podTracker
Labels: labels,
Annotations: c.PipelinePodTemplate.Metadata.Annotations,
}
@@ -124,6 +127,33 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
}
}

// initialize the PodTracker now that we have a Pod for it to track
tracker, err := newPodTracker(c.Logger, c.Kubernetes, c.Pod, time.Second*30)
if err != nil {
return err
}

c.PodTracker = tracker

return nil
}

// StreamBuild initializes log/event streaming for build.
func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("streaming build %s", b.ID)

select {
case <-ctx.Done():
// bail out, as build timed out or was canceled.
return nil
case <-c.PodTracker.Ready:
// AssembleBuild signaled that the PodTracker is ready.
break
}

// Populate the PodTracker caches before creating the pipeline pod
c.PodTracker.Start(ctx)

return nil
}

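The select above is one half of a handshake; the close(c.PodTracker.Ready) call in the AssembleBuild hunk below is the other half. Reduced to its essentials the pattern looks like this (all names here are placeholders, not the worker's API):

package tracker

import "context"

// readyHandshake boils the StreamBuild/AssembleBuild coordination down
// to a single channel: the assemble side closes it once every container
// tracker is registered, and the stream side waits on it before
// starting the informers.
type readyHandshake struct {
	Ready chan struct{}
}

func (h *readyHandshake) streamSide(ctx context.Context, start func(context.Context)) {
	select {
	case <-ctx.Done():
		// the build timed out or was canceled before setup finished
		return
	case <-h.Ready:
		// the assemble side closed Ready; tracking may begin
	}

	start(ctx)
}

func (h *readyHandshake) assembleSide() {
	// ... register all containers to track first ...

	// closing broadcasts to every waiting receiver at once, which is
	// why Ready is a channel rather than a mutex-guarded bool
	close(h.Ready)
}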
@@ -182,6 +212,17 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
}
}

// setup containerTrackers now that all containers are defined.
c.PodTracker.TrackContainers(c.Pod.Spec.Containers)

// send signal to StreamBuild now that PodTracker is ready to be started.
close(c.PodTracker.Ready)

// wait for the PodTracker caches to populate before creating the pipeline pod.
if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

// If the api call to create the pod fails, the pod might
// partially exist. So, set this first to make sure all
// remnants get deleted.
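TrackContainers and the containerTracker type are defined in files not rendered on this page. As a hedged illustration only, a per-container tracker driven by pod status updates could look roughly like this; the field and function names are guesses, not the worker's actual API:

package tracker

import (
	"sync"

	v1 "k8s.io/api/core/v1"
)

// containerTracker is a hypothetical per-container view fed by pod
// update events: a channel closes when the container terminates, so
// waiters can select on it.
type containerTracker struct {
	Name       string
	Terminated chan struct{} // closed once the container terminates
	once       sync.Once     // guards Terminated against double close
}

// inspectStatuses shows how a pod-update handler could drive the
// per-container trackers from pod.Status.ContainerStatuses.
func inspectStatuses(trackers map[string]*containerTracker, pod *v1.Pod) {
	for _, cst := range pod.Status.ContainerStatuses {
		tracker, ok := trackers[cst.Name]
		if !ok {
			// an untracked container, e.g. the placeholder image
			continue
		}

		if cst.State.Terminated != nil {
			tracker.once.Do(func() { close(tracker.Terminated) })
		}
	}
}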
@@ -206,6 +247,15 @@
func (c *client) RemoveBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("removing build %s", b.ID)

// PodTracker gets created in SetupBuild before pod is created
defer func() {
// check for nil as RemoveBuild may get called multiple times
if c.PodTracker != nil {
c.PodTracker.Stop()
c.PodTracker = nil
}
}()

if !c.createdPod {
// nothing to do
return nil
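PodTracker.Stop() is also defined outside the visible hunks. One plausible shape, assuming the informers were started as in the sketch near the top of this page, is that the tracker owns its own context and Stop cancels it; this is a guess at the mechanism, not the actual implementation:

package tracker

import (
	"context"

	"k8s.io/client-go/informers"
)

// trackerLifecycle: client-go informers run until the stop channel
// passed to SharedInformerFactory.Start closes, so a tracker that owns
// a cancelable context can shut everything down from Stop.
type trackerLifecycle struct {
	factory informers.SharedInformerFactory
	ctx     context.Context // owned by the tracker, not the build
	cancel  context.CancelFunc
}

func (t *trackerLifecycle) Start() {
	t.factory.Start(t.ctx.Done())
}

func (t *trackerLifecycle) Stop() {
	// canceling twice is harmless, which matters because RemoveBuild
	// can run more than once for the same build
	t.cancel()
}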
91 changes: 88 additions & 3 deletions runtime/kubernetes/build_test.go
@@ -376,6 +376,82 @@ func TestKubernetes_SetupBuild(t *testing.T) {
}
}

func TestKubernetes_StreamBuild(t *testing.T) {
tests := []struct {
name string
failure bool
doCancel bool
doReady bool
pipeline *pipeline.Build
pod *v1.Pod
}{
{
name: "stages canceled",
failure: false,
doCancel: true,
pipeline: _stages,
pod: _stagesPod,
},
{
name: "steps canceled",
failure: false,
doCancel: true,
pipeline: _steps,
pod: _pod,
},
{
name: "stages ready",
failure: false,
doReady: true,
pipeline: _stages,
pod: _stagesPod,
},
{
name: "steps ready",
failure: false,
doReady: true,
pipeline: _steps,
pod: _pod,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := NewMock(test.pod)
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// StreamBuild and AssembleBuild coordinate their work.
go func() {
if test.doCancel {
// simulate canceled build
cancel()
} else if test.doReady {
// simulate AssembleBuild
close(_engine.PodTracker.Ready)
}
}()

err = _engine.StreamBuild(ctx, test.pipeline)

if test.failure {
if err == nil {
t.Errorf("StreamBuild should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("StreamBuild returned err: %v", err)
}
})
}
}

func TestKubernetes_AssembleBuild(t *testing.T) {
// setup tests
tests := []struct {
@@ -421,16 +497,25 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := NewMock(test.k8sPod)
+ if err != nil {
+ t.Errorf("unable to create runtime engine: %v", err)
+ }

_engine.Pod = test.enginePod

_engine.containersLookup = map[string]int{}
for i, ctn := range test.enginePod.Spec.Containers {
_engine.containersLookup[ctn.Name] = i
}

- if err != nil {
- t.Errorf("unable to create runtime engine: %v", err)
- }
// StreamBuild and AssembleBuild coordinate their work, so, emulate
// executor.StreamBuild which calls runtime.StreamBuild concurrently.
go func() {
err := _engine.StreamBuild(context.Background(), test.pipeline)
if err != nil {
t.Errorf("unable to start PodTracker via StreamBuild")
}
}()

err = _engine.AssembleBuild(context.Background(), test.pipeline)
