diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 6fdaf804a34f5..eb8abe16ecf89 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "golang.org/x/exp/slog" "google.golang.org/protobuf/encoding/prototext" @@ -80,19 +81,24 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str return wvcID, nil } -// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn. -func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) { +// makeWindowCoders categorizes and provides the encoder, decoder pair for the type of window. +func makeWindowCoders(wc *pipepb.Coder) (engine.WinCoderType, exec.WindowDecoder, exec.WindowEncoder) { var cwc *coder.WindowCoder + var winCoder engine.WinCoderType switch wc.GetSpec().GetUrn() { case urns.CoderGlobalWindow: + winCoder = engine.WinGlobal cwc = coder.NewGlobalWindow() case urns.CoderIntervalWindow: + winCoder = engine.WinInterval cwc = coder.NewIntervalWindow() default: + // TODO(https://github.com/apache/beam/issues/31921): Support custom windowfns instead of panicking here. + winCoder = engine.WinCustom slog.LogAttrs(context.TODO(), slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn())) panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc))) } - return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc) + return winCoder, exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc) } // lpUnknownCoders takes a coder, and populates coders with any new coders diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go index 3f9557ff83613..4656a94e03ec7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -25,6 +25,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/testing/protocmp" @@ -91,14 +92,15 @@ func Test_makeWindowedValueCoder(t *testing.T) { func Test_makeWindowCoders(t *testing.T) { tests := []struct { - urn string - window typex.Window + urn string + window typex.Window + coderType engine.WinCoderType }{ - {urns.CoderGlobalWindow, window.GlobalWindow{}}, + {urns.CoderGlobalWindow, window.GlobalWindow{}, engine.WinGlobal}, {urns.CoderIntervalWindow, window.IntervalWindow{ Start: mtime.MinTimestamp, End: mtime.MaxTimestamp, - }}, + }, engine.WinInterval}, } for _, test := range tests { undertest := &pipepb.Coder{ @@ -106,7 +108,11 @@ func Test_makeWindowCoders(t *testing.T) { Urn: test.urn, }, } - dec, enc := makeWindowCoders(undertest) + gotCoderType, dec, enc := makeWindowCoders(undertest) + + if got, want := gotCoderType, test.coderType; got != want { + t.Errorf("makeWindowCoders returned different coder type: got %v, want %v", got, want) + } // Validate we're getting a round trip coder. var buf bytes.Buffer diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index c73db507c7920..0781efd5ff0ce 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -79,13 +79,35 @@ type elements struct { } type PColInfo struct { - GlobalID string - WDec exec.WindowDecoder - WEnc exec.WindowEncoder - EDec func(io.Reader) []byte - KeyDec func(io.Reader) []byte + GlobalID string + WindowCoder WinCoderType + WDec exec.WindowDecoder + WEnc exec.WindowEncoder + EDec func(io.Reader) []byte + KeyDec func(io.Reader) []byte } +// WinCoderType indicates what kind of coder +// the window is using. There are only 3 +// valid single window encodings. +// +// - Global (for Global windows) +// - Interval (for fixed, sliding, and session windows) +// - Custom (for custom user windows) +// +// TODO: Handle custom variants with built in "known" coders, and length prefixed ones as separate cases. +// As a rule we don't care about the bytes, but we do need to be able to get to the next element. +type WinCoderType int + +const ( + // WinGlobal indicates the window is empty coded, with 0 bytes. + WinGlobal WinCoderType = iota + // WinInterval indicates the window is interval coded with the end event time timestamp followed by the duration in milliseconds + WinInterval + // WinCustom indicates the window customm coded with end event time timestamp followed by a custom coder. + WinCustom +) + // ToData recodes the elements with their approprate windowed value header. func (es elements) ToData(info PColInfo) [][]byte { var ret [][]byte @@ -870,7 +892,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag keyToTimers := map[timerKey]element{} for _, t := range timers { // TODO: Call in a for:range loop when Beam's minimum Go version hits 1.23.0 - iter := decodeTimerIter(inputInfo.KeyDec, true, t) + iter := decodeTimerIter(inputInfo.KeyDec, inputInfo.WindowCoder, t) iter(func(ret timerRet) bool { for _, e := range ret.elms { keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 9a3bd6f9682bc..8b90591974b89 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -46,7 +46,31 @@ type timerRet struct { // If the timer has been cleared, no elements will be returned. Any existing timers // for the tag *must* be cleared from the pending queue. The windows associated with // the clear are provided to be able to delete pending timers. -func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) func(func(timerRet) bool) { +func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw []byte) func(func(timerRet) bool) { + + var singleWindowExtractor func(*decoder) typex.Window + switch winCoder { + case WinGlobal: + singleWindowExtractor = func(*decoder) typex.Window { + return window.GlobalWindow{} + } + case WinInterval: + singleWindowExtractor = func(d *decoder) typex.Window { + return d.IntervalWindow() + } + case WinCustom: + // Default to a length prefixed window coder here until we have different information. + // Everything else is either:: variable, 1, 4, or 8 bytes long + // KVs (especially nested ones, could occur but are unlikely, and it would be + // easier for Prism to force such coders to be length prefixed. + singleWindowExtractor = func(d *decoder) typex.Window { + return d.CustomWindowLengthPrefixed() + } + default: + // Unsupported + panic(fmt.Sprintf("unsupported WindowCoder Type: %v", winCoder)) + } + return func(yield func(timerRet) bool) { for len(raw) > 0 { keyBytes := keyDec(bytes.NewBuffer(raw)) @@ -55,15 +79,8 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw [ var ws []typex.Window numWin := d.Fixed32() - if usesGlobalWindow { - for i := 0; i < int(numWin); i++ { - ws = append(ws, window.GlobalWindow{}) - } - } else { - // Assume interval windows here, since we don't understand custom windows yet. - for i := 0; i < int(numWin); i++ { - ws = append(ws, d.IntervalWindow()) - } + for i := 0; i < int(numWin); i++ { + ws = append(ws, singleWindowExtractor(&d)) } clear := d.Bool() @@ -149,6 +166,37 @@ func (d *decoder) IntervalWindow() window.IntervalWindow { } } +// CustomWindowLengthPrefixed assumes the custom window coder is a variable, length prefixed type +// such as string, bytes, or a length prefix wrapped coder. +func (d *decoder) CustomWindowLengthPrefixed() customWindow { + end := d.Timestamp() + + customStart := d.cursor + l := d.Varint() + endCursor := d.cursor + int(l) + d.cursor = endCursor + return customWindow{ + End: end, + Custom: d.raw[customStart:endCursor], + } +} + +type customWindow struct { + End typex.EventTime + Custom []byte // The custom portion of the window, ignored by the runner +} + +func (w customWindow) MaxTimestamp() typex.EventTime { + return w.End +} + +func (w customWindow) Equals(o typex.Window) bool { + if c, ok := o.(customWindow); ok { + return w.End == c.End && bytes.Equal(w.Custom, c.Custom) + } + return false +} + func (d *decoder) Byte() byte { defer func() { d.cursor += 1 diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 08ab9f687c5fc..d7605f34f5f20 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -187,18 +187,19 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic col := comps.GetPcollections()[onlyOut] ed := collectionPullDecoder(col.GetCoderId(), coders, comps) - wDec, wEnc := getWindowValueCoders(comps, col, coders) + winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders) var kd func(io.Reader) []byte if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok { kd = collectionPullDecoder(kcid, coders, comps) } stage.OutputsToCoders[onlyOut] = engine.PColInfo{ - GlobalID: onlyOut, - WDec: wDec, - WEnc: wEnc, - EDec: ed, - KeyDec: kd, + GlobalID: onlyOut, + WindowCoder: winCoder, + WDec: wDec, + WEnc: wEnc, + EDec: ed, + KeyDec: kd, } // There's either 0, 1 or many inputs, but they should be all the same @@ -206,12 +207,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic for _, global := range t.GetInputs() { col := comps.GetPcollections()[global] ed := collectionPullDecoder(col.GetCoderId(), coders, comps) - wDec, wEnc := getWindowValueCoders(comps, col, coders) + winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders) stage.inputInfo = engine.PColInfo{ - GlobalID: global, - WDec: wDec, - WEnc: wEnc, - EDec: ed, + GlobalID: global, + WindowCoder: winCoder, + WDec: wDec, + WEnc: wEnc, + EDec: ed, } break } @@ -222,18 +224,19 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic for _, global := range t.GetInputs() { col := comps.GetPcollections()[global] ed := collectionPullDecoder(col.GetCoderId(), coders, comps) - wDec, wEnc := getWindowValueCoders(comps, col, coders) + winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders) var kd func(io.Reader) []byte if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok { kd = collectionPullDecoder(kcid, coders, comps) } stage.inputInfo = engine.PColInfo{ - GlobalID: global, - WDec: wDec, - WEnc: wEnc, - EDec: ed, - KeyDec: kd, + GlobalID: global, + WindowCoder: winCoder, + WDec: wDec, + WEnc: wEnc, + EDec: ed, + KeyDec: kd, } } em.StageAggregates(stage.ID) @@ -378,7 +381,7 @@ func extractKVCoderID(coldCId string, coders map[string]*pipepb.Coder) (string, return "", false } -func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) { +func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (engine.WinCoderType, exec.WindowDecoder, exec.WindowEncoder) { ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()] wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) if err != nil { diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index eecebde3d693f..8590fd0d4cede 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -317,7 +317,7 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime())) } - wDec, wEnc := makeWindowCoders(wc) + _, wDec, wEnc := makeWindowCoders(wc) type keyTime struct { key []byte diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index d4abed293534d..1a62f2f6f4206 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -433,14 +433,15 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng kd = collectionPullDecoder(kcid, coders, comps) } - wDec, wEnc := getWindowValueCoders(comps, col, coders) + winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders) sink2Col[sinkID] = o.Global col2Coders[o.Global] = engine.PColInfo{ - GlobalID: o.Global, - WDec: wDec, - WEnc: wEnc, - EDec: ed, - KeyDec: kd, + GlobalID: o.Global, + WindowCoder: winCoder, + WDec: wDec, + WEnc: wEnc, + EDec: ed, + KeyDec: kd, } transforms[sinkID] = sinkTransform(sinkID, portFor(wOutCid, wk), o.Global) } @@ -493,7 +494,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w\n%v", stg.ID, stg.primaryInput, prototext.Format(col), err, stg.transforms) } ed := collectionPullDecoder(col.GetCoderId(), coders, comps) - wDec, wEnc := getWindowValueCoders(comps, col, coders) + winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders) var kd func(io.Reader) []byte if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok { @@ -501,11 +502,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng } inputInfo := engine.PColInfo{ - GlobalID: stg.primaryInput, - WDec: wDec, - WEnc: wEnc, - EDec: ed, - KeyDec: kd, + GlobalID: stg.primaryInput, + WindowCoder: winCoder, + WDec: wDec, + WEnc: wEnc, + EDec: ed, + KeyDec: kd, } stg.inputTransformID = stg.ID + "_source"