Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10976] Bundle finalization: Harness and some exec changes #16980

Merged
merged 10 commits into from
Mar 8, 2022
Merged
12 changes: 6 additions & 6 deletions sdks/go/pkg/beam/core/runtime/exec/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (n *Combine) Up(ctx context.Context) error {

n.states = metrics.NewPTransformState(n.PID)

if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil, nil); err != nil {
return n.fail(err)
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte
}

in := &MainInput{Key: FullValue{Elm: a}}
val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b)
val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, nil, b)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking MergeAccumulators"))
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (n *Combine) Down(ctx context.Context) error {
}
n.status = Down

if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil {
n.err.TrySetError(err)
}
return n.err.Error()
Expand All @@ -230,7 +230,7 @@ func (n *Combine) newAccum(ctx context.Context, key interface{}) (interface{}, e
opt = &MainInput{Key: FullValue{Elm: key}}
}

val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt)
val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt, nil)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking CreateAccumulator"))
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, value interface{}, t
}
v := n.aiValConvert(value)

val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v)
val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, nil, v)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking AddInput"))
}
Expand All @@ -287,7 +287,7 @@ func (n *Combine) extract(ctx context.Context, accum interface{}) (interface{},
return accum, nil
}

val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum)
val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, nil, accum)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking ExtractOutput"))
}
Expand Down
53 changes: 43 additions & 10 deletions sdks/go/pkg/beam/core/runtime/exec/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
Expand All @@ -39,23 +40,47 @@ type MainInput struct {
RTracker sdf.RTracker
}

type bundleFinalizationCallback struct {
callback func() error
validUntil time.Time
}

// bundleFinalizer holds all the user defined callbacks to be run on bundle finalization.
// Implements typex.BundleFinalization
type bundleFinalizer struct {
callbacks []bundleFinalizationCallback
lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer
}

// RegisterCallback is used to register callbacks during DoFn execution.
func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
callback := bundleFinalizationCallback{
callback: cb,
validUntil: time.Now().Add(t),
}
bf.callbacks = append(bf.callbacks, callback)
if bf.lastValidCallback.Before(callback.validUntil) {
bf.lastValidCallback = callback.validUntil
}
}

// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
damccorm marked this conversation as resolved.
Show resolved Hide resolved
if fn == nil {
return nil, nil // ok: nothing to Invoke
}
inv := newInvoker(fn)
return inv.Invoke(ctx, pn, ws, ts, opt, extra...)
return inv.Invoke(ctx, pn, ws, ts, opt, bf, extra...)
}

// InvokeWithoutEventTime runs the given function at time 0 in the global window.
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
if fn == nil {
return nil, nil // ok: nothing to Invoke
}
inv := newInvoker(fn)
return inv.InvokeWithoutEventTime(ctx, opt, extra...)
return inv.InvokeWithoutEventTime(ctx, opt, bf, extra...)
}

// invoker is a container struct for hot path invocations of DoFns, to avoid
Expand All @@ -64,9 +89,9 @@ type invoker struct {
fn *funcx.Fn
args []interface{}
// TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement.
ctxIdx, pnIdx, wndIdx, etIdx int // specialized input indexes
outEtIdx, outErrIdx int // specialized output indexes
in, out []int // general indexes
ctxIdx, pnIdx, wndIdx, etIdx, bfIdx int // specialized input indexes
outEtIdx, outErrIdx int // specialized output indexes
in, out []int // general indexes

ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue.
elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types.
Expand Down Expand Up @@ -99,6 +124,11 @@ func newInvoker(fn *funcx.Fn) *invoker {
if n.outErrIdx, ok = fn.Error(); !ok {
n.outErrIdx = -1
}
// TODO(BEAM-10976) - add this back in once BundleFinalization is implemented
// if n.bfIdx, ok = fn.BundleFinalization(); !ok {
// n.bfIdx = -1
// }
n.bfIdx = -1

n.initCall()

Expand All @@ -115,13 +145,13 @@ func (n *invoker) Reset() {
}

// InvokeWithoutEventTime runs the function at time 0 in the global window.
func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, extra ...interface{}) (*FullValue, error) {
return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, extra...)
func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, bf, extra...)
}

// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
// (1) Populate contexts
// extract these to make things easier to read.
args := n.args
Expand All @@ -143,6 +173,9 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
if n.etIdx >= 0 {
args[n.etIdx] = ts
}
if n.bfIdx >= 0 {
args[n.bfIdx] = bf
}

// (2) Main input from value, if any.
i := 0
Expand Down
62 changes: 55 additions & 7 deletions sdks/go/pkg/beam/core/runtime/exec/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package exec

import (
"context"
"errors"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestInvoke(t *testing.T) {
test.ExpectedTime = ts
}

val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...)
if err != nil {
t.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
}
Expand All @@ -195,6 +196,53 @@ func TestInvoke(t *testing.T) {
}
}

func TestRegisterCallback(t *testing.T) {
bf := bundleFinalizer{
callbacks: []bundleFinalizationCallback{},
lastValidCallback: time.Now(),
}
testVar := 0
bf.RegisterCallback(500*time.Minute, func() error {
testVar += 5
return nil
})
bf.RegisterCallback(2*time.Minute, func() error {
testVar = 25
return nil
})
callbackErr := errors.New("Callback error")
bf.RegisterCallback(2*time.Minute, func() error {
return callbackErr
})

// We can't do exact equality since this relies on real time, we'll give it a broad range
if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback)
}
if got, want := len(bf.callbacks), 3; got != want {
t.Fatalf("Callbacks in bundleFinalizer does not match number of calls to RegisterCallback(), got %v callbacks, want %v", got, want)
}

callbackIdx := 0
if err := bf.callbacks[callbackIdx].callback(); err != nil {
t.Errorf("RegisterCallback() callback at index %v returned unexpected error: %v", callbackIdx, err)
}
if got, want := testVar, 5; got != want {
t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want)
}
callbackIdx = 1
if err := bf.callbacks[callbackIdx].callback(); err != nil {
t.Errorf("RegisterCallback() callback at index %v returned error %v, want nil", callbackIdx, err)
}
if got, want := testVar, 25; got != want {
t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want)
}
callbackIdx = 2
if err := bf.callbacks[2].callback(); err != callbackErr {
t.Errorf("RegisterCallback() callback at index %v returned error %v, want %v", callbackIdx, err, callbackErr)
}
}

// Benchmarks

// Invoke is implemented as a single use of a cached invoker, so a measure of
Expand Down Expand Up @@ -314,7 +362,7 @@ func BenchmarkInvoke(b *testing.B) {
ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond)
b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
_, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...)
if err != nil {
b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
}
Expand All @@ -323,7 +371,7 @@ func BenchmarkInvoke(b *testing.B) {
b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b *testing.B) {
inv := newInvoker(fn)
for i := 0; i < b.N; i++ {
_, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, test.Args...)
_, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, nil, test.Args...)
if err != nil {
b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
}
Expand Down Expand Up @@ -416,7 +464,7 @@ func BenchmarkInvokeCall(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}})
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil)
n = ret.Elm.(int)
}
b.Log(n)
Expand All @@ -427,7 +475,7 @@ func BenchmarkInvokeCallExtra(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n)
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n)
n = ret.Elm.(int)
}
b.Log(n)
Expand All @@ -453,7 +501,7 @@ func BenchmarkInvokeFnCall(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}})
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil)
n = ret.Elm.(int)
}
b.Log(n)
Expand All @@ -464,7 +512,7 @@ func BenchmarkInvokeFnCallExtra(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n)
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n)
n = ret.Elm.(int)
}
b.Log(n)
Expand Down
9 changes: 5 additions & 4 deletions sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ParDo struct {
emitters []ReusableEmitter
ctx context.Context
inv *invoker
bf *bundleFinalizer

reader StateReader
cache *cacheElm
Expand Down Expand Up @@ -83,7 +84,7 @@ func (n *ParDo) Up(ctx context.Context) error {
// Subsequent bundles might run this same node, and the context here would be
// incorrectly refering to the older bundleId.
setupCtx := metrics.SetPTransformID(ctx, n.PID)
if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil); err != nil {
return n.fail(err)
}

Expand Down Expand Up @@ -229,7 +230,7 @@ func (n *ParDo) Down(ctx context.Context) error {
n.reader = nil
n.cache = nil

if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil {
n.err.TrySetError(err)
}
return n.err.Error()
Expand Down Expand Up @@ -295,7 +296,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex.
if err := n.preInvoke(ctx, ws, ts); err != nil {
return nil, err
}
val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.cache.extra...)
val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.bf, n.cache.extra...)
if err != nil {
return nil, err
}
Expand All @@ -313,7 +314,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ
if err := n.preInvoke(ctx, ws, ts); err != nil {
return nil, err
}
val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.cache.extra...)
val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.cache.extra...)
if err != nil {
return nil, err
}
Expand Down
Loading