diff --git a/runtime/kubernetes/build.go b/runtime/kubernetes/build.go
index ad94681f..78e436cf 100644
--- a/runtime/kubernetes/build.go
+++ b/runtime/kubernetes/build.go
@@ -219,7 +219,7 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
 	close(c.PodTracker.Ready)
 
 	// wait for the PodTracker caches to populate before creating the pipeline pod.
-	if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
+	if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced, c.PodTracker.EventSynced); !ok {
 		return fmt.Errorf("failed to wait for caches to sync")
 	}
 
diff --git a/runtime/kubernetes/build_test.go b/runtime/kubernetes/build_test.go
index 50f304ea..ef9b6664 100644
--- a/runtime/kubernetes/build_test.go
+++ b/runtime/kubernetes/build_test.go
@@ -455,9 +455,10 @@ func TestKubernetes_StreamBuild(t *testing.T) {
 func TestKubernetes_AssembleBuild(t *testing.T) {
 	// setup tests
 	tests := []struct {
-		name     string
-		failure  bool
-		pipeline *pipeline.Build
+		name        string
+		failure     bool
+		cancelBuild bool
+		pipeline    *pipeline.Build
 		// k8sPod is the pod that the mock Kubernetes client will return
 		k8sPod *v1.Pod
 		// enginePod is the pod under construction in the Runtime Engine
@@ -491,6 +492,22 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
 			k8sPod:    _pod,
 			enginePod: _pod,
 		},
+		{
+			name:        "stages-build canceled",
+			failure:     true,
+			cancelBuild: true,
+			pipeline:    _stages,
+			k8sPod:      &v1.Pod{},
+			enginePod:   _stagesPod,
+		},
+		{
+			name:        "steps-build canceled",
+			failure:     true,
+			cancelBuild: true,
+			pipeline:    _steps,
+			k8sPod:      &v1.Pod{},
+			enginePod:   _pod,
+		},
 	}
 
 	// run tests
@@ -508,16 +525,25 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
 			_engine.containersLookup[ctn.Name] = i
 		}
 
+		// setup test context
+		ctx, done := context.WithCancel(context.Background())
+		defer done()
+
 		// StreamBuild and AssembleBuild coordinate their work, so emulate
 		// executor.StreamBuild, which calls runtime.StreamBuild concurrently.
 		go func() {
-			err := _engine.StreamBuild(context.Background(), test.pipeline)
-			if err != nil {
-				t.Errorf("unable to start PodTracker via StreamBuild")
+			if test.cancelBuild {
+				// simulate a build timeout
+				done()
+			} else {
+				err := _engine.StreamBuild(context.Background(), test.pipeline)
+				if err != nil {
+					t.Errorf("unable to start PodTracker via StreamBuild")
+				}
 			}
 		}()
 
-		err = _engine.AssembleBuild(context.Background(), test.pipeline)
+		err = _engine.AssembleBuild(ctx, test.pipeline)
 
 		if test.failure {
 			if err == nil {
diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go
index 28e0932d..20948389 100644
--- a/runtime/kubernetes/container.go
+++ b/runtime/kubernetes/container.go
@@ -23,6 +23,38 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
+// These are known kubernetes event Reasons.
+const (
+	// nolint: godot // commented code is not a sentence
+	// known scheduler event reasons can be found here:
+	// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/schedule_one.go
+	//reasonFailedScheduling = "FailedScheduling"
+	//reasonScheduled = "Scheduled"
+
+	// known kubelet event reasons are listed here:
+	// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/kubelet/events/event.go
+
+	// kubelet image event reasons.
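+	// kubelet surfaces these as pod events; BackOff and Failed also show up
+	// as container Waiting state reasons (see inspectContainerStatuses below).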
+	reasonPulling           = "Pulling"
+	reasonPulled            = "Pulled"
+	reasonFailed            = "Failed"            // Warning: image, container, pod
+	reasonInspectFailed     = "InspectFailed"     // Warning
+	reasonErrImageNeverPull = "ErrImageNeverPull" // Warning
+	reasonBackOff           = "BackOff"           // Normal: image, container
+
+	// nolint: godot // commented code is not a sentence
+	// kubelet container event reasons.
+	//reasonCreated = "Created"
+	//reasonStarted = "Started"
+	//reasonKilling = "Killing"
+	//reasonPreempting = "Preempting"
+	//reasonExceededGracePeriod = "ExceededGracePeriod"
+
+	// kubelet pod event reasons.
+	//reasonFailedKillPod = "FailedKillPod"
+	//reasonFailedCreatePodContainer = "FailedCreatePodContainer"
+	//reasonNetworkNotReady = "NetworkNotReady"
+)
+
 // InspectContainer inspects the pipeline container.
 func (c *client) InspectContainer(ctx context.Context, ctn *pipeline.Container) error {
 	c.Logger.Tracef("inspecting container %s", ctn.ID)
@@ -81,6 +113,12 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build) error {
 		return err
 	}
 
+	// get the containerTracker for this container
+	ctnTracker, ok := c.PodTracker.Containers[ctn.ID]
+	if !ok {
+		return fmt.Errorf("containerTracker missing for %s", ctn.ID)
+	}
+
 	// set the pod container image to the parsed step image
 	c.Pod.Spec.Containers[c.containersLookup[ctn.ID]].Image = _image
 
@@ -98,7 +136,25 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build) error {
 		return err
 	}
 
-	return nil
+	// make sure the container starts (watch for image pull errors or similar)
+	for {
+		select {
+		case <-ctx.Done():
+			// build was canceled. give up
+			return nil
+		case <-ctnTracker.Running:
+			// hooray it is running
+			return nil
+		case event := <-ctnTracker.ImagePullErrors:
+			return fmt.Errorf(
+				"failed to run container %s in %s: [%s] %s",
+				ctn.ID,
+				c.Pod.ObjectMeta.Name,
+				event.Reason,
+				event.Message,
+			)
+		}
+	}
 }
 
 // SetupContainer prepares the image for the pipeline container.
@@ -322,7 +378,14 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) error {
 	}
 
 	// wait for the container terminated signal
-	<-tracker.Terminated
+	select {
+	case <-tracker.Terminated:
+		// container is terminated
+		break
+	case <-ctx.Done():
+		// build was canceled
+		break
+	}
 
 	return nil
 }
@@ -354,6 +417,12 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) {
 		// cst.LastTerminationState has details about the kubernetes/pause image's exit.
 		// cst.RestartCount is 1 at exit due to switch from kubernetes/pause to final image.
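+		// only handle statuses for the tracked step image; pause image statuses are skipped.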
+		if cst.Image != tracker.Image {
+			// we don't care if the pause image has terminated or is running
+			p.Logger.Tracef("container %s expected image %s, got %s", cst.Name, tracker.Image, cst.Image)
+			continue
+		}
+
 		// check if the container is in a terminated state
 		//
 		// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState
@@ -364,6 +433,106 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) {
 				// let WaitContainer know the container is terminated
 				close(tracker.Terminated)
 			})
+		} else if cst.State.Running != nil {
+			tracker.runningOnce.Do(func() {
+				p.Logger.Debugf("container running: %s in pod %s, %v", cst.Name, p.TrackedPod, cst)
+
+				// let RunContainer know the container is running
+				close(tracker.Running)
+			})
+		} else if cst.State.Waiting != nil &&
+			(cst.State.Waiting.Reason == reasonBackOff || cst.State.Waiting.Reason == reasonFailed) &&
+			strings.Contains(cst.State.Waiting.Message, tracker.Image) {
+			// inspectContainerStatuses should return as quickly as possible;
+			// writing to the channel can block when nothing is receiving,
+			// so fire it off in a goroutine.
+			go func() {
+				// imitate an event to make sure we catch it.
+				tracker.ImagePullErrors <- &v1.Event{
+					ObjectMeta: metav1.ObjectMeta{
+						Namespace: p.Namespace,
+					},
+					InvolvedObject: v1.ObjectReference{
+						Kind:      "Pod",
+						Name:      p.Name,
+						Namespace: p.Namespace,
+						FieldPath: fmt.Sprintf("spec.containers{%s}", tracker.Name),
+					},
+					Reason:  cst.State.Waiting.Reason,
+					Message: cst.State.Waiting.Message,
+				}
+			}()
+		}
+	}
+}
+
+// inspectContainerEvent gathers container info from an event.
+func (p *podTracker) inspectContainerEvent(event *v1.Event) {
+	if !strings.HasPrefix(event.InvolvedObject.FieldPath, "spec.containers") {
+		// event is not for a container
+		return
+	}
+
+	// the FieldPath format is "spec.containers{container-name}" for named containers
+	containerName := strings.TrimPrefix(event.InvolvedObject.FieldPath, "spec.containers{")
+	containerName = strings.TrimSuffix(containerName, "}")
+
+	if containerName == event.InvolvedObject.FieldPath {
+		// the FieldPath is probably the indexed "spec.containers[2]" format,
+		// which is only used for unnamed containers,
+		// but all of our containers are named.
+		p.Logger.Debugf("ignoring unnamed container, got pod fieldPath %s", event.InvolvedObject.FieldPath)
+
+		return
+	}
+
+	// get the containerTracker for this container
+	tracker, ok := p.Containers[containerName]
+	if !ok {
+		// unknown container (probably a sidecar injected by an admission controller)
+		p.Logger.Debugf("ignoring untracked container %s from pod %s", containerName, p.TrackedPod)
+
+		return
+	}
+
+	p.Logger.Tracef("container event for %s: [%s] %s", tracker.Name, event.Reason, event.Message)
+
+	// check if the event mentions the target image.
+	// If the relevant message does not include our image, then
+	// either it is for "kubernetes/pause:latest", which we don't care about,
+	// or it is a generic message that is basically useless, like:
+	//   event.Reason => event.Message
+	//   Failed       => Error: ErrImagePull
+	//   BackOff      => Error: ImagePullBackOff
+	// Many of these generic messages come from this part of kubelet:
+	// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/kubelet/kuberuntime/kuberuntime_container.go
+	if strings.Contains(event.Message, tracker.Image) {
+		switch event.Reason {
+		// examples: event.Reason => event.Message
+		// The image-related messages come from the image manager in kubelet:
+		// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/kubelet/images/image_manager.go
+		case reasonFailed, reasonBackOff, reasonInspectFailed, reasonErrImageNeverPull:
+			// Failed            => Failed to pull image "image:tag":
+			// BackOff           => Back-off pulling image "image:tag"
+			// InspectFailed     => Failed to apply default image tag "": couldn't parse image reference "":
+			// InspectFailed     => Failed to inspect image "":
+			// ErrImageNeverPull => Container image "image:tag" is not present with pull policy of Never
+			tracker.ImagePullErrors <- event
+			return
+		case reasonPulled:
+			// Pulled => Successfully pulled image "image:tag" in
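
Note for reviewers: below is a minimal, runnable sketch (not part of the diff) of the start-or-fail signaling pattern RunContainer adopts above. The containerTracker here is a hypothetical simplification: the real tracker also carries Terminated and terminatedOnce, and its ImagePullErrors channel carries *v1.Event values rather than plain strings.

// Package main demonstrates the close-once broadcast plus three-way select
// used by RunContainer and inspectContainerStatuses in this diff.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// containerTracker is a simplified stand-in for the tracker in this diff.
type containerTracker struct {
	runningOnce     sync.Once
	Running         chan struct{} // closed once the container reaches Running
	ImagePullErrors chan string   // senders fire in goroutines to avoid blocking
}

func main() {
	tracker := &containerTracker{
		Running:         make(chan struct{}),
		ImagePullErrors: make(chan string),
	}

	// informer callbacks can fire repeatedly for one pod;
	// sync.Once makes close() safe to attempt on every callback.
	go func() {
		for i := 0; i < 3; i++ {
			tracker.runningOnce.Do(func() { close(tracker.Running) })
		}
	}()

	// stand-in for the build context that AssembleBuild/RunContainer receive.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// the same three-way wait RunContainer performs: start, pull failure, or cancel.
	select {
	case <-ctx.Done():
		fmt.Println("build canceled")
	case <-tracker.Running:
		fmt.Println("container running")
	case msg := <-tracker.ImagePullErrors:
		fmt.Println("image pull error:", msg)
	}
}

The goroutine send in inspectContainerStatuses follows from the same constraint shown here: the informer callback must not block, so writes to ImagePullErrors happen off the callback path.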