Address stun from volume manipulation after outofspace question
Observed an out-of-space question stunning the VCH until the question
was answered via the UI. In this case the question was
msg.hbacommon.outofspace.

That has NOT been fully resolved. The following steps have been
taken:
1. switched to retrying transient errors from vm.reconfigure in the
   disk manager (a rough sketch of the retry shape follows below)
2. added a Cancel answer to the question so the operation fails in a
   predictable manner instead of stunning the VCH indefinitely.

Additionally, async error handlers have been added on some disk
management paths to allow for increased parallelization of
attach/detach.
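The retrying reconfigure from step 1 lives in files that are not expanded
below. As a rough sketch only, assuming govmomi's object.VirtualMachine and a
hypothetical isTransient predicate (the commit does not show which errors the
disk manager actually treats as retryable), the retry shape might look like:

package disk

import (
    "context"
    "strings"
    "time"

    "github.com/vmware/govmomi/object"
    "github.com/vmware/govmomi/vim25/types"
)

// isTransient is a hypothetical predicate; the real change decides
// retryability differently and is not shown in this page's hunks.
func isTransient(err error) bool {
    return err != nil && strings.Contains(err.Error(), "TaskInProgress")
}

// reconfigureWithRetry retries a VM reconfigure a bounded number of times when
// the failure looks transient, instead of failing the caller immediately.
func reconfigureWithRetry(ctx context.Context, vm *object.VirtualMachine, spec types.VirtualMachineConfigSpec, attempts int) error {
    var err error
    for i := 0; i < attempts; i++ {
        var task *object.Task
        if task, err = vm.Reconfigure(ctx, spec); err == nil {
            err = task.Wait(ctx)
        }
        if err == nil || !isTransient(err) {
            return err
        }
        time.Sleep(time.Second << uint(i)) // simple exponential backoff between attempts
    }
    return err
}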
hickeng committed Dec 2, 2022
1 parent 0070ecd commit 15ba6ec
Showing 7 changed files with 160 additions and 71 deletions.
60 changes: 41 additions & 19 deletions lib/apiservers/engine/proxy/storage_proxy.go
@@ -21,6 +21,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/google/uuid"
@@ -95,7 +96,7 @@ var SupportedVolDrivers = map[string]struct{}{
"local": {},
}

//Validation pattern for Volume Names
// Validation pattern for Volume Names
var volumeNameRegex = regexp.MustCompile("^[a-zA-Z0-9][a-zA-Z0-9_.-]*$")

func NewStorageProxy(client *client.PortLayer) VicStorageProxy {
@@ -248,6 +249,7 @@ func (s *StorageProxy) Remove(ctx context.Context, name string) error {
// - imageID is the current label by which we address this image and is recorded as metadata
// - repoName is the repository for the image and is recorded as metadata
// returns:
//
// modified handle
func (s *StorageProxy) AddImageToContainer(ctx context.Context, handle, deltaID, layerID, imageID string, config types.ContainerCreateConfig) (string, error) {
op := trace.FromContext(ctx, "AddImageToContainer: %s", deltaID)
@@ -299,6 +301,7 @@ func (s *StorageProxy) AddImageToContainer(ctx context.Context, handle, deltaID,
// If an error is returned, the returned handle should not be used.
//
// returns:
//
// modified handle
func (s *StorageProxy) AddVolumesToContainer(ctx context.Context, handle string, config types.ContainerCreateConfig) (string, error) {
op := trace.FromContext(ctx, "AddVolumesToContainer: %s", handle)
@@ -340,33 +343,52 @@ func (s *StorageProxy) AddVolumesToContainer(ctx context.Context, handle string,
}
}

// Create and join volumes.
// Create volumes in parallel
results := make(chan error, len(volList))
wg := sync.WaitGroup{}
wg.Add(len(volList))
for _, fields := range volList {
// We only set these here for volumes made on a docker create
volumeData := make(map[string]string)
volumeData[DriverArgFlagKey] = fields.Flags
volumeData[DriverArgContainerKey] = config.Name
volumeData[DriverArgImageKey] = config.Config.Image

// NOTE: calling volumeCreate regardless of whether the volume is already
// present can be avoided by adding an extra optional param to VolumeJoin,
// which would then call volumeCreate if the volume does not exist.
_, err := s.volumeCreate(op, fields.ID, "vsphere", volumeData, nil)
if err != nil {
go func(id, flags string) {
defer wg.Done()

// We only set these here for volumes made on a docker create
volumeData := make(map[string]string)
volumeData[DriverArgFlagKey] = flags
volumeData[DriverArgContainerKey] = config.Name
volumeData[DriverArgImageKey] = config.Config.Image

// NOTE: calling volumeCreate regardless of whether the volume is already
// present can be avoided by adding an extra optional param to VolumeJoin,
// which would then call volumeCreate if the volume does not exist.
_, err := s.volumeCreate(op, id, "vsphere", volumeData, nil)
if err == nil {
log.Infof("volumeCreate succeeded. Volume mount section ID: %s", id)
return
}

switch err := err.(type) {
case *storage.CreateVolumeConflict:
// Implicitly ignore the error where a volume with the same name
// already exists. We can just join the said volume to the container.
log.Infof("a volume with the name %s already exists", fields.ID)
log.Infof("a volume with the name %s already exists", id)
case *storage.CreateVolumeNotFound:
return handle, errors.VolumeCreateNotFoundError(volumeStore(volumeData))
results <- errors.VolumeCreateNotFoundError(volumeStore(volumeData))
default:
return handle, errors.InternalServerError(err.Error())
results <- errors.InternalServerError(err.Error())
}
} else {
log.Infof("volumeCreate succeeded. Volume mount section ID: %s", fields.ID)
}
}(fields.ID, fields.Flags)
}

wg.Wait()
if len(results) > 0 {
// TODO: should we attempt to do anything with errors other than the first?
// given we're not logging errors at this point prior to the parallelization I don't
// think it's needed.
return handle, <-results
}

// Attach volumes.
for _, fields := range volList {
flags := make(map[string]string)
//NOTE: for now we are passing the flags directly through. This is NOT SAFE and only a stop gap.
flags[constants.Mode] = fields.Flags
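The create loop above fans each volume out to its own goroutine and funnels
failures into a channel buffered to len(volList), so no goroutine can block on
reporting and only the first error is surfaced. A condensed, self-contained
sketch of that fan-out pattern (createVolume and createAll are illustrative
names, not the proxy's API):

package main

import (
    "fmt"
    "sync"
)

// createVolume stands in for the real volumeCreate call; illustrative only.
func createVolume(id string) error {
    if id == "" {
        return fmt.Errorf("empty volume ID")
    }
    return nil
}

func createAll(ids []string) error {
    // buffered to len(ids) so every goroutine can report a failure without blocking
    results := make(chan error, len(ids))
    var wg sync.WaitGroup
    wg.Add(len(ids))

    for _, id := range ids {
        go func(id string) {
            defer wg.Done()
            if err := createVolume(id); err != nil {
                results <- err
            }
        }(id)
    }

    wg.Wait()
    // surface only the first failure, mirroring the pre-parallel behaviour
    if len(results) > 0 {
        return <-results
    }
    return nil
}

func main() {
    fmt.Println(createAll([]string{"vol-a", "vol-b"}))
}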
23 changes: 21 additions & 2 deletions lib/portlayer/storage/volume/cache/cache.go
@@ -31,6 +31,10 @@ import (
"github.com/vmware/vic/pkg/trace"
)

// pendingID is used as the ID inside a placeholder Volume before the real volume has been created.
// This is used to allow concurrent volume creation.
const pendingID string = "pending"

// VolumeLookupCache caches Volume references to volumes in the system.
type VolumeLookupCache struct {

@@ -131,10 +135,25 @@ func (v *VolumeLookupCache) VolumeCreate(op trace.Operation, ID string, store *u
return nil, os.ErrExist
}

// if we exit this function without having replaced the placeholder Volume in the cache
// there was an error and we should delete the placeholder so a subsequent attempt at the
// same ID can proceed.
defer func() {
v.vlcLock.Lock()

vol, ok := v.vlc[ID]
if ok {
if vol.ID == pendingID {
delete(v.vlc, ID)
}
}
v.vlcLock.Unlock()
}()

// TODO: construct a proper async cache
// this is done because this path was blocking any concurrent volume create
v.vlc[ID] = volume.Volume{
ID: "pending",
ID: pendingID,
}

v.vlcLock.Unlock()
@@ -149,7 +168,7 @@ func (v *VolumeLookupCache) VolumeCreate(op trace.Operation, ID string, store *u
return nil, err
}

// Add it to the cache
// Replace the pending entry with the actual entry
v.vlcLock.Lock()
v.vlc[ID] = *vol
v.vlcLock.Unlock()
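The cache change reserves the ID with a pendingID placeholder before the slow
create and, via the deferred check, removes the placeholder only if it was
never replaced, so a failed create does not block a later attempt at the same
ID. A condensed sketch of that reserve-then-replace pattern (generic names,
not the cache's actual types):

package cache

import (
    "os"
    "sync"
)

const pending = "pending"

type entry struct{ ID string }

type lookupCache struct {
    mu sync.Mutex
    m  map[string]entry
}

func newLookupCache() *lookupCache {
    return &lookupCache{m: make(map[string]entry)}
}

func (c *lookupCache) create(id string, slowCreate func() (entry, error)) (entry, error) {
    c.mu.Lock()
    if _, ok := c.m[id]; ok {
        c.mu.Unlock()
        return entry{}, os.ErrExist
    }
    // reserve the ID so other creates are not serialized behind this one
    c.m[id] = entry{ID: pending}
    c.mu.Unlock()

    // if the placeholder was never replaced, drop it so a retry can proceed
    defer func() {
        c.mu.Lock()
        if e, ok := c.m[id]; ok && e.ID == pending {
            delete(c.m, id)
        }
        c.mu.Unlock()
    }()

    e, err := slowCreate() // the expensive work runs outside the lock
    if err != nil {
        return entry{}, err
    }

    c.mu.Lock()
    c.m[id] = e // replace the placeholder with the real entry
    c.mu.Unlock()
    return e, nil
}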
10 changes: 9 additions & 1 deletion lib/portlayer/storage/volume/vsphere/store.go
@@ -118,7 +118,15 @@ func (v *VolumeStore) VolumeCreate(op trace.Operation, ID string, store *url.URL
if err != nil {
return nil, err
}

// TODO: handle the error that can come back from detach - if we hit this path nothing is going to attempt to detach this disk.
// That will mean:
// * the disk cannot be attached to a container (assuming it was prepped correctly)
// * we've leaked one of the limited number of disks that can be attached to the VCH at a time
// v.Detach has been modified to use async batching, so likely requires more structural revision, eg. a disk manager thread
// responsible for detach that this specific functional path can confidently delegate to instead of trying to handle inline.
defer v.Detach(op, vmdisk.VirtualDiskConfig)

vol, err := volume.NewVolume(store, ID, info, vmdisk, executor.CopyNew)
if err != nil {
return nil, err
@@ -149,7 +157,7 @@ func (v *VolumeStore) VolumeCreate(op trace.Operation, ID string, store *url.URL
return nil, err
}

op.Infof("volumestore: %s (%s)", ID, vol.SelfLink)
op.Infof("VolumeStore: %s (%s)", ID, vol.SelfLink)
return vol, nil
}

2 changes: 2 additions & 0 deletions lib/spec/spec.go
@@ -93,6 +93,8 @@ func NewVirtualMachineConfigSpec(ctx context.Context, session *session.Session,
&types.OptionValue{Key: "disk.EnableUUID", Value: "true"},
// needed to avoid the questions that occur when attaching multiple disks with the same uuid (bugzilla 1362918)
&types.OptionValue{Key: "answer.msg.disk.duplicateUUID", Value: "Yes"},
// if we hit out of space errors then ensure we don't block other operations - observed during testing parallel anonymous volume create
&types.OptionValue{Key: "answer.msg.hbacommon.outofspace", Value: "Cancel"},
// needed to avoid the question that occur when opening a file backed serial port
&types.OptionValue{Key: "answer.msg.serial.file.open", Value: "Append"},

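The answer.* keys above are applied at VM creation time as part of the config
spec. For an endpoint VM that already exists, the same key could in principle
be set with a reconfigure; the helper below is an illustrative govmomi sketch,
not something this commit adds:

package spec

import (
    "context"

    "github.com/vmware/govmomi/object"
    "github.com/vmware/govmomi/vim25/types"
)

// addOutOfSpaceAnswer sets the extraConfig key on an existing VM so an
// out-of-space question is answered with Cancel instead of stunning the VM.
func addOutOfSpaceAnswer(ctx context.Context, vm *object.VirtualMachine) error {
    spec := types.VirtualMachineConfigSpec{
        ExtraConfig: []types.BaseOptionValue{
            &types.OptionValue{Key: "answer.msg.hbacommon.outofspace", Value: "Cancel"},
        },
    }

    task, err := vm.Reconfigure(ctx, spec)
    if err != nil {
        return err
    }
    return task.Wait(ctx)
}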
132 changes: 85 additions & 47 deletions pkg/vsphere/disk/disk_manager.go
@@ -194,7 +194,7 @@ func (m *Manager) CreateAndAttach(op trace.Operation, config *VirtualDiskConfig)
// if it is then it's indicative of an error because it wasn't found in the cache, but this lets us recover
_, ferr := findDiskByFilename(op, m.vm, d.DatastoreURI.String(), d.IsPersistent())
if os.IsNotExist(ferr) {
if err := m.attach(op, config); err != nil {
if err := m.attach(op, config, nil); err != nil {
return nil, errors.Trace(err)
}
} else {
@@ -218,7 +218,7 @@ func (m *Manager) CreateAndAttach(op trace.Operation, config *VirtualDiskConfig)
op.Debugf("findDiskByFilename(%s) failed with %s", d.DatastoreURI, errors.ErrorStack(findErr))
}

if detachErr := m.detach(op, disk); detachErr != nil {
if detachErr := m.detach(op, disk, nil); detachErr != nil {
op.Debugf("detach(%s) failed with %s", d.DatastoreURI, errors.ErrorStack(detachErr))
}

@@ -395,8 +395,10 @@ func (m *Manager) dequeueBatch(op trace.Operation, data []interface{}) error {
return err
}

// Attach attempts to attach a virtual disk
func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig) error {
// attach attempts to attach the specified disk to the VM.
// if there is an error handling function, then the attach is queued and dispatched async.
// if the handling function is nil this blocks until the operation is complete
func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig, errHandler func(error)) error {
defer trace.End(trace.Begin(""))

disk := m.toSpec(config)
@@ -412,34 +414,51 @@ func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig) error {
machineSpec := types.VirtualMachineConfigSpec{}
machineSpec.DeviceChange = append(machineSpec.DeviceChange, changeSpec...)

// ensure we abide by max attached disks limits
// ensure we abide by max attached disks limits at all times
// we undo this in error handling if we fail the attach
m.maxAttached <- true

// hickeng: I don't think this locking is needed - at least I cannot tell what for after having rewritten
// this for batching. It appears to be explicitly for serializing the reconfigure, I suspect to avoid TaskInProgress
// issues.
// m.mu.Lock()
// defer m.mu.Unlock()

// make sure the op is still valid as the above line could block for a long time
select {
case <-op.Done():
// if the op has been cancelled then we need to undo our allocation of an available disk slot
select {
case <-m.maxAttached:
default:
}

return op.Err()
default:
}

// batch the operation and run the error handling in the background when the batch completes
err = m.queueBatch(op, changeSpec[0], nil)
if err != nil {
select {
case <-m.maxAttached:
default:
handler := func(err error) {
if err != nil {
select {
case <-m.maxAttached:
default:
}

op.Errorf("vmdk storage driver failed to attach disk: %s", errors.ErrorStack(err))
}

op.Errorf("vmdk storage driver failed to attach disk: %s", errors.ErrorStack(err))
if errHandler != nil {
errHandler(err)
}
}

return nil
var wrapper func(error)
if errHandler != nil {
wrapper = handler
}

// batch the operation
// run the error handling in the background when the batch completes if a handler is provided
err = m.queueBatch(op, changeSpec[0], wrapper)
if errHandler == nil {
handler(err)
}

return err
}

// Detach attempts to detach a virtual disk
@@ -480,8 +499,7 @@ func (m *Manager) Detach(op trace.Operation, config *VirtualDiskConfig) error {
return errors.Trace(err)
}

// unlocking the cache here allows for parallel detach operations to occur,
// enabling batching
// unlocking the cache here allows for parallel detach operations to occur, enabling batching
m.disksLock.Unlock()

op.Infof("Detaching disk %s", d.DevicePath)
@@ -491,16 +509,21 @@ func (m *Manager) Detach(op trace.Operation, config *VirtualDiskConfig) error {
return errors.Trace(err)
}

if err = m.detach(op, disk); err != nil {
op.Errorf("detach for %s failed with %s", d.DevicePath, errors.ErrorStack(err))
return errors.Trace(err)
}
// run the result handler in the batch error handler
resultHandler := func(err error) {
if err != nil {
op.Errorf("detach for %s failed with %s", d.DevicePath, errors.ErrorStack(err))
return
}

// this deletes the disk from the disk cache
m.disksLock.Lock()
d.setDetached(op, m.Disks)
m.disksLock.Unlock()
// this deletes the disk from the disk cache
m.disksLock.Lock()
d.setDetached(op, m.Disks)
m.disksLock.Unlock()
}

// execution is async with a result handler, so no error processing here
m.detach(op, disk, resultHandler)
return nil
}

@@ -513,39 +536,54 @@ func (m *Manager) DetachAll(op trace.Operation) error {
}

for _, disk := range disks {
// TODO: pretty sure we do not need the additional error handling on this path as it's
// packed into m.detach to not drain the channel on error
m.detach(op, disk)
// it's packed into m.detach to not drain the reference counting channel on error
// but providing an error handler here allows this to happen in parallel
m.detach(op, disk, func(err error) {
op.Errorf("vmdk storage driver failed to detach disk: %s", errors.ErrorStack(err))
})
}

return err
}

func (m *Manager) detach(op trace.Operation, disk *types.VirtualDisk) error {
// detach removes the specified disk from the VM.
// if there is an error handling function, then this removal is queued and dispatched async.
// if the handling function is nil this blocks until the operation is complete
func (m *Manager) detach(op trace.Operation, disk *types.VirtualDisk, errHandler func(error)) error {
config := &types.VirtualDeviceConfigSpec{
Device: disk,
Operation: types.VirtualDeviceConfigSpecOperationRemove,
}

// hickeng: I don't think this locking is needed - at least I cannot tell what for after having rewritten
// this for batching. It appears to be explicitly for serializing the reconfigure, I suspect to avoid TaskInProgress
// issues.
// m.mu.Lock()
// defer m.mu.Unlock()
handler := func(err error) {
if err != nil {
// can only enter here if the errHandler is nil, when this blocks until the op is completed
op.Errorf("vmdk storage driver failed to detach disk: %s", errors.ErrorStack(err))
} else {
select {
case <-m.maxAttached:
default:
}
}

if errHandler != nil {
errHandler(err)
}
}

// batch the operation and run the error handling in the background when the batch completes
err := m.queueBatch(op, config, nil)
if err != nil {
op.Errorf("vmdk storage driver failed to detach disk: %s", errors.ErrorStack(err))
return err
var wrapper func(error)
if errHandler != nil {
wrapper = handler
}

select {
case <-m.maxAttached:
default:
// batch the operation
// run the error handling in the background when the batch completes if a handler is provided
err := m.queueBatch(op, config, wrapper)
if errHandler == nil {
handler(err)
}

return nil
return err
}

func (m *Manager) devicePathByURI(op trace.Operation, datastoreURI *object.DatastorePath, persistent bool) (string, error) {
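Throughout attach and detach, the limited number of disks that can be attached
to the VCH is tracked with a buffered channel used as a counting semaphore:
attach claims a slot up front, and the error handlers give it back with a
non-blocking receive so a slot that was already released on another path
cannot deadlock the handler. A minimal sketch of that idiom (names are
illustrative, not the Manager's fields):

package disk

// slotSemaphore illustrates the buffered-channel counting semaphore used to
// cap the number of concurrently attached disks.
type slotSemaphore chan bool

func newSlotSemaphore(max int) slotSemaphore {
    return make(slotSemaphore, max)
}

// acquire blocks until a slot is free.
func (s slotSemaphore) acquire() {
    s <- true
}

// release frees a slot without blocking; if no slot is held (for example it
// was already released on an error path) this is a no-op rather than a hang.
func (s slotSemaphore) release() {
    select {
    case <-s:
    default:
    }
}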
