Add retry functionality for ExecCreate path
This makes several additions to the ExecCreate path:
* Adds a retry around most of the ExecCreate operation, including the state checks
* Retries on concurrent modifications for exec now detect shutdown and state
collisions
* Changes the behavior of the GetState function in the portlayer
	** It now takes the VM power state from the provided handle
* Shifts all ExecCreate operations to using the handle version of the config.

Based on all of the above, the entire exec path should now be more
resilient to concurrent modification collisions. The primary aim of
this commit, however, is to mitigate power state collisions.
matthewavery committed Nov 27, 2017
1 parent 39c045a commit 8ab0bb5
Showing 7 changed files with 114 additions and 103 deletions.
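
The retry approach described in the commit message can be illustrated with a minimal, self-contained sketch: the state check, exec-task creation, and handle commit are bundled into a single operation closure, and that closure is re-run whenever the failure looks like a concurrent-modification conflict. Note that retryDo, isConflict, and conflictError below are simplified stand-ins for the project's retry.Do, IsConflictError, and conflict error types, not the actual implementations.

package main

import (
	"errors"
	"fmt"
	"time"
)

// conflictError is a stand-in for the engine's conflict error type.
type conflictError struct{ msg string }

func (e *conflictError) Error() string { return e.msg }

// isConflict plays the role of IsConflictError: it decides whether an error
// represents a concurrent-modification collision that is worth retrying.
func isConflict(err error) bool {
	var ce *conflictError
	return errors.As(err, &ce)
}

// retryDo is a simplified stand-in for retry.Do: it re-runs the operation
// while the supplied predicate reports the returned error as retryable.
func retryDo(operation func() error, retryable func(error) bool) error {
	const maxAttempts = 5
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = operation(); err == nil || !retryable(err) {
			return err
		}
		// simple linear backoff between attempts
		time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
	}
	return err
}

func main() {
	attempts := 0

	// The operation bundles the state check, exec task creation, and handle
	// commit, so a stale handle simply causes the whole sequence to be retried
	// against a freshly obtained handle.
	operation := func() error {
		attempts++
		if attempts < 3 {
			// simulate a concurrent modification (e.g. a power state change)
			return &conflictError{msg: "handle out of date: concurrent modification"}
		}
		fmt.Printf("exec task created and committed on attempt %d\n", attempts)
		return nil
	}

	if err := retryDo(operation, isConflict); err != nil {
		fmt.Println("exec create failed:", err)
	}
}
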
118 changes: 55 additions & 63 deletions lib/apiservers/engine/backends/container.go
@@ -206,19 +206,6 @@ func (c *Container) Handle(id, name string) (string, error) {

// docker's container.execBackend

// FIXME: this returns a docker model, we should avoid this. This is what should be in the container proxy... But we need to confirm that it is not a public endpoint. its return hints that it is not... or that the actual return might be different
func (c *Container) TaskInspect(cid, cname, eid string) (*models.TaskInspectResponse, error) {
defer trace.End(trace.Begin(fmt.Sprintf("cid(%s), cname(%s), eid(%s)", cid, cname, eid)))
op := trace.NewOperation(context.Background(), "")

handle, err := c.Handle(cid, cname)
if err != nil {
return nil, err
}

return c.containerProxy.InspectTask(op, handle, eid, cid)
}

func (c *Container) TaskWaitToStart(cid, cname, eid string) error {
// obtain a portlayer client
client := c.containerProxy.Client()
@@ -261,57 +248,64 @@ func (c *Container) ContainerExecCreate(name string, config *types.ExecConfig) (
}
id := vc.ContainerID

// Is it running?
state, err := c.containerProxy.State(vc)
if err != nil {
return "", InternalServerError(err.Error())
}
// set up the environment
config.Env = setEnvFromImageConfig(config.Tty, config.Env, vc.Config.Env)

if state.Restarting {
return "", ConflictError(fmt.Sprintf("Container %s is restarting, wait until the container is running", id))
}
if !state.Running {
return "", ConflictError(fmt.Sprintf("Container %s is not running", id))
}
var eid string
operation := func() error {

op.Debugf("State checks succeeded for exec operation on cotnainer(%s)", id)
handle, err := c.Handle(id, name)
if err != nil {
op.Error(err)
return "", InternalServerError(err.Error())
}
handle, err := c.Handle(id, name)
if err != nil {
op.Error(err)
return InternalServerError(err.Error())
}

// set up the environment
config.Env = setEnvFromImageConfig(config.Tty, config.Env, vc.Config.Env)
// Is it running?
handle, state, err := c.containerProxy.GetStateFromHandle(op, handle)
if err != nil {
return InternalServerError(err.Error())
}

handleprime, eid, err := c.containerProxy.CreateExecTask(handle, config)
if err != nil {
op.Errorf("Failed to create exec task for container(%s) due to error(%s)", id, err)
return "", InternalServerError(err.Error())
// NOTE: we should investigate what we can add or manipulate in the handle in the portlayer to make this check even more granular. Maybe add the actual State into the handle config, or part of the container info could be "snapshotted"
// This check is now done off of the handle.
if state != "RUNNING" {
return ConflictError(fmt.Sprintf("Container %s is restarting, wait until the container is running", id))
}

op.Debugf("State checks succeeded for exec operation on container(%s)", id)
handle, eid, err = c.containerProxy.CreateExecTask(handle, config)
if err != nil {
op.Errorf("Failed to create exec task for container(%s) due to error(%s)", id, err)
return InternalServerError(err.Error())
}

err = c.containerProxy.CommitContainerHandle(handle, id, 0)
if err != nil {
op.Errorf("Failed to commit exec handle for container(%s) due to error(%s)", id, err)
return err
}

return nil
}

err = c.containerProxy.CommitContainerHandle(handleprime, id, 0)
if err != nil {
op.Errorf("Failed to commit exec handle for container(%s) due to error(%s)", id, err)
// FIXME: We cannot necessarily check for IsConflictError here yet since the state check also returns conflict error
if err := retry.Do(operation, IsConflictError); err != nil {
op.Errorf("Failed to start Exec task for container(%s) due to error (%s)", id, err)
return "", err
}

// associate newly created exec task with container
cache.ContainerCache().AddExecToContainer(vc, eid)

// FIXME: NEEDS CONTAINER PROXY
ec, err := c.TaskInspect(id, name, eid)
handle, err := c.Handle(id, name)
if err != nil {
switch err := err.(type) {
case *tasks.InspectInternalServerError:
op.Debugf("received an internal server error during task inspect: %s", err.Payload.Message)
return "", InternalServerError(err.Payload.Message)
case *tasks.InspectConflict:
op.Debugf("received a conflict error during task inspect: %s", err.Payload.Message)
return "", ConflictError(fmt.Sprintf("Cannot complete the operation, container %s has been powered off during execution", id))
default:
return "", InternalServerError(err.Error())
}
op.Error(err)
return "", InternalServerError(err.Error())
}

ec, err := c.containerProxy.InspectTask(op, handle, eid, id)
if err != nil {
return "", err
}

// exec_create event
@@ -336,19 +330,15 @@ func (c *Container) ContainerExecInspect(eid string) (*backend.ExecInspect, erro
id := vc.ContainerID
name := vc.Name

//FIXME: NEEDS CONTAINER PROXY
ec, err := c.TaskInspect(id, name, eid)
handle, err := c.Handle(id, name)
if err != nil {
switch err := err.(type) {
case *tasks.InspectInternalServerError:
op.Debugf("received an internal server error during task inspect: %s", err.Payload.Message)
return nil, InternalServerError(err.Payload.Message)
case *tasks.InspectConflict:
op.Debugf("received a conflict error during task inspect: %s", err.Payload.Message)
return nil, ConflictError(fmt.Sprintf("Cannot complete the operation, container %s has been powered off during execution", id))
default:
return nil, InternalServerError(err.Error())
}
op.Error(err)
return nil, InternalServerError(err.Error())
}

ec, err := c.containerProxy.InspectTask(op, handle, eid, id)
if err != nil {
return nil, err
}

exit := int(ec.ExitCode)
@@ -451,6 +441,7 @@ func (c *Container) ContainerExecStart(ctx context.Context, eid string, stdin io
defer cancel()

// we do not return an error here if this fails. TODO: Investigate what exactly happens on error here...
// FIXME: This being in a retry could lead to multiple writes to stdout(?)
go func() {
defer trace.End(trace.Begin(eid))

@@ -518,6 +509,7 @@ func (c *Container) ContainerExecStart(ctx context.Context, eid string, stdin io
}
return nil
}

if err := retry.Do(operation, IsConflictError); err != nil {
op.Errorf("Failed to start Exec task for container(%s) due to error (%s)", id, err)
return err
27 changes: 23 additions & 4 deletions lib/apiservers/engine/backends/container_proxy.go
@@ -103,6 +103,7 @@ type VicContainerProxy interface {

Stop(vc *viccontainer.VicContainer, name string, seconds *int, unbound bool) error
State(vc *viccontainer.VicContainer) (*types.ContainerState, error)
GetStateFromHandle(op trace.Operation, handle string) (string, string, error)
Wait(vc *viccontainer.VicContainer, timeout time.Duration) (*types.ContainerState, error)
Signal(vc *viccontainer.VicContainer, sig uint64) error
Resize(id string, height, width int32) error
@@ -337,14 +338,12 @@ func (c *ContainerProxy) InspectTask(op trace.Operation, handle string, eid stri
case *tasks.InspectNotFound:
// These error types may need to be expanded. NotFoundError does not fit here.
op.Errorf("received a TaskNotFound error during task inspect: %s", err.Payload.Message)
return nil, ConflictError("container (%s) has been poweredoff")
return nil, ConflictError(fmt.Sprintf("container (%s) has been poweredoff", cid))
case *tasks.InspectInternalServerError:
op.Errorf("received an internal server error during task inspect: %s", err.Payload.Message)
return nil, InternalServerError(err.Payload.Message)
case *tasks.InspectConflict:
op.Errorf("received a conflict error during task inspect: %s", err.Payload.Message)
return nil, ConflictError(fmt.Sprintf("Cannot complete the operation, container %s has been powered off during execution", cid))
default:
// right now Task inspection in the portlayer does not return a conflict error
return nil, InternalServerError(err.Error())
}
}
@@ -911,6 +910,26 @@ func (c *ContainerProxy) UnbindContainerFromNetwork(vc *viccontainer.VicContaine
return ub.Payload.Handle, nil
}

// GetStateFromHandle takes a handle and returns the state of the container based on that handle. It also returns the handle that comes back with the response.
func (c *ContainerProxy) GetStateFromHandle(op trace.Operation, handle string) (string, string, error) {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", handle), op))

params := &containers.GetStateParams{
Handle: handle,
}

resp, err := c.client.Containers.GetState(params)
if err != nil {
switch err := err.(type) {
case *containers.GetStateNotFound:
return handle, "", NotFoundError(err.Payload.Message)
default:
return handle, "", InternalServerError(err.Error())
}
}
return resp.Payload.Handle, resp.Payload.State, nil
}

// State returns container state
func (c *ContainerProxy) State(vc *viccontainer.VicContainer) (*types.ContainerState, error) {
defer trace.End(trace.Begin(""))
11 changes: 11 additions & 0 deletions lib/apiservers/engine/backends/container_test.go
@@ -345,6 +345,17 @@ func (m *MockContainerProxy) State(vc *viccontainer.VicContainer) (*types.Contai
return state, nil
}

func (m *MockContainerProxy) GetStateFromHandle(op trace.Operation, handle string) (string, string, error) {
return "", "", nil
}

func (m *MockContainerProxy) InspectTask(op trace.Operation, handle string, eid string, cid string) (*models.TaskInspectResponse, error) {
return nil, nil
}
func (m *MockContainerProxy) BindTask(op trace.Operation, handle string, eid string) (*models.TaskBindResponse, error) {
return nil, nil
}

func (m *MockContainerProxy) Wait(vc *viccontainer.VicContainer, timeout time.Duration) (*types.ContainerState, error) {
dockerState := &types.ContainerState{ExitCode: 0}
return dockerState, nil
18 changes: 4 additions & 14 deletions lib/apiservers/portlayer/restapi/handlers/containers_handlers.go
@@ -171,30 +171,20 @@ func (handler *ContainersHandlersImpl) StateChangeHandler(params containers.Stat
func (handler *ContainersHandlersImpl) GetStateHandler(params containers.GetStateParams) middleware.Responder {
defer trace.End(trace.Begin(fmt.Sprintf("handle(%s)", params.Handle)))

// NOTE: I've no idea why GetStateHandler takes a handle instead of an ID - hopefully there was a reason for an inspection
// operation to take this path
h := exec.GetHandle(params.Handle)
if h == nil || h.ExecConfig == nil {
return containers.NewGetStateNotFound()
}

container := exec.Containers.Container(h.ExecConfig.ID)
if container == nil {
return containers.NewGetStateNotFound()
}

var state string
switch container.CurrentState() {
case exec.StateRunning:
switch h.Runtime.PowerState {
case types.VirtualMachinePowerStatePoweredOn:
state = "RUNNING"

case exec.StateStopped:
case types.VirtualMachinePowerStatePoweredOff:
state = "STOPPED"

case exec.StateCreated:
state = "CREATED"

default:
// This will occur if the container is suspended... Those are the only types covered in the runtime types right now.
return containers.NewGetStateDefault(http.StatusServiceUnavailable)
}

1 change: 1 addition & 0 deletions lib/portlayer/exec/handle.go
@@ -112,6 +112,7 @@ func newHandle(con *Container) *Handle {
h := &Handle{
key: newHandleKey(),
targetState: StateUnknown,
State: con.State(),
containerBase: *newBase(con.vm, con.Config, con.Runtime),
// currently every operation has a spec, because even the power operations
// make changes to extraconfig for timestamps and session status
2 changes: 1 addition & 1 deletion lib/portlayer/task/inspect.go
@@ -22,7 +22,7 @@ import (
"github.com/vmware/vic/pkg/trace"
)

// Insepct the given task id and returns it's config
// Inspect the given task id and returns it's config
func Inspect(op *trace.Operation, h interface{}, id string) (*executor.SessionConfig, error) {
defer trace.End(trace.Begin(id))

40 changes: 19 additions & 21 deletions tests/test-cases/Group1-Docker-Commands/1-38-Docker-Exec.robot
@@ -125,24 +125,22 @@ Exec NonExisting
#\ Should Contain ${output} no such file or directory

Exec During PowerOff
${status}= Get State Of Github Issue 6744
Run Keyword If '${status}' == 'closed' Fail Test 1-38-Docker-Exec.robot needs to be updated now that Issue #6744 has been resolved
#${rc} ${output}= Run And Return Rc And Output docker %{VCH-PARAMS} pull ${busybox}
#Should Be Equal As Integers ${rc} 0
#Should Not Contain ${output} Error
#${rc} ${id}= Run And Return Rc And Output docker %{VCH-PARAMS} run -d ${busybox} /bin/top
#Should Be Equal As Integers ${rc} 0
#:FOR ${idx} IN RANGE 1 10
#\ Start Process docker %{VCH-PARAMS} exec ${id} /bin/top alias=exec-%{VCH-NAME}-${idx} shell=true
#
#Start Process docker %{VCH-PARAMS} stop ${id} alias=stop-%{VCH-NAME}-${id} shell=true
#${stopResult}= Wait For Process stop-%{VCH-NAME}-${id}
#Should Be Equal As Integers ${stopResult.rc} 0
#
#${combinedoutput}= Set Variable
#
#:FOR ${idx} IN RANGE 1 10
#\ ${result}= Wait For Process exec-%{VCH-NAME}-${idx} timeout=2 mins
#\ ${combinedOutput}= Catenate ${combinedOutput} ${result.stderr}${\n}
#
#Should Contain ${combinedOutput} Cannot complete the operation, container ${id} has been powered off during execution
${rc} ${output}= Run And Return Rc And Output docker %{VCH-PARAMS} pull ${busybox}
Should Be Equal As Integers ${rc} 0
Should Not Contain ${output} Error
${rc} ${id}= Run And Return Rc And Output docker %{VCH-PARAMS} run -d ${busybox} /bin/top
Should Be Equal As Integers ${rc} 0
:FOR ${idx} IN RANGE 1 10
\ Start Process docker %{VCH-PARAMS} exec ${id} /bin/top alias=exec-%{VCH-NAME}-${idx} shell=true

Start Process docker %{VCH-PARAMS} stop ${id} alias=stop-%{VCH-NAME}-${id} shell=true
${stopResult}= Wait For Process stop-%{VCH-NAME}-${id}
Should Be Equal As Integers ${stopResult.rc} 0

${combinedoutput}= Set Variable

:FOR ${idx} IN RANGE 1 10
\ ${result}= Wait For Process exec-%{VCH-NAME}-${idx} timeout=2 mins
\ ${combinedOutput}= Catenate ${combinedOutput} ${result.stderr}${\n}

Should Contain ${combinedOutput} container (${id}) has been powered off
