Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#32498][prism] Add split / progress back off + catch-up. #32526

Merged
merged 2 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func Test_preprocessor_preProcessGraph(t *testing.T) {
}})

gotStages := pre.preProcessGraph(test.input, nil)
if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}, link{}), cmpopts.EquateEmpty()); diff != "" {
if diff := cmp.Diff(test.wantStages, gotStages,
cmp.AllowUnexported(stage{}, link{}),
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(stage{}, "baseProgTick"),
); diff != "" {
t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
}

Expand Down
46 changes: 45 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -76,9 +77,32 @@ type stage struct {

SinkToPCollection map[string]string
OutputsToCoders map[string]engine.PColInfo

// Stage specific progress and splitting interval.
baseProgTick atomic.Value // time.Duration
}

// The minimum and maximum durations between each ProgressBundleRequest and split evaluation.
const (
minimumProgTick = 100 * time.Millisecond
maximumProgTick = 30 * time.Second
)

func clampTick(dur time.Duration) time.Duration {
switch {
case dur < minimumProgTick:
return minimumProgTick
case dur > maximumProgTick:
return maximumProgTick
default:
return dur
}
}

func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
if s.baseProgTick.Load() == nil {
s.baseProgTick.Store(minimumProgTick)
}
defer func() {
// Convert execution panics to errors to fail the bundle.
if e := recover(); e != nil {
Expand Down Expand Up @@ -142,7 +166,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
previousTotalCount := int64(-2) // Total count of all pcollection elements.

unsplit := true
progTick := time.NewTicker(100 * time.Millisecond)
baseTick := s.baseProgTick.Load().(time.Duration)
ticked := false
progTick := time.NewTicker(baseTick)
defer progTick.Stop()
var dataFinished, bundleFinished bool
// If we have no data outputs, we still need to have progress & splits
Expand Down Expand Up @@ -170,6 +196,7 @@ progress:
break progress // exit progress loop on close.
}
case <-progTick.C:
ticked = true
resp, err := b.Progress(ctx, wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
Expand All @@ -196,6 +223,7 @@ progress:
unsplit = false
continue progress
}

// TODO sort out rescheduling primary Roots on bundle failure.
var residuals []engine.Residual
for _, rr := range sr.GetResidualRoots() {
Expand All @@ -220,12 +248,28 @@ progress:
Data: residuals,
})
}

// Any split means we're processing slower than desired, but splitting should increase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non blocking question. How does split translate to processing slower than desired?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question.

That can be answered by looking at the issue being fixed, and seeing the behavior.

In this case, the file was being opened repeatedly and endlessly*.

By splitting too quickly, we end up doing more work, serializing and deserializing the elements in the set of elements to be processed by the SDK. So we weren't letting the SDK actually get any work done.

This meant that because we were slow to open the file, we opened the file again and again in different bundles.

*Eventually there would be nothing left to split, and lines would be emitted, but it would have been very wasteful.

// throughput. Back off for this and other bundles for this stage
baseTime := s.baseProgTick.Load().(time.Duration)
newTime := clampTick(baseTime * 4)
if s.baseProgTick.CompareAndSwap(baseTime, newTime) {
progTick.Reset(newTime)
} else {
progTick.Reset(s.baseProgTick.Load().(time.Duration))
}
} else {
previousIndex = index["index"]
previousTotalCount = index["totalCount"]
}
}
}
// If we never received any progress ticks, we may have too long a time, shrink it for new runs instead.
if !ticked {
newTick := clampTick(baseTick - minimumProgTick)
// If it's otherwise unchanged, apply the new duration.
s.baseProgTick.CompareAndSwap(baseTick, newTick)
}
// Tentative Data is ready, commit it to the main datastore.
slog.Debug("Execute: committing data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders)))

Expand Down
Loading