Merge pull request kubernetes-sigs#39 from codefromthecrypt/reset-bench

Switches from UID to pointer for identifying cycle IDs

k8s-ci-robot authored Jun 15, 2023
2 parents 9e3b936 + ac40a20 commit 84d22b7

Showing 6 changed files with 296 additions and 223 deletions.
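In short, the pool previously tracked guest instances by pod UID; this change keys them by a 32-bit cycle ID derived from the pod's pointer (see cycleID in scheduler/plugin/plugin.go below). A UID survives rescheduling after an error, so it cannot distinguish one scheduling cycle from the next, whereas the scheduler allocates a fresh *v1.Pod per cycle. The following standalone sketch is not part of the commit; the pod type and UID value are invented to illustrate the distinction:

package main

import (
	"fmt"
	"unsafe"
)

type pod struct{ uid string }

// cycleID mirrors the helper added in plugin.go: the low 32 bits of the
// pod's address identify the cycle that allocated it.
func cycleID(p *pod) uint32 {
	return uint32(uintptr(unsafe.Pointer(p)))
}

func main() {
	first := &pod{uid: "8f1a"}     // pod object from the first cycle
	second := &pod{uid: first.uid} // "rescheduled": same UID, new object
	fmt.Println(first.uid == second.uid)           // true: the UID cannot tell cycles apart
	fmt.Println(cycleID(first) != cycleID(second)) // almost certainly true: the addresses differ
}
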
25 changes: 17 additions & 8 deletions internal/e2e/plugin_perf_test.go
@@ -60,9 +60,12 @@ func BenchmarkPluginFilter(b *testing.B) {
 
 			b.ResetTimer()
 			for i := 0; i < b.N; i++ {
-				s := pl.plugin.(framework.FilterPlugin).Filter(ctx, nil, tc.pod, ni)
+				// Intentionally change the pointer to simulate a new scheduling cycle.
+				pod := *tc.pod
+
+				s := pl.plugin.(framework.FilterPlugin).Filter(ctx, nil, &pod, ni)
 				if want, have := framework.Success, s.Code(); want != have {
-					b.Fatalf("unexpected code: got %v, expected %v, got reason: %v", want, have, s.Message())
+					b.Fatalf("unexpected code: have %v, expected %v, have reason: %v", want, have, s.Message())
 				}
 			}
 		})
@@ -98,9 +101,12 @@ func BenchmarkPluginScore(b *testing.B) {
 		b.Run(tc.name, func(b *testing.B) {
 			b.ResetTimer()
 			for i := 0; i < b.N; i++ {
-				_, s := pl.plugin.(framework.ScorePlugin).Score(ctx, nil, tc.pod, tc.pod.Spec.NodeName)
+				// Intentionally change the pointer to simulate a new scheduling cycle.
+				pod := *tc.pod
+
+				_, s := pl.plugin.(framework.ScorePlugin).Score(ctx, nil, &pod, pod.Spec.NodeName)
 				if want, have := framework.Success, s.Code(); want != have {
-					b.Fatalf("unexpected status code: got %v, expected %v, got reason: %v", want, have, s.Message())
+					b.Fatalf("unexpected status code: have %v, expected %v, have reason: %v", want, have, s.Message())
 				}
 			}
 		})
@@ -142,13 +148,16 @@ func BenchmarkPluginFilterAndScore(b *testing.B) {
 
 			b.ResetTimer()
 			for i := 0; i < b.N; i++ {
-				s := pl.plugin.(framework.FilterPlugin).Filter(ctx, nil, tc.pod, ni)
+				// Intentionally change the pointer to simulate a new scheduling cycle.
+				pod := *tc.pod
+
+				s := pl.plugin.(framework.FilterPlugin).Filter(ctx, nil, &pod, ni)
 				if want, have := framework.Success, s.Code(); want != have {
-					b.Fatalf("unexpected code: got %v, expected %v, got reason: %v", want, have, s.Message())
+					b.Fatalf("unexpected code: have %v, expected %v, have reason: %v", want, have, s.Message())
 				}
-				_, s = pl.plugin.(framework.ScorePlugin).Score(ctx, nil, tc.pod, tc.pod.Spec.NodeName)
+				_, s = pl.plugin.(framework.ScorePlugin).Score(ctx, nil, &pod, tc.pod.Spec.NodeName)
 				if want, have := framework.Success, s.Code(); want != have {
-					b.Fatalf("unexpected status code: got %v, expected %v, got reason: %v", want, have, s.Message())
+					b.Fatalf("unexpected status code: have %v, expected %v, have reason: %v", want, have, s.Message())
 				}
 			}
 		})
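
The benchmark changes above lean on that pointer identity: copying the struct (pod := *tc.pod) gives each iteration a distinct address, so the plugin treats every iteration as a fresh scheduling cycle instead of reusing the guest assigned to tc.pod. A minimal sketch of the effect, with invented types:

package main

import "fmt"

type pod struct{ name string }

func main() {
	shared := &pod{name: "nginx"}
	seen := map[*pod]bool{}
	for i := 0; i < 3; i++ {
		p := *shared // copy the struct: a new object with a new address
		seen[&p] = true
	}
	fmt.Println(len(seen)) // 3: one distinct pointer, i.e. one "cycle", per iteration
}
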
28 changes: 12 additions & 16 deletions scheduler/plugin/export_test.go
@@ -18,7 +18,6 @@ package wasm
 
 import (
 	wazeroapi "github.com/tetratelabs/wazero/api"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
@@ -29,31 +28,28 @@ func NewTestWasmPlugin(p framework.Plugin) *WasmPlugin {
 }
 
 func (w *WasmPlugin) SetGlobals(globals map[string]int32) {
-	defer w.pool.unassignForScheduling()
-
-	g, err := w.pool.getOrCreateGuest(ctx, "a")
-	if err != nil {
+	if err := w.pool.doWithSchedulingGuest(ctx, 0, func(g *guest) {
+		// Use test conventions to set a global used to test value range.
+		for n, v := range globals {
+			g.guest.ExportedGlobal(n + "_global").(wazeroapi.MutableGlobal).Set(uint64(v))
+		}
+	}); err != nil {
 		panic(err)
 	}
-
-	// Use test conventions to set a global used to test value range.
-	for n, v := range globals {
-		g.guest.ExportedGlobal(n + "_global").(wazeroapi.MutableGlobal).Set(uint64(v))
-	}
 }
 
 func (w *WasmPlugin) ClearGuestModule() {
 	w.guestModule = nil
 }
 
-func (w *WasmPlugin) GetSchedulingPodUID() types.UID {
-	return w.pool.schedulingPodUID
+func (w *WasmPlugin) GetSchedulingCycleID() uint32 {
+	return w.pool.schedulingCycleID
 }
 
-func (w *WasmPlugin) GetAssignedToBindingPod() map[types.UID]*guest {
-	return w.pool.assignedToBindingPod
+func (w *WasmPlugin) GetBindingCycles() map[uint32]*guest {
+	return w.pool.binding
 }
 
-func (w *WasmPlugin) GetInstanceFromPool() any {
-	return w.pool.pool.Get()
+func (w *WasmPlugin) GetFreePool() []*guest {
+	return w.pool.free
 }
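
The accessors above imply the pool's new shape: one guest assigned to the active scheduling cycle (schedulingCycleID), a map of guests pinned for binding, and a free list for reuse. The pool source isn't part of this page, so the following is only a sketch of how such a pool could work; the method names match the calls in this diff, everything else is an assumption:

package wasm

import (
	"context"
	"sync"
)

type guest struct{} // stand-in for the real wasm guest instance

type guestPool struct {
	mux               sync.Mutex
	schedulingCycleID uint32            // cycle currently holding the scheduling guest
	scheduling        *guest            // guest assigned to that cycle
	binding           map[uint32]*guest // guests pinned from Permit until PostBind/Unreserve
	free              []*guest          // idle guests, ready for the next cycle
}

// doWithSchedulingGuest runs fn with the guest for the given cycle, reusing
// the current assignment when the ID matches and drawing from the free list
// (or instantiating) otherwise. A real pool would also reclaim a stale
// assignment and compile the wasm guest; this sketch omits both.
func (p *guestPool) doWithSchedulingGuest(ctx context.Context, id uint32, fn func(g *guest)) error {
	p.mux.Lock()
	defer p.mux.Unlock()
	if p.scheduling == nil || p.schedulingCycleID != id {
		if n := len(p.free); n > 0 {
			p.scheduling, p.free = p.free[n-1], p.free[:n-1]
		} else {
			p.scheduling = &guest{}
		}
		p.schedulingCycleID = id
	}
	fn(p.scheduling)
	return nil
}

// getForBinding pins the scheduling guest for the binding cycle (called from Permit).
func (p *guestPool) getForBinding(id uint32) *guest {
	p.mux.Lock()
	defer p.mux.Unlock()
	g := p.scheduling
	p.scheduling = nil
	p.binding[id] = g
	return g
}

// freeFromBinding ends the cycle, returning its guest to the free list
// (called from PostBind or Unreserve).
func (p *guestPool) freeFromBinding(id uint32) {
	p.mux.Lock()
	defer p.mux.Unlock()
	if g, ok := p.binding[id]; ok {
		delete(p.binding, id)
		p.free = append(p.free, g)
	}
}
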
126 changes: 66 additions & 60 deletions scheduler/plugin/plugin.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/tetratelabs/wazero"
 	v1 "k8s.io/api/core/v1"
@@ -146,147 +147,139 @@ var _ framework.PreFilterExtensions = (*wasmPlugin)(nil)
 
 // AddPod implements the same method as documented on framework.PreFilterExtensions.
 func (pl *wasmPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
-	pl.pool.assignedToSchedulingPodLock.Lock()
-	defer pl.pool.assignedToSchedulingPodLock.Unlock()
-
-	// TODO: support AddPod in wasm guest.
-	return nil
+	panic("TODO: scheduling: AddPod")
 }
 
 // RemovePod implements the same method as documented on framework.PreFilterExtensions.
 func (pl *wasmPlugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
-	pl.pool.assignedToSchedulingPodLock.Lock()
-	defer pl.pool.assignedToSchedulingPodLock.Unlock()
-
-	// TODO: support RemovePod in wasm guest.
-	return nil
+	panic("TODO: scheduling: RemovePod")
 }
 
 var _ framework.PreFilterPlugin = (*wasmPlugin)(nil)
 
 // PreFilterExtensions implements the same method as documented on
 // framework.PreFilterPlugin.
 func (pl *wasmPlugin) PreFilterExtensions() framework.PreFilterExtensions {
-	return nil
+	panic("TODO: scheduling: PreFilterExtensions")
 }
 
 // PreFilter implements the same method as documented on
 // framework.PreFilterPlugin.
-func (pl *wasmPlugin) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
-	// PreFilter is the first stage in scheduling. If there's an existing guest
-	// association, unassign it, so that we can make a pod-specific one.
-	pl.pool.unassignForScheduling()
-
-	_, err := pl.pool.getOrCreateGuest(ctx, pod.GetUID())
-	if err != nil {
-		return nil, framework.AsStatus(err)
+func (pl *wasmPlugin) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (result *framework.PreFilterResult, status *framework.Status) {
+	if err := pl.pool.doWithSchedulingGuest(ctx, cycleID(pod), func(g *guest) {
+		// TODO: partially implemented for testing
+	}); err != nil {
+		status = framework.AsStatus(err)
 	}
-
-	// TODO: support PreFilter in wasm guest.
-
-	return nil, nil
+	return
 }
 
 var _ framework.FilterPlugin = (*wasmPlugin)(nil)
 
 // Filter implements the same method as documented on framework.FilterPlugin.
-func (pl *wasmPlugin) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
-	pl.pool.assignedToSchedulingPodLock.Lock()
-	defer pl.pool.assignedToSchedulingPodLock.Unlock()
-
-	g, err := pl.pool.getOrCreateGuest(ctx, pod.GetUID())
-	if err != nil {
-		return framework.AsStatus(err)
-	}
-
+func (pl *wasmPlugin) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) (status *framework.Status) {
 	// Add the params to the go context so that the corresponding host function
 	// can look them up.
 	params := &params{pod: pod, nodeInfo: nodeInfo}
 	ctx = context.WithValue(ctx, paramsKey{}, params)
-	return g.filter(ctx)
+	if err := pl.pool.doWithSchedulingGuest(ctx, cycleID(pod), func(g *guest) {
+		status = g.filter(ctx)
+	}); err != nil {
+		status = framework.AsStatus(err)
+	}
+	return
 }
 
 var _ framework.PostFilterPlugin = (*wasmPlugin)(nil)
 
 // PostFilter implements the same method as documented on framework.PostFilterPlugin.
 func (pl *wasmPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
-	panic("TODO: PostFilter")
+	panic("TODO: scheduling: PostFilter")
 }
 
 var _ framework.PreScorePlugin = (*wasmPlugin)(nil)
 
 // PreScore implements the same method as documented on framework.PreScorePlugin.
 func (pl *wasmPlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
-	panic("TODO: PreScore")
+	panic("TODO: scheduling: PreScore")
 }
 
 var _ framework.ScoreExtensions = (*wasmPlugin)(nil)
 
 // NormalizeScore implements the same method as documented on framework.ScoreExtensions.
 func (pl *wasmPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
-	panic("TODO: PreScore")
+	panic("TODO: scheduling: NormalizeScore")
 }
 
 var _ framework.ScorePlugin = (*wasmPlugin)(nil)
 
 // Score implements the same method as documented on framework.ScorePlugin.
-func (pl *wasmPlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-	g, err := pl.pool.getOrCreateGuest(ctx, pod.GetUID())
-	if err != nil {
-		return 0, framework.AsStatus(err)
-	}
-
+func (pl *wasmPlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (score int64, status *framework.Status) {
 	// Add the params to the go context so that the corresponding host function
 	// can look them up.
 	params := &params{pod: pod, nodeName: nodeName}
 	ctx = context.WithValue(ctx, paramsKey{}, params)
-	return g.score(ctx)
+	if err := pl.pool.doWithSchedulingGuest(ctx, cycleID(pod), func(g *guest) {
+		score, status = g.score(ctx)
+	}); err != nil {
+		status = framework.AsStatus(err)
+	}
+	return
 }
 
 // ScoreExtensions implements the same method as documented on framework.ScorePlugin.
 func (pl *wasmPlugin) ScoreExtensions() framework.ScoreExtensions {
-	panic("TODO: Score")
+	panic("TODO: scheduling: ScoreExtensions")
 }
 
 var _ framework.ReservePlugin = (*wasmPlugin)(nil)
 
 // Reserve implements the same method as documented on framework.ReservePlugin.
-func (pl *wasmPlugin) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
-	// TODO: support Reserve in wasm guest.
-	// Currently, it's implemented to implement the ReservePlugin interface.
-	return nil
+func (pl *wasmPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
+	if err := pl.pool.doWithSchedulingGuest(ctx, cycleID(pod), func(g *guest) {
+		// TODO: partially implemented for testing
+	}); err != nil {
+		status = framework.AsStatus(err)
+	}
+	return
 }
 
 // Unreserve implements the same method as documented on framework.ReservePlugin.
-func (pl *wasmPlugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
-	pl.pool.unassignForBinding(p.GetUID())
-	// TODO: support Unreserve in wasm guest.
+func (pl *wasmPlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
+	// Note: Unlike the below diagram, this is not a part of the scheduling
+	// cycle, rather the binding on error.
+	// https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/#extension-points
+
+	cycleID := cycleID(pod)
+	defer pl.pool.freeFromBinding(cycleID) // the cycle is over, put it back into the pool.
+
+	// TODO: partially implemented for testing
 }
 
 var _ framework.PreBindPlugin = (*wasmPlugin)(nil)
 
 // PreBind implements the same method as documented on framework.PreBindPlugin.
 func (pl *wasmPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
-	panic("TODO: PreBind")
+	panic("TODO: binding: PreBind")
 }
 
 var _ framework.PostBindPlugin = (*wasmPlugin)(nil)
 
 // PostBind implements the same method as documented on framework.PostBindPlugin.
 func (pl *wasmPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
-	pl.pool.unassignForBinding(pod.GetUID())
-	// TODO: support PostBind in wasm guest.
+	cycleID := cycleID(pod)
+	defer pl.pool.freeFromBinding(cycleID) // the cycle is over, put it back into the pool.
+
+	// TODO: partially implemented for testing
 }
 
 var _ framework.PermitPlugin = (*wasmPlugin)(nil)
 
 // Permit implements the same method as documented on framework.PermitPlugin.
-func (pl *wasmPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
-	// assume that the pod is going to binding cycle and continue to assign the instance to the pod.
-	// unassign the instance in Unreserve or PostBind.
-	pl.pool.assignForBinding(p.GetUID())
+func (pl *wasmPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
+	_ = pl.pool.getForBinding(cycleID(pod))
 
-	// TODO: support Permit in wasm guest.
+	// TODO: partially implemented for testing
 
 	return nil, 0
 }
@@ -295,7 +288,7 @@ var _ framework.BindPlugin = (*wasmPlugin)(nil)
 
 // Bind implements the same method as documented on framework.BindPlugin.
 func (pl *wasmPlugin) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
-	panic("TODO: Bind")
+	panic("TODO: binding: Bind")
 }
 
 // Close implements io.Closer
@@ -306,3 +299,16 @@ func (pl *wasmPlugin) Close() error {
 	}
 	return nil
 }
+
+// cycleID is stable through a scheduling or binding cycle. For example, it
+// will be different when the same pod is rescheduled due to an error. The
+// cycleID is not derived from the v1.Pod UID for this reason.
+//
+// We use the last 32-bits of the pod's pointer as its ID, as the struct is
+// re-instantiated each scheduling cycle, but the same object is used for each
+// callback within one.
+// See https://github.com/kubernetes/kubernetes/blob/9740bc0e0a10aad753cf7fcbed0c7be25ab200dd/pkg/scheduler/schedule_one.go#L133
+func cycleID(pod *v1.Pod) uint32 {
+	podPtr := uintptr(unsafe.Pointer(pod))
+	return uint32(podPtr)
+}
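
One more detail worth noting from Filter and Score above: parameters are handed to the guest indirectly, by attaching them to the Go context before the guest runs, so host functions invoked from wasm can look them up. A minimal sketch of that pattern, assuming a simplified params struct and a hypothetical lookup helper (the repository's real host functions are elsewhere in the package):

package wasm

import "context"

// paramsKey and params appear in the diff above; the fields here are
// simplified stand-ins for the real *v1.Pod and *framework.NodeInfo.
type paramsKey struct{}

type params struct {
	podName  string
	nodeName string
}

// paramsFromContext is a hypothetical helper showing how a host function
// could recover the current cycle's arguments while the guest executes.
func paramsFromContext(ctx context.Context) *params {
	p, _ := ctx.Value(paramsKey{}).(*params)
	return p
}

func exampleHostCall(ctx context.Context) string {
	// The plugin did: ctx = context.WithValue(ctx, paramsKey{}, params)
	if p := paramsFromContext(ctx); p != nil {
		return p.nodeName
	}
	return ""
}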
