Skip to content

Commit

Permalink
[apache#30083][prism] Stabilize additional teststream cases. (apache#…
Browse files Browse the repository at this point in the history
…31046)

* Stabilize additional teststream cases.

* Update sdks/go/test/integration/primitives/teststream_test.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

* Update sdks/go/test/integration/primitives/teststream.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

* Update sdks/go/test/integration/primitives/teststream_test.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>
  • Loading branch information
3 people committed Apr 22, 2024
1 parent 1a5dc1c commit 8dcbf96
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 9 deletions.
26 changes: 24 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ type Config struct {
type ElementManager struct {
config Config

stages map[string]*stageState // The state for each stage.
impulses set[string] // List of impulse stages.
stages map[string]*stageState // The state for each stage.

consumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as primary input.
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input.
Expand Down Expand Up @@ -254,6 +255,14 @@ func (em *ElementManager) Impulse(stageID string) {
em.addPending(count)
}
refreshes := stage.updateWatermarks(em)

// Since impulses are synthetic, we need to simulate them properly
// if a pipeline is only test stream driven.
if em.impulses == nil {
em.impulses = refreshes
} else {
em.impulses.merge(refreshes)
}
em.addRefreshes(refreshes)
}

Expand Down Expand Up @@ -286,6 +295,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
// Watermark evaluation goroutine.
go func() {
defer close(runStageCh)

// If we have a test stream, clear out existing refreshes, so the test stream can
// insert any elements it needs.
if em.testStreamHandler != nil {
em.watermarkRefreshes = singleSet(em.testStreamHandler.ID)
}

for {
em.refreshCond.L.Lock()
// If there are no watermark refreshes available, we wait until there are.
Expand Down Expand Up @@ -370,7 +386,13 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
return
// If there are refreshes scheduled, then test stream permitted execution to continue.
// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
// It's not correct to move to the next event if no refreshes would occur.
if len(em.watermarkRefreshes) > 0 {
return
}
// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
}

v := em.livePending.Load()
Expand Down
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func TestTestStream(t *testing.T) {
{pipeline: primitives.TestStreamTwoFloat64Sequences},
{pipeline: primitives.TestStreamTwoInt64Sequences},
{pipeline: primitives.TestStreamTwoUserTypeSequences},

{pipeline: primitives.TestStreamSimple},
{pipeline: primitives.TestStreamSimple_InfinityDefault},
{pipeline: primitives.TestStreamToGBK},
{pipeline: primitives.TestStreamTimersEventTime},
}

configs := []struct {
Expand Down
49 changes: 45 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package engine

import (
"container/heap"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -46,13 +47,15 @@ type testStreamHandler struct {

tagState map[string]tagState // Map from event tag to related outputs.

completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once.
currentHold mtime.Time // indicates if the default watermark hold has been lifted.
completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once.
}

// makeTestStreamHandler returns a new testStreamHandler for the stage with
// the given id. The handler starts with an empty tag state map and its
// current hold set to mtime.MinTimestamp.
func makeTestStreamHandler(id string) *testStreamHandler {
	// Each field is set exactly once; repeating a field name in a struct
	// literal is a compile error.
	return &testStreamHandler{
		ID:          id,
		tagState:    map[string]tagState{},
		currentHold: mtime.MinTimestamp,
	}
}

Expand Down Expand Up @@ -124,6 +127,35 @@ func (ts *testStreamHandler) NextEvent() tsEvent {
return ev
}

// UpdateHold restrains the watermark based on upcoming elements in the test
// stream queue. It relies on the element manager's normal hold mechanisms to
// avoid premature pipeline termination while events remain to be processed.
//
// Calling UpdateHold on a nil handler is a no-op. Otherwise the handler's
// previous hold is released (if it is still counted), newHold is installed on
// the test stream stage, and the test stream and impulse stages are scheduled
// for a watermark refresh.
func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time) {
	if ts == nil {
		return
	}

	stage := em.stages[ts.ID]
	stage.mu.Lock()
	defer stage.mu.Unlock()

	// Release the previous hold before installing the replacement.
	// NOTE(review): heap.Pop removes the heap's minimum element; this presumes
	// the test stream's current hold is that element — confirm.
	if prev := ts.currentHold; stage.watermarkHoldsCounts[prev] > 0 {
		heap.Pop(&stage.watermarkHoldHeap)
		stage.watermarkHoldsCounts[prev]--
	}

	ts.currentHold = newHold
	heap.Push(&stage.watermarkHoldHeap, newHold)
	stage.watermarkHoldsCounts[newHold] = 1

	// Kick the TestStream and Impulse stages too.
	refreshes := singleSet(ts.ID)
	refreshes.merge(em.impulses)

	// This executes under the refreshCond lock, so we can't call em.addRefreshes.
	em.watermarkRefreshes.merge(refreshes)
	em.refreshCond.Broadcast()
}

// TestStreamElement wraps the provided bytes and timestamp for ingestion and use.
type TestStreamElement struct {
Encoded []byte
Expand Down Expand Up @@ -195,6 +227,8 @@ func (ev tsWatermarkEvent) Execute(em *ElementManager) {
ss.updateUpstreamWatermark(ss.inputID, t.watermark)
em.watermarkRefreshes.insert(sID)
}
// Clear the default hold after the inserts have occurred.
em.testStreamHandler.UpdateHold(em, t.watermark)
}

// tsProcessingTimeEvent implements advancing the synthetic processing time.
Expand All @@ -215,7 +249,7 @@ type tsFinalEvent struct {
}

func (ev tsFinalEvent) Execute(em *ElementManager) {
em.addPending(1) // We subtrack a pending after event execution, so add one now.
em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp)
ss := em.stages[ev.stageID]
kickSet := ss.updateWatermarks(em)
kickSet.insert(ev.stageID)
Expand All @@ -242,6 +276,13 @@ var (
// initHandler creates the element manager's test stream handler for the stage
// with the given id, if one has not been created already. Later calls leave
// the existing handler untouched.
func (tsi *testStreamImpl) initHandler(id string) {
	if tsi.em.testStreamHandler != nil {
		return
	}

	handler := makeTestStreamHandler(id)
	tsi.em.testStreamHandler = handler

	stage := tsi.em.stages[id]
	// We subtract a pending after event execution, so add one now for the
	// final event to avoid a race condition.
	tsi.em.addPending(1)

	// Arrest the watermark initially to prevent terminal advancement.
	heap.Push(&stage.watermarkHoldHeap, handler.currentHold)
	stage.watermarkHoldsCounts[handler.currentHold] = 1
}

Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type B struct {
func (b *B) Init() {
// We need to see final data and timer signals that match the number of
// outputs the stage this bundle executes possesses.
b.dataSema.Store(int32(b.OutputCount + len(b.HasTimers)))
outCap := int32(b.OutputCount + len(b.HasTimers))
b.dataSema.Store(outCap)
b.DataWait = make(chan struct{})
if b.OutputCount == 0 {
if outCap == 0 {
close(b.DataWait) // Can happen if there are no outputs for the bundle.
}
b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
Expand Down
74 changes: 74 additions & 0 deletions sdks/go/test/integration/primitives/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package primitives

import (
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
)
Expand Down Expand Up @@ -172,3 +175,74 @@ func TestStreamInt16Sequence(s beam.Scope) {
passert.Count(s, col, "teststream int15", 3)
passert.EqualsList(s, col, ele)
}

// panicIfNot42 is a validation DoFn: it returns normally when v is 42 and
// panics with a "got ..., want 42" message for any other value.
func panicIfNot42(v int) {
	const want = 42
	if v == want {
		return
	}
	panic(fmt.Sprintf("got %v, want 42", v))
}

// dropKeyEmitValues ignores the key and forwards every value produced by the
// vs iterator to emit, in iteration order. It stops as soon as vs reports
// that no further values are available.
func dropKeyEmitValues(_ int, vs func(*int) bool, emit func(int)) {
	// A single variable is reused across iterations, so the iterator always
	// writes into the same location.
	var cur int
	for {
		if !vs(&cur) {
			return
		}
		emit(cur)
	}
}

// init registers the panicIfNot42 and dropKeyEmitValues functions through the
// beam register package when this package is loaded.
func init() {
register.Function1x0(panicIfNot42)
register.Function3x0(dropKeyEmitValues)
}

// TestStreamSimple builds a trivial pipeline in which a test stream emits the
// single element 42, advances the watermark to infinity, and feeds a DoFn
// that panics unless it receives that value.
// Intended for runner validation.
func TestStreamSimple(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElementList(100, []int{42})
	cfg.AdvanceWatermarkToInfinity()

	beam.ParDo0(s, panicIfNot42, teststream.Create(s, cfg))
}

// TestStreamSimple_InfinityDefault builds the same trivial pipeline as
// TestStreamSimple but never advances the watermark explicitly. It validates
// that the watermark is automatically advanced to infinity even when the user
// doesn't request it.
// Intended for runner validation.
func TestStreamSimple_InfinityDefault(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElementList(100, []int{42})

	beam.ParDo0(s, panicIfNot42, teststream.Create(s, cfg))
}

// TestStreamToGBK builds a trivial pipeline in which a test stream sends a
// single element through a fixed-key GroupByKey. The grouped values are then
// unpacked again and checked by panicIfNot42.
func TestStreamToGBK(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElementList(100, []int{42})
	cfg.AdvanceWatermarkToInfinity()

	grouped := beam.GroupByKey(s, beam.AddFixedKey(s, teststream.Create(s, cfg)))
	values := beam.ParDo(s, dropKeyEmitValues, grouped)
	beam.ParDo0(s, panicIfNot42, values)
}

// TestStreamTimersEventTime validates event time timers in a test stream
// "driven" pipeline. The pipeline source is a test stream that emits one
// element and then advances the watermark to infinity.
func TestStreamTimersEventTime(s beam.Scope) {
	// source stands in for the impulse normally given to the pipeline builder.
	source := func(s beam.Scope) beam.PCollection {
		cfg := teststream.NewConfig()
		cfg.AddElements(123456, []byte{42})
		cfg.AdvanceWatermarkToInfinity()
		return teststream.Create(s, cfg)
	}

	build := timersEventTimePipelineBuilder(source)
	build(s)
}
20 changes: 20 additions & 0 deletions sdks/go/test/integration/primitives/teststream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,23 @@ func TestTestStreamTwoUserTypeSequences(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamTwoUserTypeSequences)
}

// TestTestStreamSimple calls integration.CheckFilters and then builds and
// runs the TestStreamSimple pipeline with ptest.BuildAndRun.
func TestTestStreamSimple(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamSimple)
}

// TestTestStreamSimple_InfinityDefault calls integration.CheckFilters and then
// builds and runs the TestStreamSimple_InfinityDefault pipeline with
// ptest.BuildAndRun.
func TestTestStreamSimple_InfinityDefault(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamSimple_InfinityDefault)
}

// TestTestStreamToGBK calls integration.CheckFilters and then builds and runs
// the TestStreamToGBK pipeline with ptest.BuildAndRun.
func TestTestStreamToGBK(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamToGBK)
}

// TestTestStreamTimersEventTime calls integration.CheckFilters and then builds
// and runs the TestStreamTimersEventTime pipeline with ptest.BuildAndRun.
func TestTestStreamTimersEventTime(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamTimersEventTime)
}
2 changes: 1 addition & 1 deletion sdks/go/test/integration/primitives/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (fn *eventTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.
}
}

// TimersEventTime takes in an impulse transform and produces a pipeline construction
// timersEventTimePipelineBuilder takes in an impulse transform and produces a pipeline construction
// function that validates EventTime timers.
//
// The impulse is provided outside to swap between a bounded impulse, and
Expand Down

0 comments on commit 8dcbf96

Please sign in to comment.