Skip to content

Commit

Permalink
WIP - Add operation id to vSphere calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Clint Greenwood committed Nov 2, 2017
1 parent a3d7a5f commit 6c26587
Show file tree
Hide file tree
Showing 20 changed files with 308 additions and 251 deletions.
19 changes: 10 additions & 9 deletions lib/apiservers/engine/backends/eventmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,20 @@ func (m *PortlayerEventMonitor) monitor() error {
return nil
}

// publishEvent() translate a portlayer event into a Docker event if the event is for a
// known container and publishes them to Docker event subscribers.
// PublishEvent translates select portlayer container events into Docker events
// and publishes to subscribers
func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) {
defer trace.End(trace.Begin(""))
defer trace.End(trace.Begin(fmt.Sprintf("[op=%s] Event Monitor received event(%s) for container(%s)", event.OperationID(), event.Event, event.Ref)))

vc := cache.ContainerCache().GetContainer(event.Ref)
if vc == nil {
log.Warnf("Received portlayer event %s for container %s but not found in cache", event.Event, event.Ref)
log.Warnf("[op=%s] Event Monitor received event(%s) but container(%s) not in cache", event.OperationID(), event.Event, event.Ref)
return
}

// docker event attributes
var attrs map[string]string
// TODO: move to a container.OnEvent() so that container drives the necessary changes based on event activity

switch event.Event {
case plevents.ContainerStopped,
plevents.ContainerPoweredOff,
Expand All @@ -234,13 +235,13 @@ func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) {
// get the containerEngine
code, _ := NewContainerBackend().containerProxy.exitCode(vc)

log.Infof("Sending die event for container %s - code: %s", vc.ContainerID, code)
log.Infof("[op=%s] Sending die event for container(%s) with exitCode[%s]", event.OperationID(), vc.ContainerID, code)
// if the docker client is unable to convert the code to an int the client will return 125
attrs["exitCode"] = code
actor := CreateContainerEventActorWithAttributes(vc, attrs)
EventService().Log(containerDieEvent, eventtypes.ContainerEventType, actor)
if err := UnmapPorts(vc.ContainerID, vc.HostConfig); err != nil {
log.Errorf("Failed to unmap ports for container %s: %s", vc.ContainerID, err)
log.Errorf("[op=%s] Event Monitor failed to unmap ports for container(%s): %s", event.OperationID(), vc.ContainerID, err)
}

// auto-remove if required
Expand All @@ -252,7 +253,7 @@ func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) {

err := NewContainerBackend().ContainerRm(vc.Name, config)
if err != nil {
log.Errorf("Failed to remove container %s: %s", vc.ContainerID, err)
log.Errorf("[op=%s] Event Monitor failed to remove container(%s): %s", event.OperationID(), vc.ContainerID, err)
}
}
}()
Expand All @@ -262,7 +263,7 @@ func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) {
actor := CreateContainerEventActorWithAttributes(vc, attrs)
EventService().Log(containerDestroyEvent, eventtypes.ContainerEventType, actor)
if err := UnmapPorts(vc.ContainerID, vc.HostConfig); err != nil {
log.Errorf("Failed to unmap ports for container %s: %s", vc.ContainerID, err)
log.Errorf("[op=%s] Event Monitor failed to unmap ports for container(%s): %s", event.OperationID(), vc.ContainerID, err)
}
// remove from the container cache...
cache.ContainerCache().DeleteContainer(vc.ContainerID)
Expand Down
29 changes: 17 additions & 12 deletions lib/apiservers/portlayer/restapi/handlers/containers_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,16 @@ func (handler *ContainersHandlersImpl) GetHandler(params containers.GetParams) m
}

func (handler *ContainersHandlersImpl) CommitHandler(params containers.CommitParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle)))
op := trace.NewOperation(context.Background(), fmt.Sprintf("commit handle(%s)", params.Handle))
defer trace.End(trace.Begin(fmt.Sprintf("commit handle(%s)", params.Handle), op))

h := exec.GetHandle(params.Handle)
if h == nil {
return containers.NewCommitNotFound().WithPayload(&models.Error{Message: "container not found"})
}

if err := h.Commit(context.Background(), handler.handlerCtx.Session, params.WaitTime); err != nil {
log.Errorf("CommitHandler error on handle(%s) for %s: %s", h, h.ExecConfig.ID, err)
if err := h.Commit(op, handler.handlerCtx.Session, params.WaitTime); err != nil {
op.Errorf("CommitHandler error on handle(%s) for %s: %s", h, h.ExecConfig.ID, err)
switch err := err.(type) {
case exec.ConcurrentAccessError:
return containers.NewCommitConflict().WithPayload(&models.Error{Message: err.Error()})
Expand All @@ -240,7 +241,8 @@ func (handler *ContainersHandlersImpl) CommitHandler(params containers.CommitPar
}

func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers.ContainerRemoveParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))

// get the indicated container for removal
container := exec.Containers.Container(params.ID)
Expand All @@ -249,7 +251,7 @@ func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers.
}

// NOTE: this should allowing batching of operations, as with Create, Start, Stop, et al
err := container.Remove(context.Background(), handler.handlerCtx.Session)
err := container.Remove(op, handler.handlerCtx.Session)
if err != nil {
switch err := err.(type) {
case exec.NotFoundError:
Expand All @@ -272,17 +274,18 @@ func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers.
}

func (handler *ContainersHandlersImpl) GetContainerInfoHandler(params containers.GetContainerInfoParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))

container := exec.Containers.Container(params.ID)
if container == nil {
info := fmt.Sprintf("GetContainerInfoHandler ContainerCache miss for container(%s)", params.ID)
log.Error(info)
op.Errorf(info)
return containers.NewGetContainerInfoNotFound().WithPayload(&models.Error{Message: info})
}

// Refresh to get up to date network info
container.Refresh(context.Background())
container.Refresh(op)
containerInfo := convertContainerToContainerInfo(container)
return containers.NewGetContainerInfoOK().WithPayload(containerInfo)
}
Expand Down Expand Up @@ -319,7 +322,8 @@ func (handler *ContainersHandlersImpl) GetContainerListHandler(params containers
}

func (handler *ContainersHandlersImpl) ContainerSignalHandler(params containers.ContainerSignalParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))

// NOTE: I feel that this should be in a Commit path for consistency
// it would allow phrasings such as:
Expand All @@ -331,7 +335,7 @@ func (handler *ContainersHandlersImpl) ContainerSignalHandler(params containers.
return containers.NewContainerSignalNotFound().WithPayload(&models.Error{Message: fmt.Sprintf("container %s not found", params.ID)})
}

err := container.Signal(context.Background(), params.Signal)
err := container.Signal(op, params.Signal)
if err != nil {
return containers.NewContainerSignalInternalServerError().WithPayload(&models.Error{Message: err.Error()})
}
Expand Down Expand Up @@ -394,7 +398,8 @@ func (handler *ContainersHandlersImpl) GetContainerStatsHandler(params container
}

func (handler *ContainersHandlersImpl) GetContainerLogsHandler(params containers.GetContainerLogsParams) middleware.Responder {
defer trace.End(trace.Begin(params.ID))
op := trace.NewOperation(context.Background(), params.ID)
defer trace.End(trace.Begin(params.ID, op))

container := exec.Containers.Container(params.ID)
if container == nil {
Expand All @@ -417,7 +422,7 @@ func (handler *ContainersHandlersImpl) GetContainerLogsHandler(params containers
since = *params.Since
}

reader, err := container.LogReader(context.Background(), tail, follow, since)
reader, err := container.LogReader(op, tail, follow, since)
if err != nil {
// Do not return an error here. It's a workaround for a panic similar to #2594
return containers.NewGetContainerLogsInternalServerError()
Expand Down
5 changes: 3 additions & 2 deletions lib/apiservers/portlayer/restapi/handlers/scopes_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ func (handler *ScopesHandlersImpl) ScopesBindContainer(params scopes.BindContain
}

func (handler *ScopesHandlersImpl) ScopesUnbindContainer(params scopes.UnbindContainerParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle)))
op := trace.NewOperation(context.Background(), params.Handle)
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle), op))

h := exec.GetHandle(params.Handle)
if h == nil {
Expand All @@ -310,7 +311,7 @@ func (handler *ScopesHandlersImpl) ScopesUnbindContainer(params scopes.UnbindCon

var endpoints []*network.Endpoint
var err error
if endpoints, err = handler.netCtx.UnbindContainer(h); err != nil {
if endpoints, err = handler.netCtx.UnbindContainer(op.ID(), h); err != nil {
switch err := err.(type) {
case network.ResourceNotFoundError:
return scopes.NewUnbindContainerNotFound().WithPayload(errorPayload(err))
Expand Down
18 changes: 6 additions & 12 deletions lib/install/management/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import (
func (d *Dispatcher) DebugVCH(vch *vm.VirtualMachine, conf *config.VirtualContainerHostConfigSpec, password, authorizedKey string) error {
defer trace.End(trace.Begin(conf.Name))

op, err := trace.FromContext(d.ctx)
if err != nil {
op = trace.NewOperation(d.ctx, "enable appliance debug")
}
op := trace.FromContext(d.ctx, "enable appliance debug")

err = d.enableSSH(op, vch, password, authorizedKey)
err := d.enableSSH(op, vch, password, authorizedKey)
if err != nil {
op.Errorf("Unable to enable ssh on the VCH appliance VM: %s", err)
return err
Expand All @@ -46,10 +43,7 @@ func (d *Dispatcher) DebugVCH(vch *vm.VirtualMachine, conf *config.VirtualContai
}

func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, password, authorizedKey string) error {
op, err := trace.FromContext(ctx)
if err != nil {
op = trace.NewOperation(ctx, "enable ssh in appliance")
}
op := trace.FromContext(ctx, "enable ssh in appliance")

state, err := vch.PowerState(op)
if err != nil {
Expand All @@ -68,7 +62,7 @@ func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, pass
return err
}

pm, err := d.opManager(ctx, vch)
pm, err := d.opManager(op, vch)
if err != nil {
err = errors.Errorf("Unable to manage processes in appliance VM: %s", err)
op.Errorf("%s", err)
Expand All @@ -89,7 +83,7 @@ func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, pass
return err
}

_, err = d.opManagerWait(ctx, pm, &auth, pid)
_, err = d.opManagerWait(op, pm, &auth, pid)
if err != nil {
err = errors.Errorf("Unable to check enable SSH status: %s", err)
op.Errorf("%s", err)
Expand All @@ -113,7 +107,7 @@ func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, pass
return err
}

_, err = d.opManagerWait(ctx, pm, &auth, pid)
_, err = d.opManagerWait(op, pm, &auth, pid)
if err != nil {
err = errors.Errorf("Unable to check enable passwd status: %s", err)
op.Errorf("%s", err)
Expand Down
3 changes: 0 additions & 3 deletions lib/migration/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ package migration

import (
"context"
"fmt"
"strconv"

"github.com/vmware/vic/lib/migration/errors"
"github.com/vmware/vic/lib/migration/feature"
"github.com/vmware/vic/lib/migration/manager"
// imported for the side effect
_ "github.com/vmware/vic/lib/migration/plugins"
"github.com/vmware/vic/pkg/trace"
"github.com/vmware/vic/pkg/vsphere/session"
)

Expand Down Expand Up @@ -74,7 +72,6 @@ func dataIsOlder(data map[string]string, target string, verKey string) (bool, er
}

func migrateConfig(ctx context.Context, s *session.Session, data map[string]string, target string, verKey string) (map[string]string, bool, error) {
defer trace.End(trace.Begin(fmt.Sprintf("target: %s, version key: %s", target, verKey)))

dst := make(map[string]string)
for k, v := range data {
Expand Down
7 changes: 7 additions & 0 deletions lib/portlayer/event/events/base_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type BaseEvent struct {
Detail string
Ref string
CreatedTime time.Time

OpID string
}

func (be *BaseEvent) EventID() int {
Expand All @@ -48,10 +50,15 @@ func (be *BaseEvent) Message() string {
func (be *BaseEvent) Reference() string {
return be.Ref
}

func (be *BaseEvent) Created() time.Time {
return be.CreatedTime
}

func (be *BaseEvent) OperationID() string {
return be.OpID
}

// NewEventType utility function that uses reflection to return
// the event type
func NewEventType(kind interface{}) EventType {
Expand Down
2 changes: 2 additions & 0 deletions lib/portlayer/event/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Event interface {
Message() string

Created() time.Time

OperationID() string
}

type EventTopic interface {
Expand Down
14 changes: 10 additions & 4 deletions lib/portlayer/event/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"fmt"
"sync"

log "github.com/Sirupsen/logrus"

"github.com/vmware/vic/lib/portlayer/event/collector"
"github.com/vmware/vic/lib/portlayer/event/events"
"github.com/vmware/vic/pkg/trace"

log "github.com/Sirupsen/logrus"
)

type Manager struct {
Expand Down Expand Up @@ -69,10 +69,16 @@ func NewEventManager(collectors ...collector.Collector) *Manager {
subs := mgr.subs.subscribers[e.Topic()]
mgr.subs.mu.RUnlock()

log.Debugf("Found %d subscribers to id: %d - %s: %s", len(subs), e.EventID(), e.Topic(), e.Message())
var eventID string
if e.OperationID() != "" {
eventID = fmt.Sprintf("ContainerEvent(opid=%s)", e.OperationID())
} else {
eventID = fmt.Sprintf("Event(id=%d)", e.EventID())
}
log.Debugf("Found %d subscribers to %s: %s: %s", len(subs), eventID, e.Topic(), e.Message())

for sub, s := range subs {
log.Debugf("Event manager calling back to %s for %d - %s: %s", sub, e.EventID(), e.Topic(), e.Message())
log.Debugf("Event manager calling back to %s for %s: %s", sub, eventID, e.Topic(), e.Message())
s.onEvent(e)
}
}
Expand Down
Loading

0 comments on commit 6c26587

Please sign in to comment.