Skip to content

Commit

Permalink
Serve benchmarking
Browse files Browse the repository at this point in the history
Signed-off-by: Plamen Petrov <plamb0brt@gmail.com>
  • Loading branch information
plamenmpetrov authored and ustiugov committed Jul 27, 2020
1 parent 87529e2 commit 25728c1
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 41 deletions.
132 changes: 132 additions & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// MIT License
//
// Copyright (c) 2020 Plamen Petrov
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package main

import (
"context"
"os"
"os/exec"
"testing"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/ustiugov/fccd-orchestrator/metrics"
)

func TestBenchmarkServeWithCache(t *testing.T) {
fID := "1"
imageName := "ustiugov/helloworld:runner_workload"
var (
servedTh uint64
pinnedFuncNum int
isSyncOffload bool = true
)
funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

// Pull image to work around parallel pulling limitation
resp, _, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

message, err := funcPool.RemoveInstance("plr_fnc", imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
// -----------------------------------------------------

resp, _, err = funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

message, err = funcPool.RemoveInstance(fID, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)

benchCount := 10
serveStats := make([]*metrics.ServeStat, benchCount)

for i := 0; i < 10; i++ {
resp, stat, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

serveStats[i] = stat

message, err := funcPool.RemoveInstance(fID, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
}

metrics.PrintServeStats(serveStats...)
}

func TestBenchmarkServeNoCache(t *testing.T) {
fID := "1"
imageName := "ustiugov/helloworld:runner_workload"
var (
servedTh uint64
pinnedFuncNum int
isSyncOffload bool = true
)
funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

// Pull image to work around parallel pulling limitation
resp, _, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

message, err := funcPool.RemoveInstance("plr_fnc", imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
// -----------------------------------------------------

resp, _, err = funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

message, err = funcPool.RemoveInstance(fID, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)

benchCount := 10
serveStats := make([]*metrics.ServeStat, benchCount)

for i := 0; i < 10; i++ {
dropPageCache()

resp, stat, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

serveStats[i] = stat

message, err := funcPool.RemoveInstance(fID, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
}

metrics.PrintServeStats(serveStats...)
}

func dropPageCache() {
cmd := exec.Command("sudo", "/bin/sh", "-c", "sync; echo 1 > /proc/sys/vm/drop_caches")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stdout

if err := cmd.Run(); err != nil {
log.Fatalf("Failed to drop caches: %v", err)
}
}
16 changes: 12 additions & 4 deletions ctriface/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func TestBenchmarkLoadSnapshotWithCache(t *testing.T) {
benchCount := 10
loadStats := make([]*metrics.LoadSnapshotStat, benchCount)

snapshotFile := "/dev/snapshot_file"
memFile := "/dev/mem_file"
snapshotFile := "/snapshot_file"
memFile := "/mem_file"

// Pull image and prepare snapshot
message, _, err := orch.StartVM(ctx, vmID, "ustiugov/helloworld:runner_workload")
Expand All @@ -119,6 +119,14 @@ func TestBenchmarkLoadSnapshotWithCache(t *testing.T) {

time.Sleep(300 * time.Millisecond)

message, _, err = orch.LoadSnapshot(ctx, vmID, snapshotFile, memFile)
require.NoError(t, err, "Failed to load snapshot of VM, "+message)

message, err = orch.Offload(ctx, vmID)
require.NoError(t, err, "Failed to offload VM, "+message)

time.Sleep(300 * time.Millisecond)

for i := 0; i < benchCount; i++ {
message, stat, err := orch.LoadSnapshot(ctx, vmID, snapshotFile, memFile)
require.NoError(t, err, "Failed to load snapshot of VM, "+message)
Expand Down Expand Up @@ -158,8 +166,8 @@ func TestBenchmarkLoadSnapshotNoCache(t *testing.T) {
benchCount := 10
loadStats := make([]*metrics.LoadSnapshotStat, benchCount)

snapshotFile := "/dev/snapshot_file"
memFile := "/dev/mem_file"
snapshotFile := "/snapshot_file"
memFile := "/mem_file"

// Pull image and prepare snapshot
message, _, err := orch.StartVM(ctx, vmID, "ustiugov/helloworld:runner_workload")
Expand Down
3 changes: 2 additions & 1 deletion fccd-orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,6 @@ func (s *fwdServer) FwdHello(ctx context.Context, in *hpb.FwdHelloReq) (*hpb.Fwd
logger := log.WithFields(log.Fields{"fID": fID, "image": imageName, "payload": payload})
logger.Debug("Received FwdHelloVM")

return funcPool.Serve(ctx, fID, imageName, payload)
resp, _, err := funcPool.Serve(ctx, fID, imageName, payload)
return resp, err
}
18 changes: 9 additions & 9 deletions fccd-orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSendToFunctionSerial(t *testing.T) {
funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

for i := 0; i < 2; i++ {
resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
if i == 0 {
require.Equal(t, resp.IsColdStart, true)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestSendToFunctionParallel(t *testing.T) {

go func(i int) {
defer vmGroup.Done()
resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")
}(i)
Expand All @@ -140,7 +140,7 @@ func TestStartSendStopTwice(t *testing.T) {

for i := 0; i < 2; i++ {
for k := 0; k < 2; k++ {
resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")
}
Expand All @@ -164,7 +164,7 @@ func TestStatsNotNumericFunction(t *testing.T) {
)
funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

Expand All @@ -186,7 +186,7 @@ func TestStatsNotColdFunction(t *testing.T) {
)
funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

Expand All @@ -209,7 +209,7 @@ func TestSaveMemorySerial(t *testing.T) {
funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

for i := 0; i < 100; i++ {
resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestSaveMemoryParallel(t *testing.T) {
go func(i int) {
defer vmGroup.Done()

resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")
}(i)
Expand All @@ -264,7 +264,7 @@ func TestDirectStartStopVM(t *testing.T) {
message, err := funcPool.AddInstance(fID, imageName)
require.NoError(t, err, "This error should never happen (addInstance())"+message)

resp, err := funcPool.Serve(context.Background(), fID, imageName, "world")
resp, _, err := funcPool.Serve(context.Background(), fID, imageName, "world")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, world!")

Expand Down Expand Up @@ -300,7 +300,7 @@ func TestAllFunctions(t *testing.T) {
go func(fID int, imageName, request, response string) {
defer vmGroup.Done()

resp, err := funcPool.Serve(context.Background(), strconv.Itoa(8+fID), imageName, request)
resp, _, err := funcPool.Serve(context.Background(), strconv.Itoa(8+fID), imageName, request)
require.NoError(t, err, "Function returned error")

require.Equal(t, resp.Payload, "Hello, "+response+"!")
Expand Down
35 changes: 26 additions & 9 deletions functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

log "github.com/sirupsen/logrus"
hpb "github.com/ustiugov/fccd-orchestrator/helloworld"
"github.com/ustiugov/fccd-orchestrator/metrics"
)

var isTestMode bool // set with a call to NewFuncPool
Expand Down Expand Up @@ -109,7 +110,7 @@ func (p *FuncPool) getFunction(fID, imageName string) *Function {
}

// Serve Service RPC request by triggering the corresponding function.
func (p *FuncPool) Serve(ctx context.Context, fID, imageName, payload string) (*hpb.FwdHelloResp, error) {
func (p *FuncPool) Serve(ctx context.Context, fID, imageName, payload string) (*hpb.FwdHelloResp, *metrics.ServeStat, error) {
f := p.getFunction(fID, imageName)

return f.Serve(ctx, fID, imageName, payload)
Expand Down Expand Up @@ -204,7 +205,12 @@ func NewFunction(fID, imageName string, Stats *Stats, servedTh uint64, isToPin b
// b. The last goroutine is determined by the atomic counter: the goroutine wih syncID==0 shuts down
// the instance.
// c. Instance shutdown is performed asynchronously because all instances have unique IDs.
func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string) (*hpb.FwdHelloResp, error) {
func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string) (*hpb.FwdHelloResp, *metrics.ServeStat, error) {
var (
serveStat *metrics.ServeStat = metrics.NewServeStat()
tStart time.Time
)

syncID := int64(-1) // default is no synchronization

logger := log.WithFields(log.Fields{"fID": f.fID})
Expand All @@ -221,42 +227,51 @@ func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string)

f.stats.IncServed(f.fID)

tStart = time.Now()

f.OnceAddInstance.Do(
func() {
isColdStart = true
logger.Debug("Function is inactive, starting the instance...")
f.AddInstance()
})
serveStat.AddInstance = time.Since(tStart).Microseconds()

f.RLock()

// FIXME: keep a strict deadline for forwarding RPCs to a warm function
// Eventually, it needs to be RPC-dependent and probably client-defined
ctxFwd, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second))
defer cancel()

tStart = time.Now()
resp, err := f.fwdRPC(ctxFwd, reqPayload)
serveStat.GetResponse = time.Since(tStart).Microseconds()

if err != nil && ctxFwd.Err() == context.Canceled {
// context deadline exceeded
f.RUnlock()
return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: ""}, err
return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: ""}, serveStat, err
} else if err != nil {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.DeadlineExceeded:
// deadline exceeded
f.RUnlock()
return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: ""}, err
return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: ""}, serveStat, err
default:
logger.Warn("Function returned error: ", err)
f.RUnlock()
return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: ""}, err
return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: ""}, serveStat, err
}
} else {
logger.Panic("Not able to parse error returned ", err)
}
}
f.RUnlock()

tStart = time.Now()

if !f.isPinnedInMem && syncID == 0 {
logger.Debugf("Function has to shut down its instance, served %d requests", f.GetStatServed())
if _, err := f.RemoveInstance(false); err != nil {
Expand All @@ -267,7 +282,9 @@ func (f *Function) Serve(ctx context.Context, fID, imageName, reqPayload string)
f.sem.Release(int64(f.servedTh))
}

return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: resp.Message}, err
serveStat.RetireOld = time.Since(tStart).Microseconds()

return &hpb.FwdHelloResp{IsColdStart: isColdStart, Payload: resp.Message}, serveStat, err
}

// FwdRPC Forward the RPC to an instance, then forwards the response back.
Expand Down Expand Up @@ -415,7 +432,7 @@ func (f *Function) LoadInstance() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

message, err := orch.LoadSnapshot(ctx, f.vmID, f.getSnapshotFilePath(), f.getMemFilePath())
message, _, err := orch.LoadSnapshot(ctx, f.vmID, f.getSnapshotFilePath(), f.getMemFilePath())
if err != nil {
log.Panic(message, err)
}
Expand Down Expand Up @@ -443,10 +460,10 @@ func (f *Function) getVMID() string {

// getSnapshotFilePath Creates the snapshot file path for the function
func (f *Function) getSnapshotFilePath() string {
return fmt.Sprintf(filepath.Join("/dev", "snap_file_%s"), f.vmID)
return fmt.Sprintf(filepath.Join("/root", "snap_file_%s"), f.vmID)
}

// getMemFilePath Creates the memory file path for the function
func (f *Function) getMemFilePath() string {
return fmt.Sprintf(filepath.Join("/dev", "mem_file_%s"), f.vmID)
return fmt.Sprintf(filepath.Join("/root", "mem_file_%s"), f.vmID)
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ require (
github.com/containerd/containerd v1.3.6
github.com/golang/protobuf v1.3.3
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.5.1
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200722073430-90c1342030ac
github.com/stretchr/testify v1.6.1
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20200723105930-96d5e1cb279b
github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200717125634-528c6e9f9cc9
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20200723105930-96d5e1cb279b
github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20200717125634-528c6e9f9cc9 // indirect
github.com/ustiugov/fccd-orchestrator/proto v0.0.0-20200717125634-528c6e9f9cc9
github.com/ustiugov/fccd-orchestrator/taps v0.0.0-20200717125634-528c6e9f9cc9 // indirect
Expand Down
Loading

0 comments on commit 25728c1

Please sign in to comment.