Skip to content

Commit

Permalink
Implemented Orchestrator options
Browse files Browse the repository at this point in the history
Signed-off-by: Plamen Petrov <plamb0brt@gmail.com>
  • Loading branch information
plamenmpetrov authored and ustiugov committed Jul 22, 2020
1 parent 6d3d001 commit 7007b48
Show file tree
Hide file tree
Showing 15 changed files with 260 additions and 118 deletions.
4 changes: 2 additions & 2 deletions ctriface/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
EXTRAGOARGS:=-v -race -cover
EXTRATESTFILES:=iface_test.go iface.go dialer.go
MANUALTESTFILES:=manual_cleanup_test.go iface.go dialer.go
EXTRATESTFILES:=iface_test.go iface.go dialer.go orch_options.go
MANUALTESTFILES:=manual_cleanup_test.go iface.go dialer.go orch_options.go

test:
sudo env "PATH=$(PATH)" go test $(EXTRATESTFILES) $(EXTRAGOARGS)
Expand Down
4 changes: 2 additions & 2 deletions ctriface/failing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestStartSnapStop(t *testing.T) {
ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout)
defer cancel()

orch := NewOrchestrator("devmapper", 1, true)
orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true))

vmID := "2"

Expand All @@ -46,7 +46,7 @@ func TestStartSnapStop(t *testing.T) {

time.Sleep(300 * time.Millisecond)

message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file", false)
message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file")
require.NoError(t, err, "Failed to load snapshot of VM, "+message)

message, err = orch.ResumeVM(ctx, vmID)
Expand Down
1 change: 1 addition & 0 deletions ctriface/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200714130355-0a4bf3145bf7
github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20200714130355-0a4bf3145bf7
github.com/ustiugov/fccd-orchestrator/taps v0.0.0-20200714130355-0a4bf3145bf7 // indirect
golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a // indirect
google.golang.org/grpc v1.30.0

)
Expand Down
4 changes: 4 additions & 0 deletions ctriface/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down Expand Up @@ -576,7 +577,10 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200615222825-6aa8f57aacd9/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200710042808-f1c4188a97a1/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200713235242-6acd2ab80ede h1:ItmFoZpZfJTYGsnON6247QuXEOKiKLBpieGcAgweGsk=
golang.org/x/tools v0.0.0-20200713235242-6acd2ab80ede/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a h1:kVMPw4f6EVqYdfGQTedjrpw1dbE2PEMfw4jwXsNdn9s=
golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
28 changes: 21 additions & 7 deletions ctriface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package ctriface

import (
"context"
"fmt"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -70,21 +71,24 @@ type Orchestrator struct {
client *containerd.Client
fcClient *fcclient.Client
// store *skv.KVStore
snapshotsEnabled bool
isUPFEnabled bool
}

// NewOrchestrator Initializes a new orchestrator
func NewOrchestrator(snapshotter string, niNum int, testModeOn bool) *Orchestrator {
func NewOrchestrator(snapshotter string, niNum int, opts ...OrchestratorOption) *Orchestrator {
var err error

o := new(Orchestrator)
o.niNum = niNum
o.vmPool = misc.NewVMPool(o.niNum)
o.cachedImages = make(map[string]containerd.Image)
o.snapshotter = snapshotter
o.snapshotsEnabled = false
o.isUPFEnabled = false

if !testModeOn {
o.setupCloseHandler()
o.setupHeartbeat()
for _, opt := range opts {
opt(o)
}

log.Info("Creating containerd client")
Expand All @@ -107,7 +111,7 @@ func (o *Orchestrator) getImage(ctx context.Context, imageName string) (*contain
image, found := o.cachedImages[imageName]
if !found {
var err error
log.Debugf("Pulling image %s", imageName)
log.Debug(fmt.Sprintf("Pulling image %s", imageName))
image, err = o.client.Pull(ctx, "docker.io/"+imageName,
containerd.WithPullUnpack,
containerd.WithPullSnapshotter(o.snapshotter),
Expand Down Expand Up @@ -439,6 +443,16 @@ func (o *Orchestrator) Cleanup() {
o.vmPool.RemoveBridges()
}

// GetSnapshotsEnabled Returns the snapshots mode of the orchestrator
func (o *Orchestrator) GetSnapshotsEnabled() bool {
return o.snapshotsEnabled
}

// GetUPFEnabled Returns the UPF mode of the orchestrator
func (o *Orchestrator) GetUPFEnabled() bool {
return o.isUPFEnabled
}

// PauseVM Pauses a VM
func (o *Orchestrator) PauseVM(ctx context.Context, vmID string) (string, error) {
logger := log.WithFields(log.Fields{"vmID": vmID})
Expand Down Expand Up @@ -498,7 +512,7 @@ func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID, snapPath, memPa
}

// LoadSnapshot Loads a snapshot of a VM
func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID, snapPath, memPath string, isUpf bool) (string, error) {
func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID, snapPath, memPath string) (string, error) {
logger := log.WithFields(log.Fields{"vmID": vmID})
logger.Debug("Orchestrator received LoadSnapshot")

Expand All @@ -508,7 +522,7 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID, snapPath, memPath
VMID: vmID,
SnapshotFilePath: snapPath,
MemFilePath: memPath,
EnableUserPF: isUpf,
EnableUserPF: o.GetUPFEnabled(),
}

if _, err := o.fcClient.LoadSnapshot(ctx, req); err != nil {
Expand Down
22 changes: 11 additions & 11 deletions ctriface/iface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestPauseSnapResume(t *testing.T) {
ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout)
defer cancel()

orch := NewOrchestrator("devmapper", 1, true)
orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true))

vmID := "4"

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestStartStopSerial(t *testing.T) {
ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout)
defer cancel()

orch := NewOrchestrator("devmapper", 1, true)
orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true))

vmID := "5"

Expand Down Expand Up @@ -94,7 +94,7 @@ func TestPauseResumeSerial(t *testing.T) {
ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout)
defer cancel()

orch := NewOrchestrator("devmapper", 1, true)
orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true))

vmID := "6"

Expand Down Expand Up @@ -129,15 +129,15 @@ func TestStartStopParallel(t *testing.T) {
defer cancel()

vmNum := 10
orch := NewOrchestrator("devmapper", vmNum, true)
orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true))

{
var vmGroup sync.WaitGroup
for i := 0; i < vmNum; i++ {
vmGroup.Add(1)
go func(i int) {
defer vmGroup.Done()
vmID := fmt.Sprintf("test_%d", i)
vmID := fmt.Sprintf("%d", i)
message, _, err := orch.StartVM(ctx, vmID, "ustiugov/helloworld:runner_workload")
require.NoError(t, err, "Failed to start VM, "+message)
}(i)
Expand All @@ -151,7 +151,7 @@ func TestStartStopParallel(t *testing.T) {
vmGroup.Add(1)
go func(i int) {
defer vmGroup.Done()
vmID := fmt.Sprintf("test_%d", i)
vmID := fmt.Sprintf("%d", i)
message, err := orch.StopSingleVM(ctx, vmID)
require.NoError(t, err, "Failed to stop VM, "+message)
}(i)
Expand All @@ -178,15 +178,15 @@ func TestPauseResumeParallel(t *testing.T) {
defer cancel()

vmNum := 10
orch := NewOrchestrator("devmapper", vmNum, true)
orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true))

{
var vmGroup sync.WaitGroup
for i := 0; i < vmNum; i++ {
vmGroup.Add(1)
go func(i int) {
defer vmGroup.Done()
vmID := fmt.Sprintf("test_%d", i)
vmID := fmt.Sprintf("%d", i)
message, _, err := orch.StartVM(ctx, vmID, "ustiugov/helloworld:runner_workload")
require.NoError(t, err, "Failed to start VM, "+message)
}(i)
Expand All @@ -200,7 +200,7 @@ func TestPauseResumeParallel(t *testing.T) {
vmGroup.Add(1)
go func(i int) {
defer vmGroup.Done()
vmID := fmt.Sprintf("test_%d", i)
vmID := fmt.Sprintf("%d", i)
message, err := orch.PauseVM(ctx, vmID)
require.NoError(t, err, "Failed to pause VM, "+message)
}(i)
Expand All @@ -214,7 +214,7 @@ func TestPauseResumeParallel(t *testing.T) {
vmGroup.Add(1)
go func(i int) {
defer vmGroup.Done()
vmID := fmt.Sprintf("test_%d", i)
vmID := fmt.Sprintf("%d", i)
message, err := orch.ResumeVM(ctx, vmID)
require.NoError(t, err, "Failed to resume VM, "+message)
}(i)
Expand All @@ -228,7 +228,7 @@ func TestPauseResumeParallel(t *testing.T) {
vmGroup.Add(1)
go func(i int) {
defer vmGroup.Done()
vmID := fmt.Sprintf("test_%d", i)
vmID := fmt.Sprintf("%d", i)
message, err := orch.StopSingleVM(ctx, vmID)
require.NoError(t, err, "Failed to stop VM, "+message)
}(i)
Expand Down
18 changes: 9 additions & 9 deletions ctriface/manual_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSnapLoad(t *testing.T) {
ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout)
defer cancel()

orch := NewOrchestrator("devmapper", 1, true)
orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true))

vmID := "1"

Expand All @@ -52,7 +52,7 @@ func TestSnapLoad(t *testing.T) {

time.Sleep(300 * time.Millisecond)

message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file", false)
message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file")
require.NoError(t, err, "Failed to load snapshot of VM, "+message)

message, err = orch.ResumeVM(ctx, vmID)
Expand All @@ -77,7 +77,7 @@ func TestSnapLoadMultiple(t *testing.T) {
ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout)
defer cancel()

orch := NewOrchestrator("devmapper", 1, true)
orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true))

vmID := "3"

Expand All @@ -95,7 +95,7 @@ func TestSnapLoadMultiple(t *testing.T) {

time.Sleep(300 * time.Millisecond)

message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1", false)
message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1")
require.NoError(t, err, "Failed to load snapshot of VM, "+message)

message, err = orch.ResumeVM(ctx, vmID)
Expand All @@ -106,7 +106,7 @@ func TestSnapLoadMultiple(t *testing.T) {

time.Sleep(300 * time.Millisecond)

message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1", false)
message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1")
require.NoError(t, err, "Failed to load snapshot of VM, "+message)

message, err = orch.ResumeVM(ctx, vmID)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestParallelSnapLoad(t *testing.T) {

vmNum := 5
vmIDBase := 6
orch := NewOrchestrator("devmapper", vmNum, true)
orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true))

// Pull image to work around parallel pulling
message, _, err := orch.StartVM(ctx, "img_plr", "ustiugov/helloworld:runner_workload")
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestParallelSnapLoad(t *testing.T) {

time.Sleep(300 * time.Millisecond)

message, err = orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath, false)
message, err = orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath)
require.NoError(t, err, "Failed to load snapshot of VM, "+vmID+", "+message)

message, err = orch.ResumeVM(ctx, vmID)
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestParallelPhasedSnapLoad(t *testing.T) {

vmNum := 10
vmIDBase := 11
orch := NewOrchestrator("devmapper", vmNum, true)
orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true))

// Pull image to work around parallel pulling
message, _, err := orch.StartVM(ctx, "img_plr", "ustiugov/helloworld:runner_workload")
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestParallelPhasedSnapLoad(t *testing.T) {
vmID := fmt.Sprintf("%d", i+vmIDBase)
snapshotFilePath := fmt.Sprintf("/dev/snapshot_file_%s", vmID)
memFilePath := fmt.Sprintf("/dev/mem_file_%s", vmID)
message, err := orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath, false)
message, err := orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath)
require.NoError(t, err, "Failed to load snapshot of VM, "+vmID+", "+message)
}(i)
}
Expand Down
50 changes: 50 additions & 0 deletions ctriface/orch_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// MIT License
//
// Copyright (c) 2020 Plamen Petrov
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package ctriface

// OrchestratorOption Options to pass to Orchestrator
type OrchestratorOption func(*Orchestrator)

// WithTestModeOn Sets the test mode
func WithTestModeOn(testModeOn bool) OrchestratorOption {
return func(o *Orchestrator) {
if !testModeOn {
o.setupCloseHandler()
o.setupHeartbeat()
}
}
}

// WithSnapshots Sets the snapshot mode on or off
func WithSnapshots(snapshotsEnabled bool) OrchestratorOption {
return func(o *Orchestrator) {
o.snapshotsEnabled = snapshotsEnabled
}
}

// WithUPF Sets the UPF mode on or off
func WithUPF(isUPFEnabled bool) OrchestratorOption {
return func(o *Orchestrator) {
o.isUPFEnabled = isUPFEnabled
}
}
4 changes: 2 additions & 2 deletions failing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestServePauseSnapLoadServe(t *testing.T) {
imageName := "ustiugov/helloworld:runner_workload"
funcPool = NewFuncPool(false, 0, 0, true)

resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false)
require.NoError(t, err, "Function returned error on 1st run")
require.Equal(t, resp.IsColdStart, true)
require.Equal(t, resp.Payload, "Hello, world!")
Expand All @@ -59,7 +59,7 @@ func TestServePauseSnapLoadServe(t *testing.T) {
_, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0"))
require.NoError(t, err, "Error when resuming VM")

resp, err = funcPool.Serve(context.Background(), fID, imageName, "world")
resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false)
require.NoError(t, err, "Function returned error on 2nd run")
require.Equal(t, resp.Payload, "Hello, world!")

Expand Down
Loading

0 comments on commit 7007b48

Please sign in to comment.