feat(k8s): Use PodTracker in WaitContainer
Replaces the manual Pods Watch API call with a "container is terminated" signal
in a containerTracker. That signal is controlled by the PodTracker, based on
Add/Update/Delete pod events from the PodInformer (which does its own
watch/list API calls internally).
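
The waiting and signaling sides are decoupled by a channel that is closed exactly once. As a rough illustration of that pattern only (not code from this commit; containerTracker, Terminated, and terminatedOnce mirror names in the diff below, everything else is assumed):

	package main

	import (
		"fmt"
		"sync"
	)

	// containerTracker carries the per-container "terminated" signal.
	type containerTracker struct {
		Terminated     chan struct{} // closed once the container reaches a terminal state
		terminatedOnce sync.Once     // keeps the close idempotent across repeated pod events
	}

	func main() {
		tracker := &containerTracker{Terminated: make(chan struct{})}

		var wg sync.WaitGroup

		// any number of waiters can block on the same signal,
		// the way WaitContainer blocks on <-tracker.Terminated
		for i := 0; i < 3; i++ {
			wg.Add(1)

			go func(id int) {
				defer wg.Done()

				<-tracker.Terminated
				fmt.Printf("waiter %d released\n", id)
			}(i)
		}

		// pod events may report the terminated state more than once;
		// sync.Once makes closing the channel safe to attempt repeatedly
		for i := 0; i < 2; i++ {
			tracker.terminatedOnce.Do(func() {
				close(tracker.Terminated)
			})
		}

		wg.Wait()
	}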
cognifloyd committed Apr 8, 2022
1 parent c9ece53 commit acbe508
Showing 5 changed files with 214 additions and 111 deletions.
8 changes: 0 additions & 8 deletions runtime/kubernetes/build.go
@@ -192,14 +192,6 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
}
}

// TODO: use these to start streaming logs before TailContainer is called
// Add the ResourceEventHandler
//c.PodTracker.AddPodInformerEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc: func(new interface{}) {},
// UpdateFunc: func(old, new interface{}) {},
// DeleteFunc: func(old interface{}) {},
//})

// Populate the PodTracker caches
c.PodTracker.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
93 changes: 34 additions & 59 deletions runtime/kubernetes/container.go
@@ -19,7 +19,6 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)
@@ -317,73 +316,49 @@ func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container) (io
func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) error {
c.Logger.Tracef("waiting for container %s", ctn.ID)

// create label selector for watching the pod
selector := fmt.Sprintf("pipeline=%s", fields.EscapeValue(c.Pod.ObjectMeta.Name))

// create options for watching the container
opts := metav1.ListOptions{
LabelSelector: selector,
Watch: true,
tracker, ok := c.PodTracker.Containers[ctn.ID]
if !ok {
return fmt.Errorf("containerTracker is missing for %s", ctn.ID)
}

// send API call to capture channel for watching the container
//
// https://pkg.go.dev/k8s.io/client-go/kubernetes/typed/core/v1?tab=doc#PodInterface
// ->
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface
// nolint: contextcheck // ignore non-inherited new context
podWatch, err := c.Kubernetes.CoreV1().Pods(c.config.Namespace).Watch(context.Background(), opts)
if err != nil {
return err
}
// wait for the container terminated signal
<-tracker.Terminated

defer podWatch.Stop()
return nil
}

for {
// capture new result from the channel
//
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface
result := <-podWatch.ResultChan()
// inspectContainerStatuses signals when a container reaches a terminal state.
func (p podTracker) inspectContainerStatuses(pod *v1.Pod) {
// check if the pod is in a pending state
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#PodStatus
if pod.Status.Phase == v1.PodPending {
// nothing to inspect if pod is in a pending state
return
}

// convert the object from the result to a pod
pod, ok := result.Object.(*v1.Pod)
// iterate through each container in the pod
for _, cst := range pod.Status.ContainerStatuses {
tracker, ok := p.Containers[cst.Name]
if !ok {
return fmt.Errorf("unable to watch pod %s", c.Pod.ObjectMeta.Name)
}

// check if the pod is in a pending state
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#PodStatus
if pod.Status.Phase == v1.PodPending {
// skip pod if it's in a pending state
// unknown container
continue
}

// iterate through each container in the pod
for _, cst := range pod.Status.ContainerStatuses {
// check if the container has a matching ID
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerStatus
if !strings.EqualFold(cst.Name, ctn.ID) {
// skip container if it's not a matching ID
continue
}

// check if the container is in a terminated state
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState
if cst.State.Terminated == nil {
// skip container if it's not in a terminated state
break
}

// check if the container has a terminated state reason
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerStateTerminated
if len(cst.State.Terminated.Reason) > 0 {
// break watching the container as it's complete
return nil
}
// check if the container is in a terminated state
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState
if cst.State.Terminated != nil {
// && len(cst.State.Terminated.Reason) > 0 {
// WaitContainer used to check Terminated.Reason as well.
// if that is still needed, then we can add that check here
// or retrieve the pod with something like this in WaitContainer:
// c.PodTracker.PodLister.Pods(c.config.Namespace).Get(c.Pod.GetName())

tracker.terminatedOnce.Do(func() {
// let WaitContainer know the container is terminated
close(tracker.Terminated)
})
}
}
}
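
The comment in inspectContainerStatuses above leaves a possible follow-up open: if the old Terminated.Reason check turns out to still be needed, WaitContainer could re-read the pod from the informer-backed lister after the signal fires, instead of re-adding a Watch call. A rough sketch of that idea only (not part of this commit; it assumes the PodLister field named in the comment exists on the PodTracker):

	// hypothetical follow-up inside WaitContainer, after <-tracker.Terminated
	pod, err := c.PodTracker.PodLister.Pods(c.config.Namespace).Get(c.Pod.GetName())
	if err != nil {
		return err
	}

	// confirm the terminated state has a reason, as the old Watch loop did
	// (strings.EqualFold matches the old code and needs the strings import)
	for _, cst := range pod.Status.ContainerStatuses {
		if strings.EqualFold(cst.Name, ctn.ID) &&
			cst.State.Terminated != nil &&
			len(cst.State.Terminated.Reason) > 0 {
			return nil
		}
	}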
109 changes: 73 additions & 36 deletions runtime/kubernetes/container_test.go
@@ -13,8 +13,6 @@ import (
velav1alpha1 "github.com/go-vela/worker/runtime/kubernetes/apis/vela/v1alpha1"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

func TestKubernetes_InspectContainer(t *testing.T) {
@@ -320,31 +318,27 @@ func TestKubernetes_WaitContainer(t *testing.T) {
tests := []struct {
failure bool
container *pipeline.Container
object runtime.Object
cached *v1.Pod
updated *v1.Pod
}{
{
failure: false,
container: _container,
object: _pod,
cached: _pod,
updated: _pod,
},
{
failure: false,
container: _container,
object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "github-octocat-1",
Namespace: "test",
Labels: map[string]string{
"pipeline": "github-octocat-1",
},
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
cached: _pod,
updated: &v1.Pod{
ObjectMeta: _pod.ObjectMeta,
TypeMeta: _pod.TypeMeta,
Spec: _pod.Spec,
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
// alternate order
{
Name: "step-github-octocat-1-echo",
State: v1.ContainerState{
@@ -367,38 +361,81 @@
},
},
},
{
failure: false,
container: _container,
cached: &v1.Pod{
ObjectMeta: _pod.ObjectMeta,
TypeMeta: _pod.TypeMeta,
Spec: _pod.Spec,
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
Name: "step-github-octocat-1-clone",
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
},
},
},
updated: _pod,
},
{
failure: true,
container: _container,
object: new(v1.PodTemplate),
cached: _pod,
updated: &v1.Pod{
ObjectMeta: _pod.ObjectMeta,
TypeMeta: _pod.TypeMeta,
Status: _pod.Status,
// if client.Pod.Spec is empty, podTracker will fail
//Spec: _pod.Spec,
},
},
}

// run tests
for _, test := range tests {
// setup types
_engine, _watch, err := newMockWithWatch(_pod, "pods")
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}
// anonymous function to allow "defer close(stopCh)" on each iteration
func() {
// set up the fake k8s clientset so that it returns the final/updated state
_engine, err := NewMock(test.updated)
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

go func() {
// simulate adding a pod to the watcher
_watch.Add(test.object)
}()
stopCh := make(chan struct{})
defer close(stopCh)

err = _engine.WaitContainer(context.Background(), test.container)
// enable the add/update/delete funcs for pod changes
_engine.PodTracker.Start(stopCh)

if test.failure {
if err == nil {
t.Errorf("WaitContainer should have returned err")
}
go func() {
// revert the cached pod to an "older" version
// this will trigger a sync which will use the fake clientset to get "updated"
pod := test.cached.DeepCopy()
pod.SetResourceVersion("older")
err = _engine.PodTracker.podInformer.Informer().GetIndexer().Add(pod)
if err != nil {
t.Errorf("loading the podInformer cache failed: %v", err)
}
}()

continue
}
err = _engine.WaitContainer(context.Background(), test.container)

if err != nil {
t.Errorf("WaitContainer returned err: %v", err)
}
if test.failure {
if err == nil {
t.Errorf("WaitContainer should have returned err")
}

return // effectively "continue" to next test
}

if err != nil {
t.Errorf("WaitContainer returned err: %v", err)
}
}()
}
}
13 changes: 11 additions & 2 deletions runtime/kubernetes/kubernetes.go
@@ -159,7 +159,8 @@ func NewMock(_pod *v1.Pod, opts ...ClientOpt) (*client, error) {
c.config.Namespace = "test"

// set the Kubernetes pod in the runtime client
c.Pod = _pod
c.Pod = _pod.DeepCopy()
c.Pod.SetResourceVersion("0")

// apply all provided configuration options
for _, opt := range opts {
@@ -185,7 +186,13 @@ func NewMock(_pod *v1.Pod, opts ...ClientOpt) (*client, error) {
)

// set the PodTracker (normally populated in SetupBuild)
tracker, err := NewPodTracker(c.Logger, c.Kubernetes, _pod, time.Second*0)
tracker, err := NewPodTracker(c.Logger, c.Kubernetes, c.Pod, time.Second*0)
if err != nil {
return c, err
}

// pre-populate the podInformer cache
err = tracker.podInformer.Informer().GetIndexer().Add(c.Pod)
if err != nil {
return c, err
}
@@ -195,5 +202,7 @@ func NewMock(_pod *v1.Pod, opts ...ClientOpt) (*client, error) {

c.PodTracker = tracker

// The test is responsible for calling c.PodTracker.Start() if needed

return c, nil
}
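
With NewMock no longer starting the tracker, a test drives the full path itself: create the mock, start the PodTracker, then wait. A minimal sketch of that wiring under the same assumptions as container_test.go (fixtures such as _pod and _container are assumed):

	// sketch only: _pod and _container are assumed test fixtures
	_engine, err := NewMock(_pod)
	if err != nil {
		t.Errorf("unable to create runtime engine: %v", err)
	}

	// stop channel shuts the informer goroutines down when the test ends
	stopCh := make(chan struct{})
	defer close(stopCh)

	// start the PodTracker so the add/update/delete handlers run
	_engine.PodTracker.Start(stopCh)

	// WaitContainer blocks until inspectContainerStatuses closes the signal
	err = _engine.WaitContainer(context.Background(), _container)
	if err != nil {
		t.Errorf("WaitContainer returned err: %v", err)
	}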