diff --git a/ctriface/Makefile b/ctriface/Makefile index ca139bbb0..ed3867a6f 100644 --- a/ctriface/Makefile +++ b/ctriface/Makefile @@ -1,6 +1,6 @@ EXTRAGOARGS:=-v -race -cover -EXTRATESTFILES:=iface_test.go iface.go dialer.go -MANUALTESTFILES:=manual_cleanup_test.go iface.go dialer.go +EXTRATESTFILES:=iface_test.go iface.go dialer.go orch_options.go +MANUALTESTFILES:=manual_cleanup_test.go iface.go dialer.go orch_options.go test: sudo env "PATH=$(PATH)" go test $(EXTRATESTFILES) $(EXTRAGOARGS) diff --git a/ctriface/failing_test.go b/ctriface/failing_test.go index 232af33bf..c5d1872b8 100644 --- a/ctriface/failing_test.go +++ b/ctriface/failing_test.go @@ -28,7 +28,7 @@ func TestStartSnapStop(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - orch := NewOrchestrator("devmapper", 1, true) + orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true)) vmID := "2" @@ -46,7 +46,7 @@ func TestStartSnapStop(t *testing.T) { time.Sleep(300 * time.Millisecond) - message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file", false) + message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file") require.NoError(t, err, "Failed to load snapshot of VM, "+message) message, err = orch.ResumeVM(ctx, vmID) diff --git a/ctriface/go.mod b/ctriface/go.mod index bd339ca32..dc1cd9e4f 100644 --- a/ctriface/go.mod +++ b/ctriface/go.mod @@ -16,6 +16,7 @@ require ( github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200714130355-0a4bf3145bf7 github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20200714130355-0a4bf3145bf7 github.com/ustiugov/fccd-orchestrator/taps v0.0.0-20200714130355-0a4bf3145bf7 // indirect + golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a // indirect google.golang.org/grpc v1.30.0 ) diff --git a/ctriface/go.sum b/ctriface/go.sum index a5c711d34..8126f34f6 100644 --- a/ctriface/go.sum +++ b/ctriface/go.sum @@ -493,6 +493,7 @@ golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -576,7 +577,10 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200615222825-6aa8f57aacd9/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200710042808-f1c4188a97a1/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.0.0-20200713235242-6acd2ab80ede h1:ItmFoZpZfJTYGsnON6247QuXEOKiKLBpieGcAgweGsk= golang.org/x/tools v0.0.0-20200713235242-6acd2ab80ede/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a h1:kVMPw4f6EVqYdfGQTedjrpw1dbE2PEMfw4jwXsNdn9s= +golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/ctriface/iface.go b/ctriface/iface.go index 7e6dc9368..1045f9c4d 100644 --- a/ctriface/iface.go +++ b/ctriface/iface.go @@ -24,6 +24,7 @@ package ctriface import ( "context" + "fmt" "os" "os/signal" "strconv" @@ -70,10 +71,12 @@ type Orchestrator struct { client *containerd.Client fcClient *fcclient.Client // store *skv.KVStore + snapshotsEnabled bool + isUPFEnabled bool } // NewOrchestrator Initializes a new orchestrator -func NewOrchestrator(snapshotter string, niNum int, testModeOn bool) *Orchestrator { +func NewOrchestrator(snapshotter string, niNum int, opts ...OrchestratorOption) *Orchestrator { var err error o := new(Orchestrator) @@ -81,10 +84,11 @@ func NewOrchestrator(snapshotter string, niNum int, testModeOn bool) *Orchestrat o.vmPool = misc.NewVMPool(o.niNum) o.cachedImages = make(map[string]containerd.Image) o.snapshotter = snapshotter + o.snapshotsEnabled = false + o.isUPFEnabled = false - if !testModeOn { - o.setupCloseHandler() - o.setupHeartbeat() + for _, opt := range opts { + opt(o) } log.Info("Creating containerd client") @@ -107,7 +111,7 @@ func (o *Orchestrator) getImage(ctx context.Context, imageName string) (*contain image, found := o.cachedImages[imageName] if !found { var err error - log.Debugf("Pulling image %s", imageName) + log.Debug(fmt.Sprintf("Pulling image %s", imageName)) image, err = o.client.Pull(ctx, "docker.io/"+imageName, containerd.WithPullUnpack, containerd.WithPullSnapshotter(o.snapshotter), @@ -439,6 +443,16 @@ func (o *Orchestrator) Cleanup() { o.vmPool.RemoveBridges() } +// GetSnapshotsEnabled Returns the snapshots mode of the orchestrator +func (o *Orchestrator) GetSnapshotsEnabled() bool { + return o.snapshotsEnabled +} + +// GetUPFEnabled Returns the UPF mode of the orchestrator +func (o *Orchestrator) GetUPFEnabled() bool { + return o.isUPFEnabled +} + // PauseVM Pauses a VM func (o *Orchestrator) PauseVM(ctx context.Context, vmID string) (string, error) { logger := log.WithFields(log.Fields{"vmID": vmID}) @@ -498,7 +512,7 @@ func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID, snapPath, memPa } // LoadSnapshot Loads a snapshot of a VM -func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID, snapPath, memPath string, isUpf bool) (string, error) { +func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID, snapPath, memPath string) (string, error) { logger := log.WithFields(log.Fields{"vmID": vmID}) logger.Debug("Orchestrator received LoadSnapshot") @@ -508,7 +522,7 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID, snapPath, memPath VMID: vmID, SnapshotFilePath: snapPath, MemFilePath: memPath, - EnableUserPF: isUpf, + EnableUserPF: o.GetUPFEnabled(), } if _, err := o.fcClient.LoadSnapshot(ctx, req); err != nil { diff --git a/ctriface/iface_test.go b/ctriface/iface_test.go index c85aa7b9b..b695e173b 100644 --- a/ctriface/iface_test.go +++ b/ctriface/iface_test.go @@ -29,7 +29,7 @@ func TestPauseSnapResume(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - orch := NewOrchestrator("devmapper", 1, true) + orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true)) vmID := "4" @@ -66,7 +66,7 @@ func TestStartStopSerial(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - orch := NewOrchestrator("devmapper", 1, true) + orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true)) vmID := "5" @@ -94,7 +94,7 @@ func TestPauseResumeSerial(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - orch := NewOrchestrator("devmapper", 1, true) + orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true)) vmID := "6" @@ -129,7 +129,7 @@ func TestStartStopParallel(t *testing.T) { defer cancel() vmNum := 10 - orch := NewOrchestrator("devmapper", vmNum, true) + orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true)) { var vmGroup sync.WaitGroup @@ -137,7 +137,7 @@ func TestStartStopParallel(t *testing.T) { vmGroup.Add(1) go func(i int) { defer vmGroup.Done() - vmID := fmt.Sprintf("test_%d", i) + vmID := fmt.Sprintf("%d", i) message, _, err := orch.StartVM(ctx, vmID, "ustiugov/helloworld:runner_workload") require.NoError(t, err, "Failed to start VM, "+message) }(i) @@ -151,7 +151,7 @@ func TestStartStopParallel(t *testing.T) { vmGroup.Add(1) go func(i int) { defer vmGroup.Done() - vmID := fmt.Sprintf("test_%d", i) + vmID := fmt.Sprintf("%d", i) message, err := orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to stop VM, "+message) }(i) @@ -178,7 +178,7 @@ func TestPauseResumeParallel(t *testing.T) { defer cancel() vmNum := 10 - orch := NewOrchestrator("devmapper", vmNum, true) + orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true)) { var vmGroup sync.WaitGroup @@ -186,7 +186,7 @@ func TestPauseResumeParallel(t *testing.T) { vmGroup.Add(1) go func(i int) { defer vmGroup.Done() - vmID := fmt.Sprintf("test_%d", i) + vmID := fmt.Sprintf("%d", i) message, _, err := orch.StartVM(ctx, vmID, "ustiugov/helloworld:runner_workload") require.NoError(t, err, "Failed to start VM, "+message) }(i) @@ -200,7 +200,7 @@ func TestPauseResumeParallel(t *testing.T) { vmGroup.Add(1) go func(i int) { defer vmGroup.Done() - vmID := fmt.Sprintf("test_%d", i) + vmID := fmt.Sprintf("%d", i) message, err := orch.PauseVM(ctx, vmID) require.NoError(t, err, "Failed to pause VM, "+message) }(i) @@ -214,7 +214,7 @@ func TestPauseResumeParallel(t *testing.T) { vmGroup.Add(1) go func(i int) { defer vmGroup.Done() - vmID := fmt.Sprintf("test_%d", i) + vmID := fmt.Sprintf("%d", i) message, err := orch.ResumeVM(ctx, vmID) require.NoError(t, err, "Failed to resume VM, "+message) }(i) @@ -228,7 +228,7 @@ func TestPauseResumeParallel(t *testing.T) { vmGroup.Add(1) go func(i int) { defer vmGroup.Done() - vmID := fmt.Sprintf("test_%d", i) + vmID := fmt.Sprintf("%d", i) message, err := orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to stop VM, "+message) }(i) diff --git a/ctriface/manual_cleanup_test.go b/ctriface/manual_cleanup_test.go index 24e0547c9..d56ebd141 100644 --- a/ctriface/manual_cleanup_test.go +++ b/ctriface/manual_cleanup_test.go @@ -31,7 +31,7 @@ func TestSnapLoad(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - orch := NewOrchestrator("devmapper", 1, true) + orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true)) vmID := "1" @@ -52,7 +52,7 @@ func TestSnapLoad(t *testing.T) { time.Sleep(300 * time.Millisecond) - message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file", false) + message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file", "/tmp/mem_file") require.NoError(t, err, "Failed to load snapshot of VM, "+message) message, err = orch.ResumeVM(ctx, vmID) @@ -77,7 +77,7 @@ func TestSnapLoadMultiple(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - orch := NewOrchestrator("devmapper", 1, true) + orch := NewOrchestrator("devmapper", 1, WithTestModeOn(true)) vmID := "3" @@ -95,7 +95,7 @@ func TestSnapLoadMultiple(t *testing.T) { time.Sleep(300 * time.Millisecond) - message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1", false) + message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1") require.NoError(t, err, "Failed to load snapshot of VM, "+message) message, err = orch.ResumeVM(ctx, vmID) @@ -106,7 +106,7 @@ func TestSnapLoadMultiple(t *testing.T) { time.Sleep(300 * time.Millisecond) - message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1", false) + message, err = orch.LoadSnapshot(ctx, vmID, "/tmp/snapshot_file1", "/tmp/mem_file1") require.NoError(t, err, "Failed to load snapshot of VM, "+message) message, err = orch.ResumeVM(ctx, vmID) @@ -136,7 +136,7 @@ func TestParallelSnapLoad(t *testing.T) { vmNum := 5 vmIDBase := 6 - orch := NewOrchestrator("devmapper", vmNum, true) + orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true)) // Pull image to work around parallel pulling message, _, err := orch.StartVM(ctx, "img_plr", "ustiugov/helloworld:runner_workload") @@ -168,7 +168,7 @@ func TestParallelSnapLoad(t *testing.T) { time.Sleep(300 * time.Millisecond) - message, err = orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath, false) + message, err = orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath) require.NoError(t, err, "Failed to load snapshot of VM, "+vmID+", "+message) message, err = orch.ResumeVM(ctx, vmID) @@ -198,7 +198,7 @@ func TestParallelPhasedSnapLoad(t *testing.T) { vmNum := 10 vmIDBase := 11 - orch := NewOrchestrator("devmapper", vmNum, true) + orch := NewOrchestrator("devmapper", vmNum, WithTestModeOn(true)) // Pull image to work around parallel pulling message, _, err := orch.StartVM(ctx, "img_plr", "ustiugov/helloworld:runner_workload") @@ -276,7 +276,7 @@ func TestParallelPhasedSnapLoad(t *testing.T) { vmID := fmt.Sprintf("%d", i+vmIDBase) snapshotFilePath := fmt.Sprintf("/dev/snapshot_file_%s", vmID) memFilePath := fmt.Sprintf("/dev/mem_file_%s", vmID) - message, err := orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath, false) + message, err := orch.LoadSnapshot(ctx, vmID, snapshotFilePath, memFilePath) require.NoError(t, err, "Failed to load snapshot of VM, "+vmID+", "+message) }(i) } diff --git a/ctriface/orch_options.go b/ctriface/orch_options.go new file mode 100644 index 000000000..6dfc06f74 --- /dev/null +++ b/ctriface/orch_options.go @@ -0,0 +1,50 @@ +// MIT License +// +// Copyright (c) 2020 Plamen Petrov +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ctriface + +// OrchestratorOption Options to pass to Orchestrator +type OrchestratorOption func(*Orchestrator) + +// WithTestModeOn Sets the test mode +func WithTestModeOn(testModeOn bool) OrchestratorOption { + return func(o *Orchestrator) { + if !testModeOn { + o.setupCloseHandler() + o.setupHeartbeat() + } + } +} + +// WithSnapshots Sets the snapshot mode on or off +func WithSnapshots(snapshotsEnabled bool) OrchestratorOption { + return func(o *Orchestrator) { + o.snapshotsEnabled = snapshotsEnabled + } +} + +// WithUPF Sets the UPF mode on or off +func WithUPF(isUPFEnabled bool) OrchestratorOption { + return func(o *Orchestrator) { + o.isUPFEnabled = isUPFEnabled + } +} diff --git a/failing_test.go b/failing_test.go index 4dd6439f5..8bc261b2c 100644 --- a/failing_test.go +++ b/failing_test.go @@ -37,7 +37,7 @@ func TestServePauseSnapLoadServe(t *testing.T) { imageName := "ustiugov/helloworld:runner_workload" funcPool = NewFuncPool(false, 0, 0, true) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.IsColdStart, true) require.Equal(t, resp.Payload, "Hello, world!") @@ -59,7 +59,7 @@ func TestServePauseSnapLoadServe(t *testing.T) { _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 2nd run") require.Equal(t, resp.Payload, "Hello, world!") diff --git a/fccd-orchestrator.go b/fccd-orchestrator.go index 823bd9705..e8b6aa79e 100644 --- a/fccd-orchestrator.go +++ b/fccd-orchestrator.go @@ -97,7 +97,7 @@ func main() { } } - orch = ctriface.NewOrchestrator(*snapshotter, *niNum, false) + orch = ctriface.NewOrchestrator(*snapshotter, *niNum, ctriface.WithTestModeOn(false), ctriface.WithSnapshotsEnabled(true)) funcPool = NewFuncPool(*isSaveMemory, *servedThreshold, *pinnedFuncNum, false) @@ -148,7 +148,7 @@ func (s *server) StartVM(ctx context.Context, in *pb.StartVMReq) (*pb.StartVMRes imageName := in.GetImage() log.WithFields(log.Fields{"fID": fID, "image": imageName}).Info("Received direct StartVM") - message, err := funcPool.AddInstance(fID, imageName) + message, err := funcPool.AddInstance(fID, imageName, false) tProfile := "not supported anymore" //message, tProfile, err := orch.StartVM(ctx, fID, imageName) if err != nil { // does not return error @@ -189,5 +189,5 @@ func (s *fwdServer) FwdHello(ctx context.Context, in *hpb.FwdHelloReq) (*hpb.Fwd logger := log.WithFields(log.Fields{"fID": fID, "image": imageName, "payload": payload}) logger.Debug("Received FwdHelloVM") - return funcPool.Serve(ctx, fID, imageName, payload) + return funcPool.Serve(ctx, fID, imageName, payload, false) } diff --git a/fccd-orchestrator_test.go b/fccd-orchestrator_test.go index c9e764c85..d557506a4 100644 --- a/fccd-orchestrator_test.go +++ b/fccd-orchestrator_test.go @@ -29,6 +29,7 @@ import ( "strconv" "sync" "testing" + "time" ctrdlog "github.com/containerd/containerd/log" log "github.com/sirupsen/logrus" @@ -47,9 +48,9 @@ func TestMain(m *testing.M) { log.SetOutput(os.Stdout) - log.SetLevel(log.InfoLevel) + log.SetLevel(log.DebugLevel) - orch = ctriface.NewOrchestrator("devmapper", 10, true) + orch = ctriface.NewOrchestrator("devmapper", 10, ctriface.WithTestModeOn(true), ctriface.WithSnapshotsEnabled(true)) ret := m.Run() @@ -68,7 +69,7 @@ func TestPauseResumeSerial(t *testing.T) { imageName := "ustiugov/helloworld:runner_workload" funcPool = NewFuncPool(false, 0, 0, true) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.IsColdStart, true) require.Equal(t, resp.Payload, "Hello, world!") @@ -78,14 +79,14 @@ func TestPauseResumeSerial(t *testing.T) { // NOTE: Current implementation just return error but does not time out //timeout_ctx, _ := context.WithTimeout(context.Background(), 20*time.Second) - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.Error(t, err, "Function did not time out on 2nd run") require.Equal(t, resp.Payload, "") _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 3rd run") require.Equal(t, resp.Payload, "Hello, world!") @@ -98,7 +99,7 @@ func TestServePauseSnapResumeServe(t *testing.T) { imageName := "ustiugov/helloworld:runner_workload" funcPool = NewFuncPool(false, 0, 0, true) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.IsColdStart, true) require.Equal(t, resp.Payload, "Hello, world!") @@ -112,7 +113,7 @@ func TestServePauseSnapResumeServe(t *testing.T) { _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 2nd run") require.Equal(t, resp.Payload, "Hello, world!") @@ -126,7 +127,7 @@ func TestSendToFunctionSerial(t *testing.T) { funcPool = NewFuncPool(false, 0, 0, true) for i := 0; i < 2; i++ { - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") if i == 0 { require.Equal(t, resp.IsColdStart, true) @@ -150,7 +151,7 @@ func TestSendToFunctionParallel(t *testing.T) { go func(i int) { defer vmGroup.Done() - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") }(i) @@ -169,13 +170,14 @@ func TestStartSendStopTwice(t *testing.T) { for i := 0; i < 2; i++ { for k := 0; k < 2; k++ { - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") } message, err := funcPool.RemoveInstance(fID, imageName) require.NoError(t, err, "Function returned error, "+message) + time.Sleep(300 * time.Millisecond) } servedGot := funcPool.stats.statMap[fID].served @@ -189,7 +191,7 @@ func TestStatsNotNumericFunction(t *testing.T) { imageName := "ustiugov/helloworld:runner_workload" funcPool = NewFuncPool(true, 1, 2, true) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") @@ -207,7 +209,7 @@ func TestStatsNotColdFunction(t *testing.T) { imageName := "ustiugov/helloworld:runner_workload" funcPool = NewFuncPool(true, 1, 11, true) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") @@ -226,7 +228,7 @@ func TestSaveMemorySerial(t *testing.T) { funcPool = NewFuncPool(true, 40, 2, true) for i := 0; i < 100; i++ { - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") } @@ -250,7 +252,7 @@ func TestSaveMemoryParallel(t *testing.T) { go func(i int) { defer vmGroup.Done() - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") }(i) @@ -270,10 +272,10 @@ func TestDirectStartStopVM(t *testing.T) { imageName := "ustiugov/helloworld:runner_workload" funcPool = NewFuncPool(false, 0, 0, true) - message, err := funcPool.AddInstance(fID, imageName) + message, err := funcPool.AddInstance(fID, imageName, false) require.NoError(t, err, "This error should never happen (addInstance())"+message) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") @@ -305,7 +307,7 @@ func TestAllFunctions(t *testing.T) { go func(fID int, imageName, request, response string) { defer vmGroup.Done() - resp, err := funcPool.Serve(context.Background(), strconv.Itoa(fID), imageName, request) + resp, err := funcPool.Serve(context.Background(), strconv.Itoa(fID), imageName, request, false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, "+response+"!") diff --git a/functions.go b/functions.go index c1f3c193d..73da5800f 100644 --- a/functions.go +++ b/functions.go @@ -108,14 +108,14 @@ func (p *FuncPool) getFunction(fID, imageName string) *Function { } // Serve Service RPC request by triggering the corresponding function. -func (p *FuncPool) Serve(ctx context.Context, fID, imageName, payload string) (*hpb.FwdHelloResp, error) { +func (p *FuncPool) Serve(ctx context.Context, fID, imageName, payload string, isUPF bool) (*hpb.FwdHelloResp, error) { f := p.getFunction(fID, imageName) - return f.Serve(ctx, fID, imageName, payload) + return f.Serve(ctx, fID, imageName, payload, isUPF) } // AddInstance Adds instance of the function -func (p *FuncPool) AddInstance(fID, imageName string) (string, error) { +func (p *FuncPool) AddInstance(fID, imageName string, isUPF bool) (string, error) { f := p.getFunction(fID, imageName) logger := log.WithFields(log.Fields{"fID": f.fID}) @@ -123,7 +123,7 @@ func (p *FuncPool) AddInstance(fID, imageName string) (string, error) { f.OnceAddInstance.Do( func() { logger.Debug("Function is inactive, starting the instance...") - f.AddInstance() + f.AddInstance(isUPF) }) return "Instance started", nil @@ -141,16 +141,18 @@ func (p *FuncPool) RemoveInstance(fID, imageName string) (string, error) { // Function type type Function struct { sync.RWMutex - OnceAddInstance *sync.Once - fID string - imageName string - vmIDList []string // FIXME: only a single VM per function is supported - lastInstanceID int - isPinnedInMem bool // if pinned, the orchestrator does not stop/offload it) - stats *Stats - servedTh uint64 - sem *semaphore.Weighted - servedSyncCounter int64 + OnceAddInstance *sync.Once + fID string + imageName string + vmID string + lastInstanceID int + isPinnedInMem bool // if pinned, the orchestrator does not stop/offload it) + stats *Stats + servedTh uint64 + sem *semaphore.Weighted + servedSyncCounter int64 + isSnapshotReady bool // if ready, the orchestrator should load the instance rather than creating it + OnceCreateSnapInstance *sync.Once } // NewFunction Initializes a function @@ -163,6 +165,8 @@ func NewFunction(fID, imageName string, Stats *Stats, servedTh uint64, isToPin b f.OnceAddInstance = new(sync.Once) f.isPinnedInMem = isToPin f.stats = Stats + f.isSnapshotReady = false + f.OnceCreateSnapInstance = new(sync.Once) // Normal distribution with stddev=servedTh/2, mean=servedTh thresh := int64(rand.NormFloat64()*float64(servedTh/2) + float64(servedTh)) @@ -200,7 +204,7 @@ func NewFunction(fID, imageName string, Stats *Stats, servedTh uint64, isToPin b // b. The last goroutine is determined by the atomic counter: the goroutine wih syncID==0 shuts down // the instance. // c. Instance shutdown is performed asynchronously because all instances have unique IDs. -func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string) (*hpb.FwdHelloResp, error) { +func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string, isUPF bool) (*hpb.FwdHelloResp, error) { syncID := int64(-1) // default is no synchronization logger := log.WithFields(log.Fields{"fID": f.fID}) @@ -221,7 +225,7 @@ func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string) func() { isColdStart = true logger.Debug("Function is inactive, starting the instance...") - f.AddInstance() + f.AddInstance(isUPF) }) f.RLock() @@ -255,7 +259,11 @@ func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string) if !f.isPinnedInMem && syncID == 0 { logger.Debugf("Function has to shut down its instance, served %d requests", f.GetStatServed()) - f.RemoveInstanceAsync() + if orch.GetSnapshotsEnabled() { + f.RemoveInstance() + } else { + f.RemoveInstanceAsync() + } f.ZeroServedStat() f.servedSyncCounter = int64(f.servedTh) // reset counter f.sem.Release(int64(f.servedTh)) @@ -264,10 +272,6 @@ func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string) return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: resp.Message}, err } -func (f *Function) getInstanceVMID() string { - return f.vmIDList[0] // TODO: load balancing when many instances are supported -} - // FwdRPC Forward the RPC to an instance, then forwards the response back. func (f *Function) fwdRPC(ctx context.Context, reqPayload string) (*hpb.HelloReply, error) { f.RLock() @@ -275,7 +279,7 @@ func (f *Function) fwdRPC(ctx context.Context, reqPayload string) (*hpb.HelloRep logger := log.WithFields(log.Fields{"fID": f.fID}) - funcClientPtr, err := orch.GetFuncClient(f.getInstanceVMID()) + funcClientPtr, err := orch.GetFuncClient(f.vmID) if err != nil { return &hpb.HelloReply{Message: "Failed to get function client"}, err } @@ -291,8 +295,7 @@ func (f *Function) fwdRPC(ctx context.Context, reqPayload string) (*hpb.HelloRep // AddInstance Starts a VM, waits till it is ready. // Note: this function is called from sync.Once construct -func (f *Function) AddInstance() { - // DMITRII: use LoadInstance here +func (f *Function) AddInstance(isUPF bool) { f.Lock() defer f.Unlock() @@ -300,20 +303,21 @@ func (f *Function) AddInstance() { logger.Debug("Adding instance") - vmID := fmt.Sprintf("%s_%d", f.fID, f.lastInstanceID) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) defer cancel() - message, _, err := orch.StartVM(ctx, vmID, f.imageName) - if err != nil { - log.Panic(message, err) + if f.isSnapshotReady { + f.LoadInstance(isUPF) + } else { + message, _, err := orch.StartVM(ctx, f.getVMID(), f.imageName) + if err != nil { + log.Panic(message, err) + } + f.vmID = f.getVMID() + f.lastInstanceID++ } f.stats.IncStarted(f.fID) - - f.vmIDList = append(f.vmIDList, vmID) - f.lastInstanceID++ } // RemoveInstanceAsync Stops an instance (VM) of the function. @@ -345,42 +349,86 @@ func (f *Function) RemoveInstance() (string, error) { logger.Debug("Removing instance") - vmID := f.clearInstanceState() + if orch.GetSnapshotsEnabled() { + f.OffloadInstance() + + _ = f.clearInstanceState() + + f.Unlock() + return "Successfully offloaded instance " + f.vmID, nil + } f.Unlock() - return orch.StopSingleVM(context.Background(), vmID) + _ = f.clearInstanceState() + + return orch.StopSingleVM(context.Background(), f.vmID) } func (f *Function) clearInstanceState() (vmID string) { - vmID, f.vmIDList = f.vmIDList[0], f.vmIDList[1:] - - if len(f.vmIDList) == 0 { - f.OnceAddInstance = new(sync.Once) - } else { - log.Panic("List of function's instance is not empty after stopping an instance!") - } + f.OnceAddInstance = new(sync.Once) return vmID } // CreateInstanceSnapshot Shuts down the function's instance keeping its shim process alive -func (f *Function) CreateInstanceSnapshot() error { - // ctriface: pause & createSnap - return nil +func (f *Function) CreateInstanceSnapshot() { + logger := log.WithFields(log.Fields{"fID": f.fID}) + + logger.Debug("Creating instance snapshot") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) + defer cancel() + + message, err := orch.PauseVM(ctx, f.vmID) + if err != nil { + log.Panic(message, err) + } + message, err = orch.CreateSnapshot(ctx, f.vmID, f.getSnapshotFilePath(), f.getMemFilePath()) + if err != nil { + log.Panic(message, err) + } } -// OffloadInstance Creates a snapshot of the function's instance -func (f *Function) OffloadInstance() error { - // ctriface: offload +// OffloadInstance Offloads the instance +func (f *Function) OffloadInstance() { + logger := log.WithFields(log.Fields{"fID": f.fID}) + + f.OnceCreateSnapInstance.Do( + func() { + logger.Debug("First time offloading, need to create a snapshot first") + f.CreateInstanceSnapshot() + f.isSnapshotReady = true + }) + + logger.Debug("Offloading instance") - return nil + ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) + defer cancel() + + message, err := orch.Offload(ctx, f.vmID) + if err != nil { + log.Panic(message, err) + } } -// LoadInstance Loads a new instance of the function from its snapshot -func (f *Function) LoadInstance() error { - // ctriface: load & resume +// LoadInstance Loads a new instance of the function from its snapshot and resumes it +func (f *Function) LoadInstance(isUPF bool) { + logger := log.WithFields(log.Fields{"fID": f.fID}) + + logger.Debug("Loading instance") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) + defer cancel() + + message, err := orch.LoadSnapshot(ctx, f.vmID, f.getSnapshotFilePath(), f.getMemFilePath(), isUPF) + if err != nil { + log.Panic(message, err) + } - return nil + message, err = orch.ResumeVM(ctx, f.vmID) + if err != nil { + log.Panic(message, err) + } } // GetStatServed Returns the served counter value @@ -392,3 +440,18 @@ func (f *Function) GetStatServed() uint64 { func (f *Function) ZeroServedStat() { atomic.StoreUint64(&f.stats.statMap[f.fID].served, 0) } + +// getVMID Creates the vmID for the function +func (f *Function) getVMID() string { + return fmt.Sprintf("%s_%d", f.fID, f.lastInstanceID) +} + +// getSnapshotFilePath Creates the snapshot file path for the function +func (f *Function) getSnapshotFilePath() string { + return fmt.Sprintf("/dev/snap_file_%s", f.vmID) +} + +// getMemFilePath Creates the memory file path for the function +func (f *Function) getMemFilePath() string { + return fmt.Sprintf("/dev/mem_file_%s", f.vmID) +} diff --git a/go.mod b/go.mod index 9e281bfbf..e024978da 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/golang/protobuf v1.3.3 github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.5.1 - github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200717112852-e54464c54909 + github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720155131-b39156b53543 github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200714162243-d6dc0c083e9e github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20200714162243-d6dc0c083e9e // indirect github.com/ustiugov/fccd-orchestrator/proto v0.0.0-20200714162243-d6dc0c083e9e diff --git a/go.sum b/go.sum index e9c8d2a5d..d1de907dd 100644 --- a/go.sum +++ b/go.sum @@ -359,6 +359,14 @@ github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200714173637-bc269d5f428 github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200714173637-bc269d5f4288/go.mod h1:y72oQ9E3hRbVmL4b9TWPuB4PXwJbP+A5ZH8D5oG8piI= github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200717112852-e54464c54909 h1:mGYHO9kHWGW4eli5xuqAXq1jrMKLxvNTtIyCAkyAvwk= github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200717112852-e54464c54909/go.mod h1:8PPjt6r9ZHN2bNeRf5ptiafNMI939RxwqmHBBx/IZro= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720131908-741569ae1a16 h1:M24lmM3VgYPkGLlaj6pVTQKvTyGomXbEqin5LLwuiuo= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720131908-741569ae1a16/go.mod h1:8PPjt6r9ZHN2bNeRf5ptiafNMI939RxwqmHBBx/IZro= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720143659-c661962f8c49 h1:REM73la51EBDFFhylCiRL1bQS3pDqMdQc/LoX5k+aJ0= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720143659-c661962f8c49/go.mod h1:8PPjt6r9ZHN2bNeRf5ptiafNMI939RxwqmHBBx/IZro= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720145710-fee692dcfd5f h1:FaAgj/bJTmQ6siSamLoqtcfVpu/i2fRccXu2SJdU+Uw= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720145710-fee692dcfd5f/go.mod h1:8PPjt6r9ZHN2bNeRf5ptiafNMI939RxwqmHBBx/IZro= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720155131-b39156b53543 h1:mKInacxPreeyeeskWfvjzWN251+WjDbhVLiXsm6FF7E= +github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200720155131-b39156b53543/go.mod h1:8PPjt6r9ZHN2bNeRf5ptiafNMI939RxwqmHBBx/IZro= github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200710144657-9fbec6857e48/go.mod h1:5dhCs/XynpQoQcrhd/YgUBjGahhNpTknQUcC1kHRCaA= github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200710145415-bb09d1a68889/go.mod h1:5dhCs/XynpQoQcrhd/YgUBjGahhNpTknQUcC1kHRCaA= github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200710150633-096cac68bd72 h1:r67pqykSYWZHFYfIKuT44PTXFJWu5lJcatkV16d3vKU= diff --git a/manual_cleanup_test.go b/manual_cleanup_test.go index fae5d7bd1..5cfde02c3 100644 --- a/manual_cleanup_test.go +++ b/manual_cleanup_test.go @@ -39,7 +39,7 @@ func TestParallelLoadServe(t *testing.T) { funcPool = NewFuncPool(false, 0, 0, true) // Pull image to work around parallel pulling limitation - resp, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "world") + resp, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "world", false) require.NoError(t, err, "Function returned error") require.Equal(t, resp.Payload, "Hello, world!") // ----------------------------------------------------- @@ -56,7 +56,7 @@ func TestParallelLoadServe(t *testing.T) { snapshotFilePath := fmt.Sprintf("/dev/snapshot_file_%s", fID) memFilePath := fmt.Sprintf("/dev/mem_file_%s", fID) - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.Payload, "Hello, world!") @@ -77,7 +77,7 @@ func TestParallelLoadServe(t *testing.T) { _, err = orch.ResumeVM(context.Background(), vmID) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 2nd run") require.Equal(t, resp.Payload, "Hello, world!") }(i) @@ -97,7 +97,7 @@ func TestLoadServeMultiple1(t *testing.T) { snapshotFile := "/tmp/snap_test_" + fID memFile := "/tmp/mem_test_" + fID - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.IsColdStart, true) require.Equal(t, resp.Payload, "Hello, world!") @@ -119,7 +119,7 @@ func TestLoadServeMultiple1(t *testing.T) { _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 2nd run") require.Equal(t, resp.Payload, "Hello, world!") @@ -134,7 +134,7 @@ func TestLoadServeMultiple1(t *testing.T) { _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 3rd run") require.Equal(t, resp.Payload, "Hello, world!") } @@ -151,7 +151,7 @@ func TestLoadServeMultiple2(t *testing.T) { snapshotFile := "/tmp/snap_test_" + fID memFile := "/tmp/mem_test_" + fID - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.IsColdStart, true) require.Equal(t, resp.Payload, "Hello, world!") @@ -173,11 +173,11 @@ func TestLoadServeMultiple2(t *testing.T) { _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 2nd run") require.Equal(t, resp.Payload, "Hello, world!") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 3rd run") require.Equal(t, resp.Payload, "Hello, world!") @@ -208,7 +208,7 @@ func TestLoadServeMultiple3(t *testing.T) { snapshotFile := "/tmp/snap_test_" + fID memFile := "/tmp/mem_test_" + fID - resp, err := funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err := funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 1st run") require.Equal(t, resp.IsColdStart, true) require.Equal(t, resp.Payload, "Hello, world!") @@ -241,11 +241,11 @@ func TestLoadServeMultiple3(t *testing.T) { _, err = orch.ResumeVM(context.Background(), fmt.Sprintf(fID+"_0")) require.NoError(t, err, "Error when resuming VM") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 2nd run") require.Equal(t, resp.Payload, "Hello, world!") - resp, err = funcPool.Serve(context.Background(), fID, imageName, "world") + resp, err = funcPool.Serve(context.Background(), fID, imageName, "world", false) require.NoError(t, err, "Function returned error on 3rd run") require.Equal(t, resp.Payload, "Hello, world!") }