diff --git a/go.mod b/go.mod index 75022634b..6ac197c8d 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/go.sum b/go.sum index 88f63ca3c..8478cb168 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ= github.com/andygrunwald/go-jira v1.16.0/go.mod h1:UQH4IBVxIYWbgagc0LF/k9FRs9xjIiQ8hIcC6HfLwFU= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= @@ -29,6 +30,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -121,6 +123,7 @@ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -162,6 +165,8 @@ github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6 github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= +github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/reaper/task.go b/reaper/task.go index 3f73432f1..6f64ec41c 100644 --- a/reaper/task.go +++ b/reaper/task.go @@ -44,7 +44,7 @@ type TaskReaper struct { // - Pod is deleted after the defined period. func (r *TaskReaper) Run() { Log.V(1).Info("Reaping tasks.") - list := []model.Task{} + list := []task.Task{} result := r.DB.Find( &list, "state IN ?", @@ -52,6 +52,7 @@ func (r *TaskReaper) Run() { task.Created, task.Succeeded, task.Failed, + task.Canceled, }) Log.Error(result.Error, "") if result.Error != nil { @@ -108,6 +109,10 @@ func (r *TaskReaper) Run() { r.release(m) } } + d := time.Duration(Settings.Hub.Task.Pod.Retention.Succeeded) * Unit + if time.Since(mark) > d { + r.podDelete(m) + } case task.Failed: mark := m.CreateTime if m.Terminated != nil { @@ -124,23 +129,17 @@ func (r *TaskReaper) Run() { r.release(m) } } + d := time.Duration(Settings.Hub.Task.Pod.Retention.Failed) * Unit + if time.Since(mark) > d { + r.podDelete(m) + } } } } -// release resources. -func (r *TaskReaper) release(m *model.Task) { +// release bucket and file resources. +func (r *TaskReaper) release(m *task.Task) { nChanged := 0 - if m.Pod != "" { - rt := Task{Task: m} - err := rt.Delete(r.Client) - if err == nil { - m.Pod = "" - nChanged++ - } else { - Log.Error(err, "") - } - } if m.HasBucket() { Log.Info("Task bucket released.", "id", m.ID) m.SetBucket(nil) @@ -151,8 +150,7 @@ func (r *TaskReaper) release(m *model.Task) { nChanged++ } if nChanged > 0 { - rt := task.Task{Task: m} - rt.Event(task.Released) + m.Event(task.Released) err := r.DB.Save(m).Error if err != nil { Log.Error(err, "") @@ -161,10 +159,25 @@ func (r *TaskReaper) release(m *model.Task) { return } +// podDelete deletes the task pod. +func (r *TaskReaper) podDelete(m *task.Task) { + if m.Pod == "" { + return + } + err := m.Delete(r.Client) + if err != nil { + Log.Error(err, "") + return + } + err = r.DB.Save(m).Error + if err != nil { + Log.Error(err, "") + } +} + // delete task. -func (r *TaskReaper) delete(m *model.Task) { - rt := Task{Task: m} - err := rt.Delete(r.Client) +func (r *TaskReaper) delete(m *task.Task) { + err := m.Delete(r.Client) if err != nil { Log.Error(err, "") } diff --git a/settings/hub.go b/settings/hub.go index d21e14813..a215177e0 100644 --- a/settings/hub.go +++ b/settings/hub.go @@ -19,6 +19,8 @@ const ( EnvTaskReapCreated = "TASK_REAP_CREATED" EnvTaskReapSucceeded = "TASK_REAP_SUCCEEDED" EnvTaskReapFailed = "TASK_REAP_FAILED" + EnvTaskPodRetainSucceeded = "TASK_POD_RETAIN_SUCCEEDED" + EnvTaskPodRetainFailed = "TASK_POD_RETAIN_FAILED" EnvTaskSA = "TASK_SA" EnvTaskRetries = "TASK_RETRIES" EnvTaskPreemptEnabled = "TASK_PREEMPT_ENABLED" @@ -84,6 +86,12 @@ type Hub struct { Postponed time.Duration Rate int } + Pod struct { + Retention struct { + Succeeded int + Failed int + } + } } // Frequency Frequency struct { @@ -169,6 +177,20 @@ func (r *Hub) Load() (err error) { } else { r.Task.Reaper.Failed = 43200 // 720 hours (30 days). } + s, found = os.LookupEnv(EnvTaskPodRetainSucceeded) + if found { + n, _ := strconv.Atoi(s) + r.Task.Pod.Retention.Succeeded = n + } else { + r.Task.Pod.Retention.Succeeded = 1 + } + s, found = os.LookupEnv(EnvTaskPodRetainFailed) + if found { + n, _ := strconv.Atoi(s) + r.Task.Pod.Retention.Failed = n + } else { + r.Task.Pod.Retention.Failed = 4320 // 72 hours. + } r.Task.SA, found = os.LookupEnv(EnvTaskSA) if !found { r.Task.SA = "tackle-hub" diff --git a/task/manager.go b/task/manager.go index 914c23999..5bd34fcf4 100644 --- a/task/manager.go +++ b/task/manager.go @@ -1,6 +1,7 @@ package task import ( + "bytes" "context" "errors" "fmt" @@ -30,7 +31,10 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" k8s "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" ) // States @@ -51,19 +55,20 @@ const ( // Events const ( - AddonSelected = "AddonSelected" - ExtSelected = "ExtensionSelected" - ImageError = "ImageError" - PodNotFound = "PodNotFound" - PodCreated = "PodCreated" - PodPending = "PodPending" - PodRunning = "PodRunning" - Preempted = "Preempted" - PodSucceeded = "PodSucceeded" - PodFailed = "PodFailed" - PodDeleted = "PodDeleted" - Escalated = "Escalated" - Released = "Released" + AddonSelected = "AddonSelected" + ExtSelected = "ExtensionSelected" + ImageError = "ImageError" + PodNotFound = "PodNotFound" + PodCreated = "PodCreated" + PodPending = "PodPending" + PodRunning = "PodRunning" + Preempted = "Preempted" + PodSucceeded = "PodSucceeded" + PodFailed = "PodFailed" + PodDeleted = "PodDeleted" + Escalated = "Escalated" + Released = "Released" + ContainerKilled = "ContainerKilled" ) // k8s labels. @@ -123,6 +128,7 @@ func (m *Manager) Run(ctx context.Context) { m.deleteOrphanPods() m.runActions() m.updateRunning() + m.deleteZombies() m.startReady() m.pause() } else { @@ -860,9 +866,6 @@ func (m *Manager) updateRunning() { list = append(list, &Task{task}) } for _, task := range list { - if !task.StateIn(Running, Pending) { - continue - } running := task pod, found := running.Reflect(&m.cluster) if found { @@ -872,10 +875,24 @@ func (m *Manager) updateRunning() { Log.Error(err, "") continue } - err = running.Delete(m.Client) - if err != nil { - Log.Error(err, "") - continue + podRetention := 0 + if running.State == Succeeded { + podRetention = Settings.Hub.Task.Pod.Retention.Succeeded + } else { + podRetention = Settings.Hub.Task.Pod.Retention.Failed + } + if podRetention > 0 { + err = m.ensureTerminated(running, pod) + if err != nil { + podRetention = 0 + } + } + if podRetention == 0 { + err = running.Delete(m.Client) + if err != nil { + Log.Error(err, "") + continue + } } } } @@ -888,6 +905,56 @@ func (m *Manager) updateRunning() { } } +// deleteZombies - detect and delete zombie pods. +// A zombie is a (succeed|failed) task with a running pod that +// the manager has previously tried to kill. +func (m *Manager) deleteZombies() { + var err error + defer func() { + Log.Error(err, "") + }() + var pods []string + for _, pod := range m.cluster.Pods() { + if pod.Status.Phase == core.PodRunning { + ref := path.Join(pod.Namespace, pod.Name) + pods = append( + pods, + ref) + } + } + fetched := []*Task{} + db := m.DB.Select("Events") + db = db.Where("Pod", pods) + db = db.Where("state IN ?", + []string{ + Succeeded, + Failed, + }) + err = db.Find(&fetched).Error + if err != nil { + err = liberr.Wrap(err) + return + } + for _, task := range fetched { + event, found := task.LastEvent(ContainerKilled) + if !found { + continue + } + if time.Since(event.Last) > time.Minute { + Log.Info( + "Zombie detected.", + "task", + task.ID, + "pod", + task.Pod) + err = task.Delete(m.Client) + if err != nil { + Log.Error(err, "") + } + } + } +} + // deleteOrphanPods finds and deletes task pods not referenced by a task. func (m *Manager) deleteOrphanPods() { var err error @@ -1068,6 +1135,87 @@ func (m *Manager) containerLog(pod *core.Pod, container string) (file *model.Fil return } +// ensureTerminated - Terminate running containers. +func (m *Manager) ensureTerminated(task *Task, pod *core.Pod) (err error) { + for _, status := range pod.Status.ContainerStatuses { + if status.State.Terminated != nil { + continue + } + if status.Started == nil || !*status.Started { + continue + } + err = m.terminateContainer(task, pod, status.Name) + if err != nil { + return + } + } + return +} + +// terminateContainer - Terminate container as needed. +// The container is killed. +// Should the container continue to run after (1) minute, +// it is reported as an error. +func (m *Manager) terminateContainer(task *Task, pod *core.Pod, container string) (err error) { + Log.V(1).Info("KILL container", "container", container) + clientSet, err := k8s2.NewClientSet() + if err != nil { + return + } + cmd := []string{ + "sh", + "-c", + "kill 1", + } + req := clientSet.CoreV1().RESTClient().Post() + req = req.Resource("pods") + req = req.Name(pod.Name) + req = req.Namespace(pod.Namespace) + req = req.SubResource("exec") + option := &core.PodExecOptions{ + Command: cmd, + Container: container, + Stdout: true, + Stderr: true, + TTY: true, + } + req.VersionedParams( + option, + scheme.ParameterCodec, + ) + cfg, _ := config.GetConfig() + exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) + if err != nil { + return + } + stdout := bytes.NewBuffer([]byte{}) + stderr := bytes.NewBuffer([]byte{}) + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: stdout, + Stderr: stderr, + }) + if err != nil { + Log.Info( + "Container KILL failed.", + "name", + container, + "err", + err.Error(), + "stderr", + stderr.String()) + } else { + task.Event( + ContainerKilled, + "container: '%s' had not terminated.", + container) + Log.Info( + "Container KILLED.", + "name", + container) + } + return +} + // Task is an runtime task. type Task struct { // model. @@ -1140,6 +1288,17 @@ func (r *Task) LastEvent(kind string) (event *model.TaskEvent, found bool) { return } +// FindEvent returns the matched events by kind. +func (r *Task) FindEvent(kind string) (matched []*model.TaskEvent) { + for i := 0; i < len(r.Events); i++ { + event := &r.Events[i] + if kind == event.Kind { + matched = append(matched, event) + } + } + return +} + // Run the specified task. func (r *Task) Run(cluster *Cluster) (started bool, err error) { mark := time.Now()