From f0d892f21a1d6f787aa1d518f9fe465c025be4b5 Mon Sep 17 00:00:00 2001 From: Clint Greenwood Date: Thu, 2 Nov 2017 16:50:07 -0400 Subject: [PATCH] Add operation id to vSphere calls Increase use of trace.Operation as well as pass the operation.ID to vSphere for observability within the vSphere logs. Fixes #2739 --- .../engine/backends/eventmonitor.go | 22 +-- .../restapi/handlers/containers_handlers.go | 29 ++-- .../restapi/handlers/scopes_handlers.go | 5 +- lib/install/management/debug.go | 18 +-- lib/migration/migrator.go | 3 - .../event/collector/vsphere/vm_event.go | 6 +- .../event/collector/vsphere/vm_event_test.go | 3 +- lib/portlayer/event/events/base_event.go | 5 +- lib/portlayer/event/events/base_event_test.go | 4 +- lib/portlayer/event/events/events.go | 2 +- lib/portlayer/event/manager.go | 8 +- lib/portlayer/event/subscriber_test.go | 14 +- lib/portlayer/exec/base.go | 102 ++++++------- lib/portlayer/exec/commit.go | 82 +++++----- lib/portlayer/exec/container.go | 143 ++++++++++-------- lib/portlayer/exec/exec.go | 7 +- lib/portlayer/exec/exec_test.go | 4 +- lib/portlayer/exec/handle.go | 26 ++-- lib/portlayer/logging/logging.go | 15 +- lib/portlayer/network/context.go | 8 +- lib/portlayer/network/context_test.go | 7 +- lib/portlayer/network/network.go | 17 ++- pkg/trace/operation.go | 62 ++++---- pkg/trace/operation_test.go | 6 +- pkg/trace/trace.go | 53 +++++-- pkg/vsphere/tasks/waiter.go | 5 +- pkg/vsphere/vm/vm.go | 24 +-- 27 files changed, 372 insertions(+), 308 deletions(-) diff --git a/lib/apiservers/engine/backends/eventmonitor.go b/lib/apiservers/engine/backends/eventmonitor.go index 8b828813fd..efa6ad7d1c 100644 --- a/lib/apiservers/engine/backends/eventmonitor.go +++ b/lib/apiservers/engine/backends/eventmonitor.go @@ -39,6 +39,7 @@ import ( "github.com/vmware/vic/lib/apiservers/portlayer/client/events" plevents "github.com/vmware/vic/lib/portlayer/event/events" "github.com/vmware/vic/pkg/trace" + "github.com/vmware/vic/pkg/uid" ) const ( @@ -211,19 +212,22 @@ func (m *PortlayerEventMonitor) monitor() error { return nil } -// publishEvent() translate a portlayer event into a Docker event if the event is for a -// known container and publishes them to Docker event subscribers. 
+// PublishEvent translates select portlayer container events into Docker events +// and publishes to subscribers func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) { - defer trace.End(trace.Begin("")) + // create a shortID for the container for logging purposes + containerShortID := uid.Parse(event.Ref).Truncate() + defer trace.End(trace.Begin(fmt.Sprintf("Event Monitor received eventID(%s) for container(%s) - %s", event.ID, containerShortID, event.Event))) vc := cache.ContainerCache().GetContainer(event.Ref) if vc == nil { - log.Warnf("Received portlayer event %s for container %s but not found in cache", event.Event, event.Ref) + log.Warnf("Event Monitor received eventID(%s) but container(%s) not in cache", event.ID, containerShortID) return } + // docker event attributes var attrs map[string]string - // TODO: move to a container.OnEvent() so that container drives the necessary changes based on event activity + switch event.Event { case plevents.ContainerStopped, plevents.ContainerPoweredOff, @@ -234,13 +238,13 @@ func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) { // get the containerEngine code, _ := NewContainerBackend().containerProxy.exitCode(vc) - log.Infof("Sending die event for container %s - code: %s", vc.ContainerID, code) + log.Infof("Sending die event for container(%s) with exitCode[%s] - eventID(%s)", containerShortID, code, event.ID) // if the docker client is unable to convert the code to an int the client will return 125 attrs["exitCode"] = code actor := CreateContainerEventActorWithAttributes(vc, attrs) EventService().Log(containerDieEvent, eventtypes.ContainerEventType, actor) if err := UnmapPorts(vc.ContainerID, vc.HostConfig); err != nil { - log.Errorf("Failed to unmap ports for container %s: %s", vc.ContainerID, err) + log.Errorf("Event Monitor failed to unmap ports for container(%s): %s - eventID(%s)", containerShortID, err, event.ID) } // auto-remove if required @@ -252,7 +256,7 @@ func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) { err := NewContainerBackend().ContainerRm(vc.Name, config) if err != nil { - log.Errorf("Failed to remove container %s: %s", vc.ContainerID, err) + log.Errorf("Event Monitor failed to remove container(%s) - eventID(%s): %s", containerShortID, event.ID, err) } } }() @@ -262,7 +266,7 @@ func (p DockerEventPublisher) PublishEvent(event plevents.BaseEvent) { actor := CreateContainerEventActorWithAttributes(vc, attrs) EventService().Log(containerDestroyEvent, eventtypes.ContainerEventType, actor) if err := UnmapPorts(vc.ContainerID, vc.HostConfig); err != nil { - log.Errorf("Failed to unmap ports for container %s: %s", vc.ContainerID, err) + log.Errorf("Event Monitor failed to unmap ports for container(%s): %s - eventID(%s)", containerShortID, err, event.ID) } // remove from the container cache... 
cache.ContainerCache().DeleteContainer(vc.ContainerID) diff --git a/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go b/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go index 13ebb39f8e..0b34a1a647 100644 --- a/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go +++ b/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go @@ -217,15 +217,16 @@ func (handler *ContainersHandlersImpl) GetHandler(params containers.GetParams) m } func (handler *ContainersHandlersImpl) CommitHandler(params containers.CommitParams) middleware.Responder { - defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle))) + op := trace.NewOperation(context.Background(), fmt.Sprintf("commit handle(%s)", params.Handle)) + defer trace.End(trace.Begin(fmt.Sprintf("commit handle(%s)", params.Handle), op)) h := exec.GetHandle(params.Handle) if h == nil { return containers.NewCommitNotFound().WithPayload(&models.Error{Message: "container not found"}) } - if err := h.Commit(context.Background(), handler.handlerCtx.Session, params.WaitTime); err != nil { - log.Errorf("CommitHandler error on handle(%s) for %s: %s", h, h.ExecConfig.ID, err) + if err := h.Commit(op, handler.handlerCtx.Session, params.WaitTime); err != nil { + op.Errorf("CommitHandler error on handle(%s) for %s: %s", h, h.ExecConfig.ID, err) switch err := err.(type) { case exec.ConcurrentAccessError: return containers.NewCommitConflict().WithPayload(&models.Error{Message: err.Error()}) @@ -240,7 +241,8 @@ func (handler *ContainersHandlersImpl) CommitHandler(params containers.CommitPar } func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers.ContainerRemoveParams) middleware.Responder { - defer trace.End(trace.Begin(params.ID)) + op := trace.NewOperation(context.Background(), params.ID) + defer trace.End(trace.Begin(params.ID, op)) // get the indicated container for removal container := exec.Containers.Container(params.ID) @@ -249,7 +251,7 @@ func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers. } // NOTE: this should allowing batching of operations, as with Create, Start, Stop, et al - err := container.Remove(context.Background(), handler.handlerCtx.Session) + err := container.Remove(op, handler.handlerCtx.Session) if err != nil { switch err := err.(type) { case exec.NotFoundError: @@ -272,17 +274,18 @@ func (handler *ContainersHandlersImpl) RemoveContainerHandler(params containers. 
} func (handler *ContainersHandlersImpl) GetContainerInfoHandler(params containers.GetContainerInfoParams) middleware.Responder { - defer trace.End(trace.Begin(params.ID)) + op := trace.NewOperation(context.Background(), params.ID) + defer trace.End(trace.Begin(params.ID, op)) container := exec.Containers.Container(params.ID) if container == nil { info := fmt.Sprintf("GetContainerInfoHandler ContainerCache miss for container(%s)", params.ID) - log.Error(info) + op.Errorf(info) return containers.NewGetContainerInfoNotFound().WithPayload(&models.Error{Message: info}) } // Refresh to get up to date network info - container.Refresh(context.Background()) + container.Refresh(op) containerInfo := convertContainerToContainerInfo(container) return containers.NewGetContainerInfoOK().WithPayload(containerInfo) } @@ -319,7 +322,8 @@ func (handler *ContainersHandlersImpl) GetContainerListHandler(params containers } func (handler *ContainersHandlersImpl) ContainerSignalHandler(params containers.ContainerSignalParams) middleware.Responder { - defer trace.End(trace.Begin(params.ID)) + op := trace.NewOperation(context.Background(), params.ID) + defer trace.End(trace.Begin(params.ID, op)) // NOTE: I feel that this should be in a Commit path for consistency // it would allow phrasings such as: @@ -331,7 +335,7 @@ func (handler *ContainersHandlersImpl) ContainerSignalHandler(params containers. return containers.NewContainerSignalNotFound().WithPayload(&models.Error{Message: fmt.Sprintf("container %s not found", params.ID)}) } - err := container.Signal(context.Background(), params.Signal) + err := container.Signal(op, params.Signal) if err != nil { return containers.NewContainerSignalInternalServerError().WithPayload(&models.Error{Message: err.Error()}) } @@ -394,7 +398,8 @@ func (handler *ContainersHandlersImpl) GetContainerStatsHandler(params container } func (handler *ContainersHandlersImpl) GetContainerLogsHandler(params containers.GetContainerLogsParams) middleware.Responder { - defer trace.End(trace.Begin(params.ID)) + op := trace.NewOperation(context.Background(), params.ID) + defer trace.End(trace.Begin(params.ID, op)) container := exec.Containers.Container(params.ID) if container == nil { @@ -417,7 +422,7 @@ func (handler *ContainersHandlersImpl) GetContainerLogsHandler(params containers since = *params.Since } - reader, err := container.LogReader(context.Background(), tail, follow, since) + reader, err := container.LogReader(op, tail, follow, since) if err != nil { // Do not return an error here. 
It's a workaround for a panic similar to #2594 return containers.NewGetContainerLogsInternalServerError() diff --git a/lib/apiservers/portlayer/restapi/handlers/scopes_handlers.go b/lib/apiservers/portlayer/restapi/handlers/scopes_handlers.go index f3b4871d31..440f717997 100644 --- a/lib/apiservers/portlayer/restapi/handlers/scopes_handlers.go +++ b/lib/apiservers/portlayer/restapi/handlers/scopes_handlers.go @@ -301,7 +301,8 @@ func (handler *ScopesHandlersImpl) ScopesBindContainer(params scopes.BindContain } func (handler *ScopesHandlersImpl) ScopesUnbindContainer(params scopes.UnbindContainerParams) middleware.Responder { - defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle))) + op := trace.NewOperation(context.Background(), params.Handle) + defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle), op)) h := exec.GetHandle(params.Handle) if h == nil { @@ -310,7 +311,7 @@ func (handler *ScopesHandlersImpl) ScopesUnbindContainer(params scopes.UnbindCon var endpoints []*network.Endpoint var err error - if endpoints, err = handler.netCtx.UnbindContainer(h); err != nil { + if endpoints, err = handler.netCtx.UnbindContainer(op, h); err != nil { switch err := err.(type) { case network.ResourceNotFoundError: return scopes.NewUnbindContainerNotFound().WithPayload(errorPayload(err)) diff --git a/lib/install/management/debug.go b/lib/install/management/debug.go index 1ae146a935..b23009e62f 100644 --- a/lib/install/management/debug.go +++ b/lib/install/management/debug.go @@ -29,12 +29,9 @@ import ( func (d *Dispatcher) DebugVCH(vch *vm.VirtualMachine, conf *config.VirtualContainerHostConfigSpec, password, authorizedKey string) error { defer trace.End(trace.Begin(conf.Name)) - op, err := trace.FromContext(d.ctx) - if err != nil { - op = trace.NewOperation(d.ctx, "enable appliance debug") - } + op := trace.FromContext(d.ctx, "enable appliance debug") - err = d.enableSSH(op, vch, password, authorizedKey) + err := d.enableSSH(op, vch, password, authorizedKey) if err != nil { op.Errorf("Unable to enable ssh on the VCH appliance VM: %s", err) return err @@ -46,10 +43,7 @@ func (d *Dispatcher) DebugVCH(vch *vm.VirtualMachine, conf *config.VirtualContai } func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, password, authorizedKey string) error { - op, err := trace.FromContext(ctx) - if err != nil { - op = trace.NewOperation(ctx, "enable ssh in appliance") - } + op := trace.FromContext(ctx, "enable ssh in appliance") state, err := vch.PowerState(op) if err != nil { @@ -68,7 +62,7 @@ func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, pass return err } - pm, err := d.opManager(ctx, vch) + pm, err := d.opManager(op, vch) if err != nil { err = errors.Errorf("Unable to manage processes in appliance VM: %s", err) op.Errorf("%s", err) @@ -89,7 +83,7 @@ func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, pass return err } - _, err = d.opManagerWait(ctx, pm, &auth, pid) + _, err = d.opManagerWait(op, pm, &auth, pid) if err != nil { err = errors.Errorf("Unable to check enable SSH status: %s", err) op.Errorf("%s", err) @@ -113,7 +107,7 @@ func (d *Dispatcher) enableSSH(ctx context.Context, vch *vm.VirtualMachine, pass return err } - _, err = d.opManagerWait(ctx, pm, &auth, pid) + _, err = d.opManagerWait(op, pm, &auth, pid) if err != nil { err = errors.Errorf("Unable to check enable passwd status: %s", err) op.Errorf("%s", err) diff --git a/lib/migration/migrator.go b/lib/migration/migrator.go index 
9f26bf8eb1..71d94ed2a7 100644 --- a/lib/migration/migrator.go +++ b/lib/migration/migrator.go @@ -16,7 +16,6 @@ package migration import ( "context" - "fmt" "strconv" "github.com/vmware/vic/lib/migration/errors" @@ -24,7 +23,6 @@ import ( "github.com/vmware/vic/lib/migration/manager" // imported for the side effect _ "github.com/vmware/vic/lib/migration/plugins" - "github.com/vmware/vic/pkg/trace" "github.com/vmware/vic/pkg/vsphere/session" ) @@ -74,7 +72,6 @@ func dataIsOlder(data map[string]string, target string, verKey string) (bool, er } func migrateConfig(ctx context.Context, s *session.Session, data map[string]string, target string, verKey string) (map[string]string, bool, error) { - defer trace.End(trace.Begin(fmt.Sprintf("target: %s, version key: %s", target, verKey))) dst := make(map[string]string) for k, v := range data { diff --git a/lib/portlayer/event/collector/vsphere/vm_event.go b/lib/portlayer/event/collector/vsphere/vm_event.go index 69d9cb7446..a50f17dc45 100644 --- a/lib/portlayer/event/collector/vsphere/vm_event.go +++ b/lib/portlayer/event/collector/vsphere/vm_event.go @@ -15,9 +15,11 @@ package vsphere import ( - "github.com/vmware/vic/lib/portlayer/event/events" + "strconv" "github.com/vmware/govmomi/vim25/types" + + "github.com/vmware/vic/lib/portlayer/event/events" ) type VMEvent struct { @@ -52,7 +54,7 @@ func NewVMEvent(be types.BaseEvent) *VMEvent { return &VMEvent{ &events.BaseEvent{ Event: ee, - ID: int(e.Key), + ID: strconv.Itoa(int(e.Key)), Detail: e.FullFormattedMessage, Ref: e.Vm.Vm.String(), CreatedTime: e.CreatedTime, diff --git a/lib/portlayer/event/collector/vsphere/vm_event_test.go b/lib/portlayer/event/collector/vsphere/vm_event_test.go index 9ced688e02..11320b8415 100644 --- a/lib/portlayer/event/collector/vsphere/vm_event_test.go +++ b/lib/portlayer/event/collector/vsphere/vm_event_test.go @@ -15,6 +15,7 @@ package vsphere import ( + "strconv" "testing" "time" @@ -34,7 +35,7 @@ func TestNewEvent(t *testing.T) { assert.NotNil(t, vme) assert.Equal(t, events.ContainerPoweredOn, vme.String()) assert.Equal(t, vm.String(), vme.Reference()) - assert.Equal(t, k, vme.EventID()) + assert.Equal(t, strconv.Itoa(k), vme.EventID()) assert.Equal(t, msg, vme.Message()) assert.Equal(t, "vsphere.VMEvent", vme.Topic()) assert.Equal(t, tt, vme.Created()) diff --git a/lib/portlayer/event/events/base_event.go b/lib/portlayer/event/events/base_event.go index acf224aae2..d6894810f5 100644 --- a/lib/portlayer/event/events/base_event.go +++ b/lib/portlayer/event/events/base_event.go @@ -26,13 +26,13 @@ type EventType string type BaseEvent struct { Type EventType Event string - ID int + ID string Detail string Ref string CreatedTime time.Time } -func (be *BaseEvent) EventID() int { +func (be *BaseEvent) EventID() string { return be.ID } @@ -48,6 +48,7 @@ func (be *BaseEvent) Message() string { func (be *BaseEvent) Reference() string { return be.Ref } + func (be *BaseEvent) Created() time.Time { return be.CreatedTime } diff --git a/lib/portlayer/event/events/base_event_test.go b/lib/portlayer/event/events/base_event_test.go index de70ee6c02..c6c1ce5672 100644 --- a/lib/portlayer/event/events/base_event_test.go +++ b/lib/portlayer/event/events/base_event_test.go @@ -30,13 +30,13 @@ func TestNewEventType(t *testing.T) { func TestBaseEvent(t *testing.T) { be := &BaseEvent{ Event: "PoweredOn", - ID: 12, + ID: "12", Detail: "VM 123 PoweredOn", Ref: "vm:12", } assert.Equal(t, "PoweredOn", be.String()) - assert.Equal(t, 12, be.EventID()) + assert.Equal(t, "12", be.EventID()) 
assert.Equal(t, "VM 123 PoweredOn", be.Message()) assert.Equal(t, "vm:12", be.Reference()) diff --git a/lib/portlayer/event/events/events.go b/lib/portlayer/event/events/events.go index 6b9d07ffe7..ba1d2fa0b1 100644 --- a/lib/portlayer/event/events/events.go +++ b/lib/portlayer/event/events/events.go @@ -21,7 +21,7 @@ import ( type Event interface { EventTopic // id of event - EventID() int + EventID() string // event (PowerOn, PowerOff, etc) String() string // reference evented object diff --git a/lib/portlayer/event/manager.go b/lib/portlayer/event/manager.go index a6c5623f68..b2e905060f 100644 --- a/lib/portlayer/event/manager.go +++ b/lib/portlayer/event/manager.go @@ -18,11 +18,11 @@ import ( "fmt" "sync" - log "github.com/Sirupsen/logrus" - "github.com/vmware/vic/lib/portlayer/event/collector" "github.com/vmware/vic/lib/portlayer/event/events" "github.com/vmware/vic/pkg/trace" + + log "github.com/Sirupsen/logrus" ) type Manager struct { @@ -69,10 +69,10 @@ func NewEventManager(collectors ...collector.Collector) *Manager { subs := mgr.subs.subscribers[e.Topic()] mgr.subs.mu.RUnlock() - log.Debugf("Found %d subscribers to id: %d - %s: %s", len(subs), e.EventID(), e.Topic(), e.Message()) + log.Debugf("Found %d subscribers to %s: %s - %s", len(subs), e.EventID(), e.Topic(), e.Message()) for sub, s := range subs { - log.Debugf("Event manager calling back to %s for %d - %s: %s", sub, e.EventID(), e.Topic(), e.Message()) + log.Debugf("Event manager calling back to %s for Event(%s): %s", sub, e.EventID(), e.Topic()) s.onEvent(e) } } diff --git a/lib/portlayer/event/subscriber_test.go b/lib/portlayer/event/subscriber_test.go index 5f6e17ba11..b8b7896b2c 100644 --- a/lib/portlayer/event/subscriber_test.go +++ b/lib/portlayer/event/subscriber_test.go @@ -15,7 +15,7 @@ package event import ( - "fmt" + "strconv" "testing" "time" @@ -61,17 +61,17 @@ func (m *mockCollector) Name() string { } type mockEvent struct { - id int + id string } // id of event -func (e *mockEvent) EventID() int { +func (e *mockEvent) EventID() string { return e.id } // event (PowerOn, PowerOff, etc) func (e *mockEvent) String() string { - return fmt.Sprintf("%d", e.id) + return e.id } // reference evented object @@ -111,7 +111,7 @@ func TestSuspendQueue(t *testing.T) { assert.True(t, s.IsSuspended()) suspended = true } - c.c(&mockEvent{id: i}) + c.c(&mockEvent{id: strconv.Itoa(i)}) } select { @@ -155,7 +155,7 @@ func TestSuspendDiscard(t *testing.T) { s.Suspend(false) assert.True(t, s.IsSuspended()) for i := 0; i < 50; i++ { - c.c(&mockEvent{id: i}) + c.c(&mockEvent{id: strconv.Itoa(i)}) } <-time.After(5 * time.Second) @@ -180,7 +180,7 @@ func TestSuspendOverflow(t *testing.T) { s.Suspend(true) assert.True(t, s.IsSuspended()) for i := 0; i < maxEventQueueSize+1; i++ { - c.c(&mockEvent{id: i}) + c.c(&mockEvent{id: strconv.Itoa(i)}) } select { diff --git a/lib/portlayer/exec/base.go b/lib/portlayer/exec/base.go index e2a6bc9421..b343e1dcb9 100644 --- a/lib/portlayer/exec/base.go +++ b/lib/portlayer/exec/base.go @@ -33,8 +33,6 @@ import ( "github.com/vmware/vic/pkg/vsphere/extraconfig/vmomi" "github.com/vmware/vic/pkg/vsphere/tasks" "github.com/vmware/vic/pkg/vsphere/vm" - - log "github.com/Sirupsen/logrus" ) // NotYetExistError is returned when a call that requires a VM exist is made @@ -104,10 +102,10 @@ func (c *containerBase) VMReference() types.ManagedObjectReference { } // unlocked refresh of container state -func (c *containerBase) refresh(ctx context.Context) error { - base, err := c.updates(ctx) +func (c 
*containerBase) refresh(op trace.Operation) error { + base, err := c.updates(op) if err != nil { - log.Errorf("Update: unable to update container %s: %s", c, err) + op.Errorf("Update: unable to update container %s: %s", c, err) return err } @@ -117,8 +115,8 @@ func (c *containerBase) refresh(ctx context.Context) error { } // updates acquires updates from the infrastructure without holding a lock -func (c *containerBase) updates(ctx context.Context) (*containerBase, error) { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *containerBase) updates(op trace.Operation) (*containerBase, error) { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) var o mo.VirtualMachine @@ -128,10 +126,10 @@ func (c *containerBase) updates(ctx context.Context) (*containerBase, error) { } if c.Config != nil { - log.Debugf("Update: for %s, refreshing from change version %s", c, c.Config.ChangeVersion) + op.Debugf("Update: for %s, refreshing from change version %s", c, c.Config.ChangeVersion) } - if err := c.vm.Properties(ctx, c.vm.Reference(), []string{"config", "runtime"}, &o); err != nil { + if err := c.vm.Properties(op, c.vm.Reference(), []string{"config", "runtime"}, &o); err != nil { return nil, err } @@ -149,7 +147,7 @@ func (c *containerBase) updates(ctx context.Context) (*containerBase, error) { return nil, fmt.Errorf("Update: change version %s failed assertion extraconfig id != nil", o.Config.ChangeVersion) } - log.Debugf("Update: for %s, change version %s, extraconfig id: %+v", c, o.Config.ChangeVersion, containerExecKeyValues["guestinfo.vice./common/id"]) + op.Debugf("Update: for %s, change version %s, extraconfig id: %+v", c, o.Config.ChangeVersion, containerExecKeyValues["guestinfo.vice./common/id"]) // #nosec: Errors unhandled. base.DataVersion, _ = migration.ContainerDataVersion(containerExecKeyValues) migratedConf, base.Migrated, base.MigrationError = migration.MigrateContainerConfig(containerExecKeyValues) @@ -158,35 +156,35 @@ func (c *containerBase) updates(ctx context.Context) (*containerBase, error) { return base, nil } -func (c *containerBase) ReloadConfig(ctx context.Context) error { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *containerBase) ReloadConfig(op trace.Operation) error { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) - return c.startGuestProgram(ctx, "reload", "") + return c.startGuestProgram(op, "reload", "") } // WaitForExec waits exec'ed task to set started field or timeout func (c *containerBase) WaitForExec(ctx context.Context, id string) error { - defer trace.End(trace.Begin(id)) + defer trace.End(trace.Begin(id, ctx)) return c.waitForExec(ctx, id) } // WaitForSession waits non-exec'ed task to set started field or timeout func (c *containerBase) WaitForSession(ctx context.Context, id string) error { - defer trace.End(trace.Begin(id)) + defer trace.End(trace.Begin(id, ctx)) return c.waitForSession(ctx, id) } -func (c *containerBase) startGuestProgram(ctx context.Context, name string, args string) error { +func (c *containerBase) startGuestProgram(op trace.Operation, name string, args string) error { // make sure we have vm if c.vm == nil { return NotYetExistError{c.ExecConfig.ID} } - defer trace.End(trace.Begin(c.ExecConfig.ID + ":" + name)) + defer trace.End(trace.Begin(c.ExecConfig.ID+":"+name, op)) o := guest.NewOperationsManager(c.vm.Client.Client, c.vm.Reference()) - m, err := o.ProcessManager(ctx) + m, err := o.ProcessManager(op) if err != nil { return err } @@ -200,13 +198,13 @@ func (c *containerBase) startGuestProgram(ctx context.Context, 
name string, args Username: c.ExecConfig.ID, } - _, err = m.StartProgram(ctx, &auth, &spec) + _, err = m.StartProgram(op, &auth, &spec) return err } -func (c *containerBase) start(ctx context.Context) error { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *containerBase) start(op trace.Operation) error { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) // make sure we have vm if c.vm == nil { @@ -214,14 +212,14 @@ func (c *containerBase) start(ctx context.Context) error { } // Power on - _, err := c.vm.WaitForResult(ctx, func(ctx context.Context) (tasks.Task, error) { - return c.vm.PowerOn(ctx) + _, err := c.vm.WaitForResult(op, func(op context.Context) (tasks.Task, error) { + return c.vm.PowerOn(op) }) return err } -func (c *containerBase) stop(ctx context.Context, waitTime *int32) error { +func (c *containerBase) stop(op trace.Operation, waitTime *int32) error { // make sure we have vm if c.vm == nil { return NotYetExistError{c.ExecConfig.ID} @@ -230,35 +228,35 @@ func (c *containerBase) stop(ctx context.Context, waitTime *int32) error { // get existing state and set to stopping // if there's a failure we'll revert to existing - err := c.shutdown(ctx, waitTime) + err := c.shutdown(op, waitTime) if err == nil { return nil } - log.Warnf("stopping %s via hard power off due to: %s", c, err) + op.Warnf("stopping %s via hard power off due to: %s", c, err.Error()) - return c.poweroff(ctx) + return c.poweroff(op) } -func (c *containerBase) kill(ctx context.Context) error { +func (c *containerBase) kill(op trace.Operation) error { // make sure we have vm if c.vm == nil { return NotYetExistError{c.ExecConfig.ID} } wait := 10 * time.Second // default - timeout, cancel := context.WithTimeout(ctx, wait) + timeout, cancel := trace.WithTimeout(&op, wait, "kill") defer cancel() sig := string(ssh.SIGKILL) - log.Infof("sending kill -%s %s", sig, c) + timeout.Infof("sending kill -%s %s", sig, c) err := c.startGuestProgram(timeout, "kill", sig) if err == nil && timeout.Err() != nil { - log.Warnf("timeout (%s) waiting for %s to power off via SIG%s", wait, c, sig) + timeout.Warnf("timeout (%s) waiting for %s to power off via SIG%s", wait, c, sig) } if err != nil { - log.Warnf("killing %s attempt resulted in: %s", c, err) + timeout.Warnf("killing %s attempt resulted in: %s", c, err.Error()) if isInvalidPowerStateError(err) { return nil @@ -271,19 +269,19 @@ func (c *containerBase) kill(ctx context.Context) error { // into an invalid transition and will need to recover. If we try to grab properties at this time, the // power state may be incorrect. We work around this by waiting on the power state, regardless of error // from startGuestProgram. 
https://github.com/vmware/vic/issues/5803 - log.Infof("waiting %s for %s to power off", wait, c) + timeout.Infof("waiting %s for %s to power off", wait, c) err = c.vm.WaitForPowerState(timeout, types.VirtualMachinePowerStatePoweredOff) if err == nil { return nil // VM has powered off } - log.Warnf("killing %s via hard power off", c) + timeout.Warnf("killing %s via hard power off", c) // stop wait time is not applied for the hard kill - return c.poweroff(ctx) + return c.poweroff(op) } -func (c *containerBase) shutdown(ctx context.Context, waitTime *int32) error { +func (c *containerBase) shutdown(op trace.Operation, waitTime *int32) error { // make sure we have vm if c.vm == nil { return NotYetExistError{c.ExecConfig.ID} @@ -304,16 +302,16 @@ func (c *containerBase) shutdown(ctx context.Context, waitTime *int32) error { for _, sig := range stop { msg := fmt.Sprintf("sending kill -%s %s", sig, c) - log.Info(msg) + op.Infof(msg) - timeout, cancel := context.WithTimeout(ctx, wait) + timeout, cancel := trace.WithTimeout(&op, wait, "shutdown") defer cancel() err := c.startGuestProgram(timeout, "kill", sig) if err != nil { // Just warn and proceed to waiting for power state per issue https://github.com/vmware/vic/issues/5803 // Description above in function kill() - log.Warnf("%s: %s", msg, err) + timeout.Warnf("%s: %s", msg, err) // If the error tells us "The attempted operation cannot be performed in the current state (Powered off)" (InvalidPowerState), // we can avoid hard poweroff (issues #6236 and #6252). Here we wait for the power state changes instead of return @@ -323,7 +321,7 @@ func (c *containerBase) shutdown(ctx context.Context, waitTime *int32) error { } } - log.Infof("waiting %s for %s to power off", wait, c) + timeout.Infof("waiting %s for %s to power off", wait, c) err = c.vm.WaitForPowerState(timeout, types.VirtualMachinePowerStatePoweredOff) if err == nil { return nil // VM has powered off @@ -333,7 +331,7 @@ func (c *containerBase) shutdown(ctx context.Context, waitTime *int32) error { return err // error other than timeout } - log.Warnf("timeout (%s) waiting for %s to power off via SIG%s", wait, c, sig) + timeout.Warnf("timeout (%s) waiting for %s to power off via SIG%s", wait, c, sig) if killed { return nil @@ -343,14 +341,14 @@ func (c *containerBase) shutdown(ctx context.Context, waitTime *int32) error { return fmt.Errorf("failed to shutdown %s via kill signals %s", c, stop) } -func (c *containerBase) poweroff(ctx context.Context) error { +func (c *containerBase) poweroff(op trace.Operation) error { // make sure we have vm if c.vm == nil { return NotYetExistError{c.ExecConfig.ID} } - _, err := c.vm.WaitForResult(ctx, func(ctx context.Context) (tasks.Task, error) { - return c.vm.PowerOff(ctx) + _, err := c.vm.WaitForResult(op, func(op context.Context) (tasks.Task, error) { + return c.vm.PowerOff(op) }) if err != nil { @@ -360,10 +358,10 @@ func (c *containerBase) poweroff(ctx context.Context) error { switch terr := terr.Fault().(type) { case *types.InvalidPowerState: if terr.ExistingState == types.VirtualMachinePowerStatePoweredOff { - log.Warnf("power off %s task skipped (state was already %s)", c, terr.ExistingState) + op.Warnf("power off %s task skipped (state was already %s)", c, terr.ExistingState) return nil } - log.Warnf("invalid power state during power off: %s", terr.ExistingState) + op.Warnf("invalid power state during power off: %s", terr.ExistingState) case *types.GenericVmConfigFault: @@ -371,14 +369,14 @@ func (c *containerBase) poweroff(ctx 
context.Context) error { if len(terr.FaultMessage) > 0 { k := terr.FaultMessage[0].Key if k == vmNotSuspendedKey || k == vmPoweringOffKey { - log.Infof("power off %s task skipped due to guest shutdown", c) + op.Infof("power off %s task skipped due to guest shutdown", c) return nil } } - log.Warnf("generic vm config fault during power off: %#v", terr) + op.Warnf("generic vm config fault during power off: %#v", terr) default: - log.Warnf("hard power off failed due to: %#v", terr) + op.Warnf("hard power off failed due to: %#v", terr) } } @@ -389,7 +387,7 @@ func (c *containerBase) poweroff(ctx context.Context) error { } func (c *containerBase) waitForPowerState(ctx context.Context, max time.Duration, state types.VirtualMachinePowerState) (bool, error) { - defer trace.End(trace.Begin(c.ExecConfig.ID)) + defer trace.End(trace.Begin(c.ExecConfig.ID, ctx)) timeout, cancel := context.WithTimeout(ctx, max) defer cancel() @@ -403,7 +401,7 @@ func (c *containerBase) waitForPowerState(ctx context.Context, max time.Duration } func (c *containerBase) waitForSession(ctx context.Context, id string) error { - defer trace.End(trace.Begin(id)) + defer trace.End(trace.Begin(id, ctx)) // guestinfo key that we want to wait for key := extraconfig.CalculateKeys(c.ExecConfig, fmt.Sprintf("Sessions.%s.Started", id), "")[0] @@ -411,7 +409,7 @@ func (c *containerBase) waitForSession(ctx context.Context, id string) error { } func (c *containerBase) waitForExec(ctx context.Context, id string) error { - defer trace.End(trace.Begin(id)) + defer trace.End(trace.Begin(id, ctx)) // guestinfo key that we want to wait for key := extraconfig.CalculateKeys(c.ExecConfig, fmt.Sprintf("Execs.%s.Started", id), "")[0] diff --git a/lib/portlayer/exec/commit.go b/lib/portlayer/exec/commit.go index 0cf8f332cf..ad8e4e5595 100644 --- a/lib/portlayer/exec/commit.go +++ b/lib/portlayer/exec/commit.go @@ -27,13 +27,11 @@ import ( "github.com/vmware/vic/pkg/vsphere/session" "github.com/vmware/vic/pkg/vsphere/tasks" "github.com/vmware/vic/pkg/vsphere/vm" - - log "github.com/Sirupsen/logrus" ) // Commit executes the requires steps on the handle -func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int32) error { - defer trace.End(trace.Begin(h.ExecConfig.ID)) +func Commit(op trace.Operation, sess *session.Session, h *Handle, waitTime *int32) error { + defer trace.End(trace.Begin(h.ExecConfig.ID, op)) c := Containers.Container(h.ExecConfig.ID) creation := h.vm == nil @@ -60,27 +58,27 @@ func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int var err error if sess.IsVC() && Config.VirtualApp.ResourcePool != nil { // Create the vm - res, err = tasks.WaitForResult(ctx, func(ctx context.Context) (tasks.Task, error) { - return Config.VirtualApp.CreateChildVM(ctx, *h.Spec.Spec(), nil) + res, err = tasks.WaitForResult(op, func(op context.Context) (tasks.Task, error) { + return Config.VirtualApp.CreateChildVM(op, *h.Spec.Spec(), nil) }) } else { // Create the vm - res, err = tasks.WaitForResult(ctx, func(ctx context.Context) (tasks.Task, error) { - return sess.VMFolder.CreateVM(ctx, *h.Spec.Spec(), Config.ResourcePool, nil) + res, err = tasks.WaitForResult(op, func(op context.Context) (tasks.Task, error) { + return sess.VMFolder.CreateVM(op, *h.Spec.Spec(), Config.ResourcePool, nil) }) } if err != nil { - log.Errorf("An error occurred while waiting for a creation operation to complete. Spec was %+v", *h.Spec.Spec()) + op.Errorf("An error occurred while waiting for a creation operation to complete. 
Spec was %+v", *h.Spec.Spec()) return err } - h.vm = vm.NewVirtualMachine(ctx, sess, res.Result.(types.ManagedObjectReference)) - h.vm.DisableDestroy(ctx) + h.vm = vm.NewVirtualMachine(op, sess, res.Result.(types.ManagedObjectReference)) + h.vm.DisableDestroy(op) c = newContainer(&h.containerBase) Containers.Put(c) // inform of creation irrespective of remaining operations - publishContainerEvent(c.ExecConfig.ID, time.Now().UTC(), events.ContainerCreated) + publishContainerEvent(op, c.ExecConfig.ID, time.Now().UTC(), events.ContainerCreated) // clear the spec as we've acted on it - this prevents a reconfigure from occurring in follow-on // processing @@ -90,32 +88,32 @@ func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int // if we're stopping the VM, do so before the reconfigure to preserve the extraconfig if h.TargetState() == StateStopped { if h.Runtime == nil { - log.Warnf("Commit called with incomplete runtime state for %s", h.ExecConfig.ID) + op.Warnf("Commit called with incomplete runtime state for %s", h.ExecConfig.ID) } if h.Runtime != nil && h.Runtime.PowerState == types.VirtualMachinePowerStatePoweredOff { - log.Infof("Dropping duplicate power off operation for %s", h.ExecConfig.ID) + op.Infof("Dropping duplicate power off operation for %s", h.ExecConfig.ID) } else { // stop the container - if err := c.stop(ctx, waitTime); err != nil { + if err := c.stop(op, waitTime); err != nil { return err } // we must refresh now to get the new ChangeVersion - this is used to gate on powerstate in the reconfigure // because we cannot set the ExtraConfig if the VM is powered on. There is still a race here unfortunately because // tasks don't appear to contain the new ChangeVersion - h.refresh(ctx) + h.refresh(op) // inform of state change irrespective of remaining operations - but allow remaining operations to complete first // to avoid data race on container config - defer publishContainerEvent(h.ExecConfig.ID, time.Now().UTC(), events.ContainerStopped) + defer publishContainerEvent(op, h.ExecConfig.ID, time.Now().UTC(), events.ContainerStopped) } } // reconfigure operation if h.Spec != nil { if h.Runtime == nil { - log.Errorf("Refusing to perform reconfigure operation with incomplete runtime state for %s", h.ExecConfig.ID) + op.Errorf("Refusing to perform reconfigure operation with incomplete runtime state for %s", h.ExecConfig.ID) } else { // ensure that our logic based on Runtime state remains valid @@ -126,40 +124,40 @@ func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int // For the power off path this depends on handle.refresh() having been called to update the ChangeVersion s := h.Spec.Spec() - log.Infof("Reconfigure: attempting update to %s with change version %q (%s)", h.ExecConfig.ID, s.ChangeVersion, h.Runtime.PowerState) + op.Infof("Reconfigure: attempting update to %s with change version %q (%s)", h.ExecConfig.ID, s.ChangeVersion, h.Runtime.PowerState) // nilify ExtraConfig if container configuration is migrated // in this case, VCH and container are in different version. 
Migrated configuration cannot be written back to old container, to avoid data loss in old version's container if h.Migrated { - log.Debugf("Reconfigure: dropping extraconfig as configuration of container %s is migrated", h.ExecConfig.ID) + op.Debugf("Reconfigure: dropping extraconfig as configuration of container %s is migrated", h.ExecConfig.ID) s.ExtraConfig = nil } // address the race between power operation and refresh of config (and therefore ChangeVersion) in StateStopped block above if s.ExtraConfig != nil && h.TargetState() == StateStopped && h.Runtime.PowerState != types.VirtualMachinePowerStatePoweredOff { detail := fmt.Sprintf("Reconfigure: collision of concurrent operations - expected power state poweredOff, found %s", h.Runtime.PowerState) - log.Warn(detail) + op.Warnf(detail) // log out current vm power state and runtime power state got from refresh, to see if there is anything mismatch, // cause in issue #6127, we see the runtime power state is not updated even after 1 minute - ps, _ := h.vm.PowerState(ctx) - log.Debugf("Container %s power state: %s, runtime power state: %s", h.ExecConfig.ID, ps, h.Runtime.PowerState) + ps, _ := h.vm.PowerState(op) + op.Debugf("Container %s power state: %s, runtime power state: %s", h.ExecConfig.ID, ps, h.Runtime.PowerState) // this should cause a second attempt at the power op. This could result repeated contention that fails to resolve, but the randomness in the backoff and the tight timing // to hit this scenario should mean it will resolve in a reasonable timeframe. return ConcurrentAccessError{errors.New(detail)} } - _, err := h.vm.WaitForResult(ctx, func(ctx context.Context) (tasks.Task, error) { - return h.vm.Reconfigure(ctx, *s) + _, err := h.vm.WaitForResult(op, func(op context.Context) (tasks.Task, error) { + return h.vm.Reconfigure(op, *s) }) if err != nil { - log.Errorf("Reconfigure: failed update to %s with change version %s: %+v", h.ExecConfig.ID, s.ChangeVersion, err) + op.Errorf("Reconfigure: failed update to %s with change version %s: %+v", h.ExecConfig.ID, s.ChangeVersion, err) // Check whether we get ConcurrentAccess and wrap it if needed if f, ok := err.(types.HasFault); ok { switch f.Fault().(type) { case *types.ConcurrentAccess: - log.Errorf("Reconfigure: failed update to %s due to ConcurrentAccess, our change version %s", h.ExecConfig.ID, s.ChangeVersion) + op.Errorf("Reconfigure: failed update to %s due to ConcurrentAccess, our change version %s", h.ExecConfig.ID, s.ChangeVersion) return ConcurrentAccessError{err} } @@ -167,10 +165,10 @@ func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int return err } - log.Infof("Reconfigure: committed update to %s with change version: %s", h.ExecConfig.ID, s.ChangeVersion) + op.Infof("Reconfigure: committed update to %s with change version: %s", h.ExecConfig.ID, s.ChangeVersion) // trigger a configuration reload in the container if needed - err = reloadConfig(ctx, h, c) + err = reloadConfig(op, h, c) if err != nil { return err } @@ -180,31 +178,31 @@ func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int // best effort update of container cache using committed state - this will not reflect the power on below, however // this is primarily for updating ExtraConfig state. 
if !creation { - defer c.RefreshFromHandle(ctx, h) + defer c.RefreshFromHandle(op, h) } if h.TargetState() == StateRunning { if h.Runtime != nil && h.Runtime.PowerState == types.VirtualMachinePowerStatePoweredOn { - log.Infof("Dropping duplicate power on operation for %s", h.ExecConfig.ID) + op.Infof("Dropping duplicate power on operation for %s", h.ExecConfig.ID) return nil } if h.Runtime == nil && !creation { - log.Warnf("Commit called with incomplete runtime state for %s", h.ExecConfig.ID) + op.Warnf("Commit called with incomplete runtime state for %s", h.ExecConfig.ID) } // start the container - if err := c.start(ctx); err != nil { + if err := c.start(op); err != nil { // We observed that PowerOn_Task could get stuck on VC time to time even though the VM was starting fine on the host ESXi. // Eventually the task was getting timed out (After 20 min.) and that was setting the container state back to Stopped. // During that time VC was not generating any other event so the persona listener was getting nothing. // This new event is for signaling the eventmonitor so that it can autoremove the container after this failure. - publishContainerEvent(h.ExecConfig.ID, time.Now().UTC(), events.ContainerFailed) + publishContainerEvent(op, h.ExecConfig.ID, time.Now().UTC(), events.ContainerFailed) return err } - // inform of creation irrespective of remaining operations - publishContainerEvent(h.ExecConfig.ID, time.Now().UTC(), events.ContainerStarted) + // publish started event + publishContainerEvent(op, h.ExecConfig.ID, time.Now().UTC(), events.ContainerStarted) } return nil @@ -215,18 +213,18 @@ func Commit(ctx context.Context, sess *session.Session, h *Handle, waitTime *int // reloadConfig is responsible for triggering a guest_reconfigure in order to perform an operation on a running cVM // this function needs to be resilient to intermittent config errors and task errors, but will pass concurrent // modification issues back immediately. 
-func reloadConfig(ctx context.Context, h *Handle, c *Container) error { +func reloadConfig(op trace.Operation, h *Handle, c *Container) error { - log.Infof("Attempting to perform a guest reconfigure operation on (%s)", h.ExecConfig.ID) + op.Infof("Attempting to perform a guest reconfigure operation on (%s)", h.ExecConfig.ID) retryFunc := func() error { if h.reload && h.Runtime != nil && h.Runtime.PowerState == types.VirtualMachinePowerStatePoweredOn { - err := c.ReloadConfig(ctx) + err := c.ReloadConfig(op) if err != nil { - log.Debugf("Error occurred during an attempt to reload the container config for an exec operation: (%s)", err) + op.Debugf("Error occurred during an attempt to reload the container config for an exec operation: (%s)", err) // we will request the powerstate directly(this could be very costly without the vmomi gateway) - state, err := c.vm.PowerState(ctx) + state, err := c.vm.PowerState(op) if err != nil && state == types.VirtualMachinePowerStatePoweredOff { // TODO: probably should make this error a specific type such as PowerOffDuringExecError( or a better name ofcourse) return fmt.Errorf("container(%s) was powered down during the requested operation.", h.ExecConfig.ID) @@ -242,7 +240,7 @@ func reloadConfig(ctx context.Context, h *Handle, c *Container) error { err := retry.Do(retryFunc, isIntermittentFailure) if err != nil { - log.Debugf("Failed an exec operation with err: %s", err) + op.Debugf("Failed an exec operation with err: %s", err) return err } return nil diff --git a/lib/portlayer/exec/container.go b/lib/portlayer/exec/container.go index 2576a8c58d..5120737fee 100644 --- a/lib/portlayer/exec/container.go +++ b/lib/portlayer/exec/container.go @@ -28,6 +28,7 @@ import ( "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/soap" "github.com/vmware/govmomi/vim25/types" + "github.com/vmware/vic/lib/constants" "github.com/vmware/vic/lib/iolog" "github.com/vmware/vic/lib/portlayer/event/events" @@ -315,9 +316,10 @@ func (c *Container) WaitForState(s State) <-chan struct{} { func (c *Container) NewHandle(ctx context.Context) *Handle { // Call property collector to fill the data if c.vm != nil { + op := trace.FromContext(ctx, "NewHandle") // FIXME: this should be calling the cache to decide if a refresh is needed - if err := c.Refresh(ctx); err != nil { - log.Errorf("refreshing container %s failed: %s", c, err) + if err := c.Refresh(op); err != nil { + op.Errorf("refreshing container %s failed: %s", c, err) return nil // nil indicates error } } @@ -329,11 +331,11 @@ func (c *Container) NewHandle(ctx context.Context) *Handle { // Refresh updates config and runtime info, holding a lock only while swapping // the new data for the old -func (c *Container) Refresh(ctx context.Context) error { +func (c *Container) Refresh(op trace.Operation) error { c.m.Lock() defer c.m.Unlock() - if err := c.refresh(ctx); err != nil { + if err := c.refresh(op); err != nil { return err } @@ -361,37 +363,37 @@ func (c *Container) Refresh(ctx context.Context) error { return nil } -func (c *Container) refresh(ctx context.Context) error { - return c.containerBase.refresh(ctx) +func (c *Container) refresh(op trace.Operation) error { + return c.containerBase.refresh(op) } // RefreshFromHandle updates config and runtime info, holding a lock only while swapping // the new data for the old -func (c *Container) RefreshFromHandle(ctx context.Context, h *Handle) { +func (c *Container) RefreshFromHandle(op trace.Operation, h *Handle) { c.m.Lock() defer c.m.Unlock() if c.Config != nil && 
(h.Config == nil || h.Config.ChangeVersion != c.Config.ChangeVersion) { - log.Warnf("container and handle ChangeVersions do not match for %s: %s != %s", c, c.Config.ChangeVersion, h.Config.ChangeVersion) + op.Warnf("container and handle ChangeVersions do not match for %s: %s != %s", c, c.Config.ChangeVersion, h.Config.ChangeVersion) return } // power off doesn't necessarily cause a change version increment and bug1898149 occasionally impacts power on if c.Runtime != nil && (h.Runtime == nil || h.Runtime.PowerState != c.Runtime.PowerState) { - log.Warnf("container and handle PowerStates do not match: %s != %s", c.Runtime.PowerState, h.Runtime.PowerState) + op.Warnf("container and handle PowerStates do not match: %s != %s", c.Runtime.PowerState, h.Runtime.PowerState) return } // copy over the new state c.containerBase = h.containerBase if c.Config != nil { - log.Debugf("Update: updated change version from handle: %s", c.Config.ChangeVersion) + op.Debugf("Update: updated change version from handle: %s", c.Config.ChangeVersion) } } // Start starts a container vm with the given params -func (c *Container) start(ctx context.Context) error { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *Container) start(op trace.Operation) error { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) if c.vm == nil { return fmt.Errorf("vm not set") @@ -399,7 +401,7 @@ func (c *Container) start(ctx context.Context) error { // Set state to Starting c.SetState(StateStarting) - err := c.containerBase.start(ctx) + err := c.containerBase.start(op) if err != nil { // change state to stopped because start task failed c.SetState(StateStopped) @@ -418,10 +420,10 @@ func (c *Container) start(ctx context.Context) error { } // wait task to set started field to something - ctx, cancel := context.WithTimeout(ctx, constants.PropertyCollectorTimeout) + op, cancel := trace.WithTimeout(&op, constants.PropertyCollectorTimeout, "WaitForSession") defer cancel() - err = c.waitForSession(ctx, c.ExecConfig.ID) + err = c.waitForSession(op, c.ExecConfig.ID) if err != nil { // leave this in state starting - if it powers off then the event // will cause transition to StateStopped which is likely our original state @@ -436,14 +438,14 @@ func (c *Container) start(ctx context.Context) error { // The current state is already Stopped if the container's process has exited or // a poweredoff event has been processed. if err = c.transitionState(StateStarting, StateRunning); err != nil { - log.Debug(err) + op.Debugf(err.Error()) } return nil } -func (c *Container) stop(ctx context.Context, waitTime *int32) error { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *Container) stop(op trace.Operation, waitTime *int32) error { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) defer c.onStop() @@ -451,13 +453,13 @@ func (c *Container) stop(ctx context.Context, waitTime *int32) error { // if there's a failure we'll revert to existing finalState := c.SetState(StateStopping) - err := c.containerBase.stop(ctx, waitTime) + err := c.containerBase.stop(op, waitTime) if err != nil { // we've got no idea what state the container is in at this point // running is an _optimistic_ statement // If the current state is Stopping, revert it to the old state. if stateErr := c.transitionState(StateStopping, finalState); stateErr != nil { - log.Debug(stateErr) + op.Debugf(stateErr.Error()) } return err @@ -465,24 +467,24 @@ func (c *Container) stop(ctx context.Context, waitTime *int32) error { // Transition the state to Stopped only if it's Stopping. 
if err = c.transitionState(StateStopping, StateStopped); err != nil { - log.Debug(err) + op.Debugf(err.Error()) } return nil } -func (c *Container) Signal(ctx context.Context, num int64) error { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *Container) Signal(op trace.Operation, num int64) error { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) if c.vm == nil { return fmt.Errorf("vm not set") } if num == int64(syscall.SIGKILL) { - return c.containerBase.kill(ctx) + return c.containerBase.kill(op) } - return c.startGuestProgram(ctx, "kill", fmt.Sprintf("%d", num)) + return c.startGuestProgram(op, "kill", fmt.Sprintf("%d", num)) } func (c *Container) onStop() { @@ -496,8 +498,8 @@ func (c *Container) onStop() { } } -func (c *Container) LogReader(ctx context.Context, tail int, follow bool, since int64) (io.ReadCloser, error) { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *Container) LogReader(op trace.Operation, tail int, follow bool, since int64) (io.ReadCloser, error) { + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) c.m.Lock() defer c.m.Unlock() @@ -505,7 +507,7 @@ func (c *Container) LogReader(ctx context.Context, tail int, follow bool, since return nil, fmt.Errorf("vm not set") } - url, err := c.vm.VMPathNameAsURL(ctx) + url, err := c.vm.VMPathNameAsURL(op) if err != nil { return nil, err } @@ -516,21 +518,28 @@ func (c *Container) LogReader(ctx context.Context, tail int, follow bool, since if c.state == StateRunning && c.vm.IsVC() { // #nosec: Errors unhandled. - hosts, _ := c.vm.Datastore.AttachedHosts(ctx) + hosts, _ := c.vm.Datastore.AttachedHosts(op) if len(hosts) > 1 { // In this case, we need download from the VM host as it owns the file lock // #nosec: Errors unhandled. - h, _ := c.vm.HostSystem(ctx) + h, _ := c.vm.HostSystem(op) if h != nil { - ctx = c.vm.Datastore.HostContext(ctx, h) + // get a context that embeds the host as a value + ctx := c.vm.Datastore.HostContext(op, h) + //grab the initial opID + id := op.ID() + // this will create a new operation with the host + // context value still intact + op = trace.FromContext(ctx, "LogReader") + op.Debugf("New Operation created replacing Operation: %s", id) via = fmt.Sprintf(" via %s", h.Reference()) } } } - log.Infof("pulling %s%s", name, via) + op.Infof("pulling %s%s", name, via) - file, err := c.vm.Datastore.Open(ctx, name) + file, err := c.vm.Datastore.Open(op, name) if err != nil { return nil, err } @@ -545,7 +554,7 @@ func (c *Container) LogReader(ctx context.Context, tail int, follow bool, since entry, err := iolog.ParseLogEntry(buf) if err != nil { - log.Errorf("Error parsing log entry: %s", err.Error()) + op.Errorf("Error parsing log entry: %s", err.Error()) return false } @@ -574,8 +583,9 @@ func (c *Container) LogReader(ctx context.Context, tail int, follow bool, since } // Remove removes a containerVM after detaching the disks -func (c *Container) Remove(ctx context.Context, sess *session.Session) error { - defer trace.End(trace.Begin(c.ExecConfig.ID)) +func (c *Container) Remove(op trace.Operation, sess *session.Session) error { + // op := trace.FromContext(ctx, "Remove") + defer trace.End(trace.Begin(c.ExecConfig.ID, op)) c.m.Lock() defer c.m.Unlock() @@ -593,7 +603,7 @@ func (c *Container) Remove(ctx context.Context, sess *session.Session) error { existingState := c.updateState(StateRemoving) // get the folder the VM is in - url, err := c.vm.VMPathNameAsURL(ctx) + url, err := c.vm.VMPathNameAsURL(op) if err != nil { // handle the out-of-band removal case @@ -602,31 +612,31 @@ func (c 
*Container) Remove(ctx context.Context, sess *session.Session) error { return NotFoundError{} } - log.Errorf("Failed to get datastore path for %s: %s", c, err) + op.Errorf("Failed to get datastore path for %s: %s", c, err) c.updateState(existingState) return err } - ds, err := sess.Finder.Datastore(ctx, url.Host) + ds, err := sess.Finder.Datastore(op, url.Host) if err != nil { return err } // enable Destroy - c.vm.EnableDestroy(ctx) + c.vm.EnableDestroy(op) concurrent := false // if DeleteExceptDisks succeeds on VC, it leaves the VM orphan so we need to call Unregister // if DeleteExceptDisks succeeds on ESXi, no further action needed // if DeleteExceptDisks fails, we should call Unregister and only return an error if that fails too // Unregister sometimes can fail with ManagedObjectNotFound so we ignore it - _, err = c.vm.WaitForResult(ctx, func(ctx context.Context) (tasks.Task, error) { - return c.vm.DeleteExceptDisks(ctx) + _, err = c.vm.WaitForResult(op, func(op context.Context) (tasks.Task, error) { + return c.vm.DeleteExceptDisks(op) }) if err != nil { f, ok := err.(types.HasFault) if !ok { - log.Warnf("DeleteExceptDisks failed with non-fault error %s for %s.", err, c) + op.Warnf("DeleteExceptDisks failed with non-fault error %s for %s.", err, c) c.updateState(existingState) return err @@ -634,18 +644,18 @@ func (c *Container) Remove(ctx context.Context, sess *session.Session) error { switch f.Fault().(type) { case *types.InvalidState: - log.Warnf("container VM %s is in invalid state, unregistering", c) - if err := c.vm.Unregister(ctx); err != nil { - log.Errorf("Error while attempting to unregister container VM %s: %s", c, err) + op.Warnf("container VM %s is in invalid state, unregistering", c) + if err := c.vm.Unregister(op); err != nil { + op.Errorf("Error while attempting to unregister container VM %s: %s", c, err) return err } case *types.ConcurrentAccess: // We are getting ConcurrentAccess errors from DeleteExceptDisks - even though we don't set ChangeVersion in that path // We are ignoring the error because in reality the operation finishes successfully. - log.Warnf("DeleteExceptDisks failed with ConcurrentAccess error for %s. Ignoring it.", c) + op.Warnf("DeleteExceptDisks failed with ConcurrentAccess error for %s. Ignoring it.", c) concurrent = true default: - log.Debugf("Unhandled fault while attempting to destroy vm %s: %#v", c, f.Fault()) + op.Debugf("Unhandled fault while attempting to destroy vm %s: %#v", c, f.Fault()) c.updateState(existingState) return err @@ -653,9 +663,9 @@ func (c *Container) Remove(ctx context.Context, sess *session.Session) error { } if concurrent && c.vm.IsVC() { - if err := c.vm.Unregister(ctx); err != nil { + if err := c.vm.Unregister(op); err != nil { if !IsNotFoundError(err) { - log.Errorf("Error while attempting to unregister container VM %s: %s", c, err) + op.Errorf("Error while attempting to unregister container VM %s: %s", c, err) return err } } @@ -663,14 +673,14 @@ func (c *Container) Remove(ctx context.Context, sess *session.Session) error { // remove from datastore fm := ds.NewFileManager(sess.Datacenter, true) - if err = fm.Delete(ctx, url.Path); err != nil { + if err = fm.Delete(op, url.Path); err != nil { // at this phase error doesn't matter. Just log it. 
- log.Debugf("Failed to delete %s, %s for %s", url, err, c) + op.Debugf("Failed to delete %s, %s for %s", url, err, c) } //remove container from cache Containers.Remove(c.ExecConfig.ID) - publishContainerEvent(c.ExecConfig.ID, time.Now(), events.ContainerRemoved) + publishContainerEvent(op, c.ExecConfig.ID, time.Now(), events.ContainerRemoved) return nil } @@ -704,12 +714,13 @@ func eventedState(e events.Event, current State) State { } func (c *Container) OnEvent(e events.Event) { - defer trace.End(trace.Begin(fmt.Sprintf("event %s received for id: %d", e.String(), e.EventID()))) + op := trace.NewOperation(context.Background(), "OnEvent") + defer trace.End(trace.Begin(fmt.Sprintf("eventID(%s) received for event: %s", e.EventID(), e.String()), op)) c.m.Lock() defer c.m.Unlock() if c.vm == nil { - log.Warnf("c.vm is nil for id: %d", e.String(), e.EventID()) + op.Warnf("Event(%s) received for %s but no VM found", e.EventID(), e.Reference()) return } newState := eventedState(e, c.state) @@ -722,22 +733,22 @@ func (c *Container) OnEvent(e events.Event) { StateSuspended: // container state has changed so we need to update the container attributes - ctx, cancel := context.WithTimeout(context.Background(), constants.PropertyCollectorTimeout) + op, cancel := trace.WithTimeout(&op, constants.PropertyCollectorTimeout, "Container State Event") defer cancel() - if err := c.refresh(ctx); err != nil { - log.Errorf("Event driven container update failed: %s", err) + if err := c.refresh(op); err != nil { + op.Errorf("Container(%s) Event driven update failed: %s", c, err) } c.updateState(newState) if newState == StateStopped { c.onStop() } - log.Debugf("Container(%s) state set to %s via event activity", c, newState) + op.Debugf("Container(%s) state set to %s via event activity", c, newState) case StateRemoved: if c.vm != nil && c.vm.IsFixing() { // is fixing vm, which will be registered back soon, so do not remove from containers cache - log.Debugf("Container(%s) %s is being fixed - %s event ignored", c, newState) + op.Debugf("Container(%s) %s is being fixed - %s event ignored", c, newState) // Received remove event triggered by unregister VM operation - leave // fixing state now. 
In a loaded environment, the remove event may be @@ -748,7 +759,7 @@ func (c *Container) OnEvent(e events.Event) { // a container event to be propogated to subscribers return } - log.Debugf("Container(%s) %s via event activity", c, newState) + op.Debugf("Container(%s) %s via event activity", c, newState) // if we are here the containerVM has been removed from vSphere, so lets remove it // from the portLayer cache Containers.Remove(c.ExecConfig.ID) @@ -758,21 +769,21 @@ func (c *Container) OnEvent(e events.Event) { } // regardless of state update success or failure publish the container event - publishContainerEvent(c.ExecConfig.ID, e.Created(), e.String()) + publishContainerEvent(op, c.ExecConfig.ID, e.Created(), e.String()) return } - log.Debugf("Container(%s) state didn't changed (%s)", c, newState) + op.Debugf("Container(%s) state(%s) didn't change", c, newState) switch e.String() { case events.ContainerRelocated: // container relocated so we need to update the container attributes - ctx, cancel := context.WithTimeout(context.Background(), constants.PropertyCollectorTimeout) + op, cancel := trace.WithTimeout(&op, constants.PropertyCollectorTimeout, "Container Relocated") defer cancel() - err := c.refresh(ctx) + err := c.refresh(op) if err != nil { - log.Errorf("Event driven container update failed for %s with %s", c, err) + op.Errorf("Container(%s) Relocation Event driven refresh failed: %s", c, err) } } } diff --git a/lib/portlayer/exec/exec.go b/lib/portlayer/exec/exec.go index 048e540355..fedee78607 100644 --- a/lib/portlayer/exec/exec.go +++ b/lib/portlayer/exec/exec.go @@ -25,9 +25,11 @@ import ( "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/object" + "github.com/vmware/vic/lib/portlayer/event" "github.com/vmware/vic/lib/portlayer/event/collector/vsphere" "github.com/vmware/vic/lib/portlayer/event/events" + "github.com/vmware/vic/pkg/trace" "github.com/vmware/vic/pkg/vsphere/extraconfig" "github.com/vmware/vic/pkg/vsphere/session" ) @@ -133,13 +135,16 @@ func Init(ctx context.Context, sess *session.Session, source extraconfig.DataSou } // publishContainerEvent will publish a ContainerEvent to the vic event stream -func publishContainerEvent(id string, created time.Time, eventType string) { +func publishContainerEvent(op trace.Operation, id string, created time.Time, eventType string) { if Config.EventManager == nil || eventType == "" { return } ce := &events.ContainerEvent{ BaseEvent: &events.BaseEvent{ + // containerEvents are a construct of vic, so lets set the + // ID equal to the operation that created the event + ID: op.ID(), Ref: id, CreatedTime: created, Event: eventType, diff --git a/lib/portlayer/exec/exec_test.go b/lib/portlayer/exec/exec_test.go index ee636afc87..3915d2cc1c 100644 --- a/lib/portlayer/exec/exec_test.go +++ b/lib/portlayer/exec/exec_test.go @@ -15,6 +15,7 @@ package exec import ( + "context" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/vmware/vic/lib/portlayer/event" "github.com/vmware/vic/lib/portlayer/event/collector/vsphere" "github.com/vmware/vic/lib/portlayer/event/events" + "github.com/vmware/vic/pkg/trace" ) var containerEvents []events.Event @@ -75,7 +77,7 @@ func TestPublishContainerEvent(t *testing.T) { container.SetState(StateRunning) Containers.Put(container) - publishContainerEvent(id, time.Now().UTC(), events.ContainerPoweredOff) + publishContainerEvent(trace.NewOperation(context.Background(), "container"), id, time.Now().UTC(), events.ContainerPoweredOff) time.Sleep(time.Millisecond * 30) assert.Equal(t, 1, 
len(containerEvents)) diff --git a/lib/portlayer/exec/handle.go b/lib/portlayer/exec/handle.go index 549028a5b6..f57587cfad 100644 --- a/lib/portlayer/exec/handle.go +++ b/lib/portlayer/exec/handle.go @@ -202,6 +202,9 @@ func (h *Handle) String() string { } func (h *Handle) Commit(ctx context.Context, sess *session.Session, waitTime *int32) error { + // ensure we have an operation moving forward.. + op := trace.FromContext(ctx, "handle commit") + cfg := make(map[string]string) // Set timestamps based on target state @@ -243,7 +246,7 @@ func (h *Handle) Commit(ctx context.Context, sess *session.Session, waitTime *in } s.ExtraConfig = h.changes - if err := Commit(ctx, sess, h, waitTime); err != nil { + if err := Commit(op, sess, h, waitTime); err != nil { return err } @@ -253,9 +256,9 @@ func (h *Handle) Commit(ctx context.Context, sess *session.Session, waitTime *in // refresh is for internal use only - it's sole purpose at this time is to allow the stop path to update ChangeVersion // and corresponding state before performing any associated reconfigure -func (h *Handle) refresh(ctx context.Context) { +func (h *Handle) refresh(op trace.Operation) { // update Config and Runtime to reflect current state - h.containerBase.refresh(ctx) + h.containerBase.refresh(op) // reapply extraconfig changes s := h.Spec.Spec() @@ -264,17 +267,18 @@ func (h *Handle) refresh(ctx context.Context) { s.ChangeVersion = h.Config.ChangeVersion } +// TODO: CDG Remove?? // CommitWithoutSpec sets the handle's spec to nil so that Commit operation only does a state change and won't touch the extraconfig -func (h *Handle) CommitWithoutSpec(ctx context.Context, sess *session.Session, waitTime *int32) error { - h.Spec = nil +// func (h *Handle) CommitWithoutSpec(ctx context.Context, sess *session.Session, waitTime *int32) error { +// h.Spec = nil - if err := Commit(ctx, sess, h, waitTime); err != nil { - return err - } +// if err := Commit(ctx, sess, h, waitTime); err != nil { +// return err +// } - h.Close() - return nil -} +// h.Close() +// return nil +// } func (h *Handle) Close() { handlesLock.Lock() diff --git a/lib/portlayer/logging/logging.go b/lib/portlayer/logging/logging.go index 9ccfc6612a..fe14cb6c4d 100644 --- a/lib/portlayer/logging/logging.go +++ b/lib/portlayer/logging/logging.go @@ -54,19 +54,20 @@ func eventCallback(ctx context.Context, ie events.Event) { switch ie.String() { case events.ContainerMigrated, events.ContainerMigratedByDrs: - log.Debugf("Received %s event", ie) + op := trace.FromContext(ctx, "LoggingEvent") + op.Debugf("Logging processing eventID(%s): %s", ie.EventID(), ie) // grab the container from the cache container := exec.Containers.Container(ie.Reference()) if container == nil { - log.Errorf("Container %s not found. Dropping the event %s from Logging subsystem.", ie.Reference(), ie) + op.Errorf("Container %s not found. 
Dropping the event %s from Logging subsystem.", ie.Reference(), ie) return } operation := func() error { var err error - handle := container.NewHandle(ctx) + handle := container.NewHandle(op) if handle == nil { err = fmt.Errorf("Handle for %s cannot be created", ie.Reference()) log.Error(err) @@ -76,19 +77,19 @@ func eventCallback(ctx context.Context, ie events.Event) { // set them to true if handle, err = toggle(handle, true); err != nil { - log.Errorf("Failed to toggle logging after %s event for container %s: %s", ie, ie.Reference(), err) + op.Errorf("Failed to toggle logging after %s event for container %s: %s", ie, ie.Reference(), err) return err } - if err = handle.Commit(ctx, nil, nil); err != nil { - log.Errorf("Failed to commit handle after getting %s event for container %s: %s", ie, ie.Reference(), err) + if err = handle.Commit(op, nil, nil); err != nil { + op.Errorf("Failed to commit handle after getting %s event for container %s: %s", ie, ie.Reference(), err) return err } return nil } if err := retry.Do(operation, exec.IsConcurrentAccessError); err != nil { - log.Errorf("Multiple attempts failed to commit handle after getting %s event for container %s: %s", ie, ie.Reference(), err) + op.Errorf("Multiple attempts failed to commit handle after getting %s event for container %s: %s", ie, ie.Reference(), err) } } diff --git a/lib/portlayer/network/context.go b/lib/portlayer/network/context.go index 77784ea8ea..e58e886f8d 100644 --- a/lib/portlayer/network/context.go +++ b/lib/portlayer/network/context.go @@ -911,8 +911,8 @@ func (c *Context) removeAliases(aliases []string, con *Container) { // RemoveIDFromScopes removes the container from the scopes but doesn't touch the runtime state // Because of that it requires an id -func (c *Context) RemoveIDFromScopes(id string) ([]*Endpoint, error) { - defer trace.End(trace.Begin("")) +func (c *Context) RemoveIDFromScopes(op trace.Operation, id string) ([]*Endpoint, error) { + defer trace.End(trace.Begin("", op)) c.Lock() defer c.Unlock() @@ -951,8 +951,8 @@ func (c *Context) RemoveIDFromScopes(id string) ([]*Endpoint, error) { // UnbindContainer removes the container from the scopes and clears out the assigned IP // Because of that, it requires a handle -func (c *Context) UnbindContainer(h *exec.Handle) ([]*Endpoint, error) { - defer trace.End(trace.Begin("")) +func (c *Context) UnbindContainer(op trace.Operation, h *exec.Handle) ([]*Endpoint, error) { + defer trace.End(trace.Begin("", op)) c.Lock() defer c.Unlock() diff --git a/lib/portlayer/network/context_test.go b/lib/portlayer/network/context_test.go index 225b3cb03d..61774fd78c 100644 --- a/lib/portlayer/network/context_test.go +++ b/lib/portlayer/network/context_test.go @@ -40,6 +40,7 @@ import ( "github.com/vmware/vic/lib/spec" "github.com/vmware/vic/pkg/ip" "github.com/vmware/vic/pkg/kvstore" + "github.com/vmware/vic/pkg/trace" "github.com/vmware/vic/pkg/uid" "github.com/vmware/vic/pkg/vsphere/extraconfig" ) @@ -875,7 +876,8 @@ func TestContextBindUnbindContainer(t *testing.T) { // test UnbindContainer for i, te := range tests { - eps, err := ctx.UnbindContainer(te.h) + op := trace.NewOperation(context.Background(), "Testing..") + eps, err := ctx.UnbindContainer(op, te.h) if te.err != nil { if err == nil { t.Fatalf("%d: ctx.UnbindContainer(%s) => nil, want err", i, te.h) @@ -1224,7 +1226,8 @@ func TestAliases(t *testing.T) { t.Logf("containers: %#v", ctx.containers) c := containers["c2"] - _, err = ctx.UnbindContainer(c) + op := trace.NewOperation(context.Background(), "unbind") + 
_, err = ctx.UnbindContainer(op, c) assert.NoError(t, err) // verify aliases are gone assert.Nil(t, ctx.Container(c.ExecConfig.ID)) diff --git a/lib/portlayer/network/network.go b/lib/portlayer/network/network.go index 31c3f7f972..299243ba72 100644 --- a/lib/portlayer/network/network.go +++ b/lib/portlayer/network/network.go @@ -120,23 +120,26 @@ func Init(ctx context.Context, sess *session.Session, source extraconfig.DataSou func handleEvent(netctx *Context, ie events.Event) { switch ie.String() { case events.ContainerPoweredOff: - handle := exec.GetContainer(context.Background(), uid.Parse(ie.Reference())) + op := trace.NewOperation(context.Background(), fmt.Sprintf("handleEvent(%s)", ie.EventID())) + op.Infof("Handling Event: %s", ie.EventID()) + // grab the operation from the event + handle := exec.GetContainer(op, uid.Parse(ie.Reference())) if handle == nil { - _, err := netctx.RemoveIDFromScopes(ie.Reference()) + _, err := netctx.RemoveIDFromScopes(op, ie.Reference()) if err != nil { - log.Errorf("Failed to remove container %s scope: %s", ie.Reference(), err) + op.Errorf("Failed to remove container %s scope: %s", ie.Reference(), err) } return } defer handle.Close() - if _, err := netctx.UnbindContainer(handle); err != nil { - log.Warnf("Failed to unbind container %s: %s", ie.Reference(), err) + if _, err := netctx.UnbindContainer(op, handle); err != nil { + op.Warnf("Failed to unbind container %s: %s", ie.Reference(), err) return } - if err := handle.Commit(context.Background(), nil, nil); err != nil { - log.Warnf("Failed to commit handle after network unbind for container %s: %s", ie.Reference(), err) + if err := handle.Commit(op, nil, nil); err != nil { + op.Warnf("Failed to commit handle after network unbind for container %s: %s", ie.Reference(), err) } } diff --git a/pkg/trace/operation.go b/pkg/trace/operation.go index 920e15149c..1217f2e8a7 100644 --- a/pkg/trace/operation.go +++ b/pkg/trace/operation.go @@ -22,7 +22,7 @@ import ( "sync/atomic" "time" - "github.com/Sirupsen/logrus" + "github.com/vmware/govmomi/vim25/types" ) type OperationKey string @@ -52,7 +52,7 @@ func newOperation(ctx context.Context, id string, skip int, msg string) Operatio id: id, // Start the trace. - t: []Message{*newTrace(msg, skip)}, + t: []Message{*newTrace(msg, skip, id)}, } // We need to be able to identify this operation across API (and process) @@ -61,6 +61,12 @@ func newOperation(ctx context.Context, id string, skip int, msg string) Operatio // the operation itself as a value to the embedded context (it's circular) ctx = context.WithValue(ctx, OpTraceKey, op) + // By adding the op.id any operations passed to govmomi will result + // in the op.id being logged in vSphere (vpxa / hostd) as the prefix to opID + // For example if the op.id was 299.16 hostd would show + // verbose hostd[12281B70] [Originator@6876 sub=PropertyProvider opID=299.16-5b05 user=root] + ctx = context.WithValue(ctx, types.ID{}, op.id) + o := Operation{ Context: ctx, operation: op, @@ -71,9 +77,9 @@ func newOperation(ctx context.Context, id string, skip int, msg string) Operatio // Creates a header string to be printed. 
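// Illustrative sketch of the call pattern this enables (vm and id below are
// placeholder names, not identifiers from this patch): an Operation is a
// context.Context, so handing it to a govmomi-backed call carries op.id
// through to the vSphere opID described above.
//
//	op := trace.NewOperation(context.Background(), "remove container %s", id)
//	defer trace.End(trace.Begin("remove", op))
//
//	_, err := vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) {
//		// the Operation rides along as the context, carrying the types.ID value
//		return vm.DeleteExceptDisks(ctx)
//	})
//	if err != nil {
//		op.Errorf("remove failed for container %s: %s", id, err)
//	}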
func (o *Operation) header() string { - if Logger.Level >= logrus.DebugLevel { - return fmt.Sprintf("op=%s (delta:%s)", o.id, o.t[0].delta()) - } + // if Logger.Level >= logrus.DebugLevel { + // return fmt.Sprintf("op=%s (delta:%s)", o.id, o.t[0].delta()) + // } return fmt.Sprintf("op=%s", o.id) } @@ -88,7 +94,7 @@ func (o Operation) Err() error { buf := &bytes.Buffer{} // Add a frame for this Err call, then walk the stack - currFrame := newTrace("Err", 2) + currFrame := newTrace("Err", 2, o.id) fmt.Fprintf(buf, "%s: %s error: %s\n", currFrame.funcName, o.t[0].msg, err) // handle the carriage return @@ -154,13 +160,9 @@ func opID(opNum uint64) string { return fmt.Sprintf("%d.%d", opIDPrefix, opNum) } -// Add tracing info to the context. +// NewOperation will return a new operation with operationID added as a value to the context func NewOperation(ctx context.Context, format string, args ...interface{}) Operation { - o := newOperation(ctx, opID(atomic.AddUint64(&opCount, 1)), 3, fmt.Sprintf(format, args...)) - - frame := o.t[0] - o.Debugf("[NewOperation] %s [%s:%d]", o.header(), frame.funcName, frame.lineNo) - return o + return newOperation(ctx, opID(atomic.AddUint64(&opCount, 1)), 3, fmt.Sprintf(format, args...)) } // WithTimeout creates a new operation from parent with context.WithTimeout @@ -201,20 +203,28 @@ func FromOperation(parent Operation, format string, args ...interface{}) Operati return parent.newChild(parent.Context, fmt.Sprintf(format, args...)) } -// FromContext unpacks the values in the ctx to create an Operation -func FromContext(ctx context.Context) (Operation, error) { - - o := Operation{ - Context: ctx, +// FromContext will return an Operation +// +// The Operation returned will be one of the following: +// The operation in the context value +// The operation passed as the context param +// A new operation +func FromContext(ctx context.Context, message string) Operation { + + // do we have a context w/the op added as a value + if op, ok := ctx.Value(OpTraceKey).(Operation); ok { + return Operation{ + Context: ctx, + operation: op.operation, + } } - - op := ctx.Value(OpTraceKey) - switch val := op.(type) { - case operation: - o.operation = val - default: - return Operation{}, fmt.Errorf("not an Operation") + // do we have an operation + op, ok := ctx.(Operation) + if !ok { + op = newOperation(ctx, opID(atomic.AddUint64(&opCount, 1)), 3, message) + frame := op.t[0] + Logger.Debugf("%s: [OperationFromContext] [%s:%d]", op.id, frame.funcName, frame.lineNo) } - - return o, nil + // return the new or existing operation + return op } diff --git a/pkg/trace/operation_test.go b/pkg/trace/operation_test.go index 646e2b1b7b..bfe53b32a2 100644 --- a/pkg/trace/operation_test.go +++ b/pkg/trace/operation_test.go @@ -39,11 +39,7 @@ func TestContextUnpack(t *testing.T) { ctx := NewOperation(context.TODO(), "testmsg") // unpack an Operation via the context using it's Values fields - c, err := FromContext(ctx) - - if !assert.NoError(t, err) || !assert.NotNil(t, c) { - return - } + c := FromContext(ctx, "test") c.Infof("test info message %d", i) }(i) // fix race in test } diff --git a/pkg/trace/trace.go b/pkg/trace/trace.go index e29b33c257..0bcaff89da 100644 --- a/pkg/trace/trace.go +++ b/pkg/trace/trace.go @@ -15,12 +15,16 @@ package trace import ( + "context" + "fmt" "os" "runtime" + "strings" "time" "github.com/Sirupsen/logrus" + "github.com/vmware/govmomi/vim25/types" "github.com/vmware/vic/pkg/log" ) @@ -47,9 +51,10 @@ var Logger = &logrus.Logger{ // trace object used to grab 
run-time state type Message struct { - msg string - funcName string - lineNo int + msg string + funcName string + lineNo int + operationID string startTime time.Time } @@ -62,30 +67,50 @@ func (t *Message) delta() time.Duration { } // begin a trace from this stack frame less the skip. -func newTrace(msg string, skip int) *Message { +func newTrace(msg string, skip int, opID string) *Message { pc, _, line, ok := runtime.Caller(skip) if !ok { return nil } - name := runtime.FuncForPC(pc).Name() + // lets only return the func name from the repo (vic) + // down - i.e. vic/lib/etc vs. github.com/vmware/vic/lib/etc + // if github.com/vmware doesn't match then the original is returned + name := strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/vmware/", "", -1) + + message := Message{ + msg: msg, + funcName: name, + lineNo: line, - return &Message{ - msg: msg, - funcName: name, - lineNo: line, startTime: time.Now(), } + + // if we have an operationID then format the output + if opID != "" { + message.operationID = fmt.Sprintf("op=%s", opID) + } + + return &message } // Begin starts the trace. Msg is the msg to log. -func Begin(msg string) *Message { +// context provided to allow tracing of operationID +// context added as optional to avoid breaking current usage +func Begin(msg string, ctx ...context.Context) *Message { if tracingEnabled && Logger.Level >= logrus.DebugLevel { - if t := newTrace(msg, 2); t != nil { + var opID string + // populate operationID if provided + if len(ctx) == 1 { + if id, ok := ctx[0].Value(types.ID{}).(string); ok { + opID = id + } + } + if t := newTrace(msg, 2, opID); t != nil { if msg == "" { - Logger.Debugf("[BEGIN] [%s:%d]", t.funcName, t.lineNo) + Logger.Debugf("[BEGIN] %s [%s:%d]", t.operationID, t.funcName, t.lineNo) } else { - Logger.Debugf("[BEGIN] [%s:%d] %s", t.funcName, t.lineNo, t.msg) + Logger.Debugf("[BEGIN] %s [%s:%d] %s", t.operationID, t.funcName, t.lineNo, t.msg) } return t @@ -99,5 +124,5 @@ func End(t *Message) { if t == nil { return } - Logger.Debugf("[ END ] [%s:%d] [%s] %s", t.funcName, t.lineNo, t.delta(), t.msg) + Logger.Debugf("[ END ] %s [%s:%d] [%s] %s", t.operationID, t.funcName, t.lineNo, t.delta(), t.msg) } diff --git a/pkg/vsphere/tasks/waiter.go b/pkg/vsphere/tasks/waiter.go index c76cf7962a..60e65533ff 100644 --- a/pkg/vsphere/tasks/waiter.go +++ b/pkg/vsphere/tasks/waiter.go @@ -63,10 +63,7 @@ func WaitForResult(ctx context.Context, f func(context.Context) (Task, error)) ( var err error var backoffFactor int64 = 1 - op, err := trace.FromContext(ctx) - if err != nil { - op = trace.NewOperation(ctx, "WaitForResult") - } + op := trace.FromContext(ctx, "WaitForResult") for { var t Task diff --git a/pkg/vsphere/vm/vm.go b/pkg/vsphere/vm/vm.go index 4dfadd3caf..bcdaad12ac 100644 --- a/pkg/vsphere/vm/vm.go +++ b/pkg/vsphere/vm/vm.go @@ -33,6 +33,7 @@ import ( "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/types" + "github.com/vmware/vic/pkg/trace" "github.com/vmware/vic/pkg/vsphere/extraconfig/vmomi" "github.com/vmware/vic/pkg/vsphere/session" "github.com/vmware/vic/pkg/vsphere/tasks" @@ -179,6 +180,8 @@ func (vm *VirtualMachine) WaitForKeyInExtraConfig(ctx context.Context, key strin var detail string var poweredOff error + op := trace.FromContext(ctx, "WaitForKey") + waitFunc := func(pc []types.PropertyChange) bool { for _, c := range pc { if c.Op != types.PropertyChangeOpAssign { @@ -213,7 +216,7 @@ func (vm *VirtualMachine) WaitForKeyInExtraConfig(ctx context.Context, key strin return false } - err := 
vm.WaitForExtraConfig(ctx, waitFunc) + err := vm.WaitForExtraConfig(op, waitFunc) if err == nil && poweredOff != nil { err = poweredOff } @@ -486,7 +489,10 @@ func (vm *VirtualMachine) WaitForResult(ctx context.Context, f func(context.Cont } func (vm *VirtualMachine) Properties(ctx context.Context, r types.ManagedObjectReference, ps []string, o *mo.VirtualMachine) error { - log.Debugf("get vm properties %s of vm %s", ps, r) + // lets ensure we have an operation + op := trace.FromContext(ctx, "VM Properties") + defer trace.End(trace.Begin(fmt.Sprintf("VM(%s) Properties(%s)", r, ps), op)) + contains := false for i := range ps { if ps[i] == "summary" || ps[i] == "summary.runtime" { @@ -500,20 +506,20 @@ func (vm *VirtualMachine) Properties(ctx context.Context, r types.ManagedObjectR } else { newps = append(newps, ps...) } - log.Debugf("properties: %s", newps) - if err := vm.VirtualMachine.Properties(ctx, r, newps, o); err != nil { + op.Debugf("properties: %s", newps) + if err := vm.VirtualMachine.Properties(op, r, newps, o); err != nil { return err } if o.Summary.Runtime.ConnectionState != types.VirtualMachineConnectionStateInvalid { return nil } - log.Infof("vm %s is in invalid state", r) - if err := vm.fixVM(ctx); err != nil { - log.Errorf("Failed to fix vm %s: %s", vm.Reference(), err) + op.Infof("vm %s is in invalid state", r) + if err := vm.fixVM(op); err != nil { + op.Errorf("Failed to fix vm %s: %s", vm.Reference(), err) return &InvalidState{r: vm.Reference()} } - log.Debugf("Retry properties query %s of vm %s", ps, vm.Reference()) - return vm.VirtualMachine.Properties(ctx, vm.Reference(), ps, o) + + return vm.VirtualMachine.Properties(op, vm.Reference(), ps, o) } func (vm *VirtualMachine) Parent(ctx context.Context) (*types.ManagedObjectReference, error) {