Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix log collection after container stop #991

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions bcs-services/bcs-logbeat-sidecar/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ replace (
)

require (
github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210818040851-76fdc539dc33
github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs v0.0.0-20210518090424-99527484a283
github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210908080357-99540f892332
github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs v0.0.0-20210810131039-5220f346d815
github.com/containerd/containerd v1.4.3 // indirect
github.com/docker/docker v20.10.0-rc1+incompatible // indirect
github.com/fsouza/go-dockerclient v1.6.5
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/moby/sys/mount v0.2.0 // indirect
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_model v0.2.0
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/yaml.v2 v2.3.0
gotest.tools/v3 v3.0.3 // indirect
k8s.io/api v0.18.5
k8s.io/apiextensions-apiserver v0.18.2
k8s.io/apimachinery v0.18.5
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-logbeat-sidecar/sidecar/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *SidecarController) inspectContainer(ID string) *docker.Container {
return c
case <-timer.C:
cancelFunc()
blog.Errorf("Inspect container %d timeout unexpected, check whether pod is working properly with sharePID mode.", ID)
blog.Errorf("Inspect container %s timeout unexpected, check whether pod is working properly with sharePID mode.", ID)
return nil
}
}
83 changes: 47 additions & 36 deletions bcs-services/bcs-logbeat-sidecar/sidecar/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
bcsv1 "github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs/apis/bkbcs/v1"
Expand Down Expand Up @@ -150,6 +151,20 @@ func (s *SidecarController) listenerDockerEvent() {
}
}()

freeFunc := func(containerID string) {
s.containerCacheMutex.Lock()
delete(s.containerCache, containerID)
s.containerCacheMutex.Unlock()
s.Lock()
s.deleteContainerLogConf(containerID)
s.Unlock()
err = s.reloadLogbeat()
if err != nil {
blog.Errorf("reload logbeat failed: %s", err.Error())
}
blog.V(3).Infof("reload logbeat succ")
}

for {
var msg *docker.APIEvents
select {
Expand All @@ -160,9 +175,11 @@ func (s *SidecarController) listenerDockerEvent() {
switch msg.Action {
//start container
case "start":
blog.Infof("docker action : %+v", *msg)
c := s.inspectContainer(msg.ID)
if c == nil {
blog.Errorf("inspect container %s failed", msg.ID)
freeFunc(msg.ID)
break
}
s.containerCacheMutex.Lock()
Expand All @@ -175,29 +192,29 @@ func (s *SidecarController) listenerDockerEvent() {
}
blog.V(3).Infof("reload logbeat succ")

// destroy container
case "destroy":
s.containerCacheMutex.Lock()
delete(s.containerCache, msg.ID)
s.containerCacheMutex.Unlock()
s.Lock()
s.deleteContainerLogConf(msg.ID)
s.Unlock()
err = s.reloadLogbeat()
if err != nil {
blog.Errorf("reload logbeat failed: %s", err.Error())
// exit container
case "die", "stop":
blog.Infof("docker action : %+v", *msg)
s.containerCacheMutex.RLock()
c, ok := s.containerCache[msg.ID]
s.containerCacheMutex.RUnlock()
if !ok {
blog.Errorf("Container info with containerID (%s) did not in containerCache", msg.ID)
freeFunc(msg.ID)
break
}
blog.V(3).Infof("reload logbeat succ")
// stop container
case "stop":
s.Lock()
s.deleteContainerLogConf(msg.ID)
s.Unlock()
c.State.Running = false
c.State.Dead = true
s.produceContainerLogConf(c)
err = s.reloadLogbeat()
if err != nil {
blog.Errorf("reload logbeat failed: %s", err.Error())
}
blog.V(3).Infof("reload logbeat succ")

// destroy container
case "destroy":
freeFunc(msg.ID)
}
}
}
Expand Down Expand Up @@ -510,6 +527,7 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
matchedLogConfig.LogPaths = logConf.Spec.LogPaths
matchedLogConfig.HostPaths = logConf.Spec.HostPaths
matchedLogConfig.LogTags = logConf.Spec.LogTags
matchedLogConfig.Multiline = logConf.Spec.Multiline
matchedLogConfigs = append(matchedLogConfigs, &matchedLogConfig)
}

Expand All @@ -535,11 +553,16 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
Paths: make([]string, 0),
ToJSON: true,
OutputFormat: s.conf.LogbeatOutputFormat,
Package: logConf.Spec.PackageCollection,
}
blog.Infof("container info: %+v", *container)
if !container.State.Running {
var closeEOF bool = true
para.CloseEOF = &closeEOF
para.CloseTimeout = time.Duration(time.Duration(logConf.Spec.ExitedContainerLogCloseTimeout) * time.Second).String()
}
if logConf.Spec.PackageCollection {
pack := new(bool)
*pack = true
para.Package = pack
if conf.Multiline != nil && conf.Multiline.Type != "" {
para.Multiline = conf.Multiline
}
para.ExtMeta["io_tencent_bcs_cluster"] = logConf.Spec.ClusterId
para.ExtMeta["io_tencent_bcs_pod"] = pod.Name
Expand All @@ -551,15 +574,16 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
para.ExtMeta["io_tencent_bcs_projectid"] = pod.Labels["io.tencent.paas.projectid"]
para.ExtMeta["container_id"] = container.ID
para.ExtMeta["container_hostname"] = container.Config.Hostname
para.ExtMeta["io_tencent_bcs_container_name"] = container.Config.Labels[ContainerLabelK8sContainerName]
//whether report pod labels to log tags
if logConf.Spec.PodLabels {
for k, v := range pod.Labels {
para.ExtMeta[k] = v
para.ExtMeta[fmt.Sprintf("labels_%s", strings.ReplaceAll(k, ".", "_"))] = v
}
}
//custom log tags
for k, v := range conf.LogTags {
para.ExtMeta[k] = v
para.ExtMeta[fmt.Sprintf("%s", strings.ReplaceAll(k, ".", "_"))] = v
}
// generate std output log collection config
if stdoutDataid == "" && conf.Stdout && conf.StdDataId != "" {
Expand Down Expand Up @@ -648,16 +672,3 @@ func (s *SidecarController) getFirstWildcardPos(str string) int {
}
return pos
}

// getContainerRootPath return the root path of the container
// Usually it begins with /data/bcs/lib/docker/overlay2/{hashid}/merged
// If the container does not use OverlayFS, it will return /proc/{procid}/root
func (s *SidecarController) getContainerRootPath(container *docker.Container) string {
switch container.Driver {
case "overlay2":
return container.GraphDriver.Data["MergedDir"]
default:
// blog.Warnf("Container %s has driver %s not overlay2", container.ID, container.Driver)
return fmt.Sprintf("/proc/%d/root", container.State.Pid)
}
}
13 changes: 13 additions & 0 deletions bcs-services/bcs-logbeat-sidecar/sidecar/logconfop_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,16 @@ func (s *SidecarController) getActualPath(logPath string, container *docker.Cont
blog.V(3).Infof("origin path: %s, clean path: %s", logPath, retpath)
return retpath, nil
}

// getContainerRootPath return the root path of the container
// Usually it begins with /data/bcs/lib/docker/overlay2/{hashid}/merged
// If the container does not use OverlayFS, it will return /proc/{procid}/root
func (s *SidecarController) getContainerRootPath(container *docker.Container) string {
switch container.Driver {
case "overlay2":
return container.GraphDriver.Data["UpperDir"]
default:
// blog.Warnf("Container %s has driver %s not overlay2", container.ID, container.Driver)
return fmt.Sprintf("/proc/%d/root", container.State.Pid)
}
}
16 changes: 10 additions & 6 deletions bcs-services/bcs-logbeat-sidecar/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package types

import (
bcsv1 "github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs/apis/bkbcs/v1"
"github.com/Tencent/bk-bcs/bcs-services/bcs-logbeat-sidecar/metric"
)

Expand All @@ -26,12 +27,15 @@ type Yaml struct {

// Local is a single log collection task with single dataid
type Local struct {
DataID int `yaml:"dataid"`
OutputFormat string `yaml:"output_format"`
Paths []string `yaml:"paths"`
ToJSON bool `yaml:"to_json"`
Package *bool `yaml:"package,omitempty"`
ExtMeta map[string]string `yaml:"ext_meta"`
DataID int `yaml:"dataid"`
OutputFormat string `yaml:"output_format"`
Paths []string `yaml:"paths"`
ToJSON bool `yaml:"to_json"`
Package bool `yaml:"package"`
ExtMeta map[string]string `yaml:"ext_meta"`
CloseEOF *bool `yaml:"close_eof,omitempty"`
CloseTimeout string `yaml:"close_timeout,omitempty"`
Multiline *bcsv1.MultilineConf `yaml:"multiline,omitempty"`

//stdout dataid
StdoutDataid string `yaml:"-"`
Expand Down