Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13647] Use role for Go worker binary. #16729

Merged
merged 4 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,12 @@ message StandardArtifacts {
// A URN for pip-requirements-file role.
// payload: None
PIP_REQUIREMENTS_FILE = 1 [(beam_urn) = "beam:artifact:role:pip_requirements_file:v1"];

// A URN for the Go worker binary role.
// This represents the executable for a Go SDK environment.
// A Go environment may have one such artifact with this role.
// payload: None
GO_WORKER_BINARY = 2 [(beam_urn) = "beam:artifact:role:go_worker_binary:v1"];
}
}

Expand Down
16 changes: 14 additions & 2 deletions sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func main() {
log.Fatalf("Failed to retrieve staged files: %v", err)
}

// TODO(BEAM-13647): Remove legacy hack once aged out.
const worker = "worker"
name := worker

Expand All @@ -110,12 +111,23 @@ func main() {
default:
found := false
for _, a := range artifacts {
n, _ := artifact.MustExtractFilePayload(a)
if n == worker {
if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole {
name, _ = artifact.MustExtractFilePayload(a)
found = true
break
}
}
// TODO(BEAM-13647): Remove legacy hack once aged out.
if !found {
for _, a := range artifacts {
n, _ := artifact.MustExtractFilePayload(a)
if n == worker {
found = true
log.Printf("Go worker binary found with legacy name '%v' found", worker)
break
}
}
}
if !found {
log.Fatalf("No artifact named '%v' found", worker)
}
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/artifact/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
const (
URNFileArtifact = "beam:artifact:type:file:v1"
URNUrlArtifact = "beam:artifact:type:url:v1"
URNGoWorkerBinaryRole = "beam:artifact:role:go_worker_binary:v1"
URNPipRequirementsFile = "beam:artifact:role:pip_requirements_file:v1"
URNStagingTo = "beam:artifact:role:staging_to:v1"
NoArtifactsStaged = "__no_artifacts_staged__"
Expand Down Expand Up @@ -172,11 +173,11 @@ func extractStagingToPath(artifact *pipepb.ArtifactInformation) (string, error)

func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, string) {
if artifact.TypeUrn != URNFileArtifact {
log.Fatalf("Unsupported artifact type #{artifact.TypeUrn}")
log.Fatalf("Unsupported artifact type %v", artifact.TypeUrn)
}
ty := pipepb.ArtifactFilePayload{}
if err := proto.Unmarshal(artifact.TypePayload, &ty); err != nil {
log.Fatalf("failed to parse artifact file payload: #{err}")
log.Fatalf("failed to parse artifact file payload: %v", err)
}
return ty.Path, ty.Sha256
}
Expand Down
48 changes: 38 additions & 10 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,17 @@ const (

URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"

URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
URNArtifactStagingTo = "beam:artifact:role:staging_to:v1"
// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"

URNArtifactFileType = "beam:artifact:type:file:v1"
URNArtifactURLType = "beam:artifact:type:url:v1"
URNArtifactGoWorkerRole = "beam:artifact:role:go_worker_binary:v1"

// Environment Urns.
URNEnvProcess = "beam:env:process:v1"
URNEnvExternal = "beam:env:external:v1"
URNEnvDocker = "beam:env:docker:v1"
)

func goCapabilities() []string {
Expand All @@ -85,14 +94,14 @@ func goCapabilities() []string {
func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error) {
var serializedPayload []byte
switch urn {
case "beam:env:process:v1":
case URNEnvProcess:
// TODO Support process based SDK Harness.
return nil, errors.Errorf("unsupported environment %v", urn)
case "beam:env:external:v1":
case URNEnvExternal:
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}
serializedPayload = protox.MustEncode(payload)
case "beam:env:docker:v1":
case URNEnvDocker:
fallthrough
default:
config := extractEnvironmentConfig(ctx)
Expand All @@ -105,11 +114,9 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
Capabilities: goCapabilities(),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: URNArtifactGoWorker,
RoleUrn: URNArtifactStagingTo,
RolePayload: protox.MustEncode(&pipepb.ArtifactStagingToRolePayload{
StagedName: "worker",
}),
TypeUrn: URNArtifactFileType,
TypePayload: protox.MustEncode(&pipepb.ArtifactFilePayload{}),
RoleUrn: URNArtifactGoWorkerRole,
},
},
}, nil
Expand Down Expand Up @@ -981,6 +988,7 @@ func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
return pipepb.IsBounded_UNBOUNDED
}

// defaultEnvId is the environment ID used for Go Pipeline Environments.
const defaultEnvId = "go"

func (m *marshaller) addDefaultEnv() string {
Expand Down Expand Up @@ -1259,3 +1267,23 @@ func nodeID(n *graph.Node) string {
func scopeID(s *graph.Scope) string {
return fmt.Sprintf("s%v", s.ID())
}

// UpdateDefaultEnvWorkerType is so runners can update the pipeline's default environment
// with the correct artifact type and payload for the Go worker binary.
func UpdateDefaultEnvWorkerType(typeUrn string, pyld []byte, p *pipepb.Pipeline) error {
// Get the Go environment out.
envs := p.GetComponents().GetEnvironments()
env, ok := envs[defaultEnvId]
if !ok {
return errors.Errorf("unable to find default Go environment with ID %q", defaultEnvId)
}
for _, dep := range env.GetDependencies() {
if dep.RoleUrn != URNArtifactGoWorkerRole {
continue
}
dep.TypeUrn = typeUrn
dep.TypePayload = pyld
return nil
}
return errors.Errorf("unable to find dependency with %q role in environment with ID %q,", URNArtifactGoWorkerRole, defaultEnvId)
}
165 changes: 165 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
package graphx_test

import (
"context"
"reflect"
"testing"

"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/testing/protocmp"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -206,3 +210,164 @@ func (fn *splitPickFn) CreateTracker(_ int) *testRT { return &testRT{}
func (fn *splitPickFn) ProcessElement(_ *testRT, a int, small, big func(int)) {
pickFn(a, small, big)
}

func TestCreateEnvironment(t *testing.T) {
t.Run("process", func(t *testing.T) {
const wantEnv = "process"
urn := graphx.URNEnvProcess
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return wantEnv })
if err == nil {
t.Errorf("CreateEnvironment(%v) = %v error, want error since it's unsupported", urn, err)
}
want := (*pipepb.Environment)(nil)
if !proto.Equal(got, want) {
t.Errorf("CreateEnvironment(%v) = %v, want %v since it's unsupported", urn, got, want)
}
})
tests := []struct {
name string
urn string
payload func(name string) []byte
}{
{
name: "external",
urn: graphx.URNEnvExternal,
payload: func(name string) []byte {
return protox.MustEncode(&pipepb.ExternalPayload{
Endpoint: &pipepb.ApiServiceDescriptor{
Url: name,
},
})
},
}, {
name: "docker",
urn: graphx.URNEnvDocker,
payload: func(name string) []byte {
return protox.MustEncode(&pipepb.DockerPayload{
ContainerImage: name,
})
},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return test.name })
if err != nil {
t.Errorf("CreateEnvironment(%v) = %v error, want nil", test.urn, err)
}
want := &pipepb.Environment{
Urn: test.urn,
Payload: test.payload(test.name),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: graphx.URNArtifactFileType,
TypePayload: protox.MustEncode(&pipepb.ArtifactFilePayload{}),
RoleUrn: graphx.URNArtifactGoWorkerRole,
},
},
}
opts := []cmp.Option{
protocmp.Transform(),
// Ignore the capabilities field, since we can't access that method here.
protocmp.IgnoreFields(&pipepb.Environment{}, protoreflect.Name("capabilities")),
}
if d := cmp.Diff(want, got, opts...); d != "" {
t.Errorf("CreateEnvironment(%v) diff (-want, +got):\n%v", test.urn, d)
}
})
}
}

func TestUpdateDefaultEnvWorkerType(t *testing.T) {
t.Run("noEnvs", func(t *testing.T) {
if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{
Components: &pipepb.Components{},
}); err == nil {
t.Error("UpdateDefaultEnvWorkerType(<emptyEnv>) no error, want err")
}
})
t.Run("noGoEnvs", func(t *testing.T) {
if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{
Components: &pipepb.Components{
Environments: map[string]*pipepb.Environment{
"java": {Urn: "java"},
"python": {Urn: "python"},
"typescript": {Urn: "typescript"},
},
},
}); err == nil {
t.Error("UpdateDefaultEnvWorkerType(<noGoEnvs>) no error, want err")
}
})
t.Run("badGoEnv", func(t *testing.T) {
if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{
Components: &pipepb.Components{
Environments: map[string]*pipepb.Environment{
"java": {Urn: "java"},
"python": {Urn: "python"},
"typescript": {Urn: "typescript"},
"go": {
Urn: "test",
Payload: []byte("test"),
Dependencies: []*pipepb.ArtifactInformation{
{
RoleUrn: "unset",
},
},
},
},
},
}); err == nil {
t.Error("UpdateDefaultEnvWorkerType(<badGoEnv>) no error, want err")
}
})
t.Run("goEnv", func(t *testing.T) {
wantUrn := graphx.URNArtifactFileType
wantPyld := protox.MustEncode(&pipepb.ArtifactFilePayload{
Path: "good",
})
p := &pipepb.Pipeline{
Components: &pipepb.Components{
Environments: map[string]*pipepb.Environment{
"java": {Urn: "java"},
"python": {Urn: "python"},
"typescript": {Urn: "typescript"},
"go": {
Urn: "test",
Payload: []byte("test"),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: "to be removed",
TypePayload: nil,
RoleUrn: graphx.URNArtifactGoWorkerRole,
},
},
},
},
},
}
if err := graphx.UpdateDefaultEnvWorkerType(wantUrn, wantPyld, p); err != nil {
t.Errorf("UpdateDefaultEnvWorkerType(<goEnv>) = %v, want nil", err)
}
got := p.GetComponents().GetEnvironments()["go"]
want := &pipepb.Environment{
Urn: "test",
Payload: []byte("test"),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: wantUrn,
TypePayload: wantPyld,
RoleUrn: graphx.URNArtifactGoWorkerRole,
},
},
}
opts := []cmp.Option{
protocmp.Transform(),
}
if d := cmp.Diff(want, got, opts...); d != "" {
t.Errorf("UpdateDefaultEnvWorkerType(<goEnv>) diff (-want, +got):\n%v", d)
}
})

}
16 changes: 13 additions & 3 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
Expand Down Expand Up @@ -59,16 +60,25 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
}

log.Infof(ctx, "Staging worker binary: %v", bin)

if err := StageFile(ctx, opts.Project, workerURL, bin); err != nil {
hash, err := stageFile(ctx, opts.Project, workerURL, bin)
if err != nil {
return presult, err
}
log.Infof(ctx, "Staged worker binary: %v", workerURL)

if err := graphx.UpdateDefaultEnvWorkerType(
graphx.URNArtifactURLType,
protox.MustEncode(&pipepb.ArtifactUrlPayload{
Url: workerURL,
Sha256: hash,
}), raw); err != nil {
return presult, err
}

if opts.WorkerJar != "" {
log.Infof(ctx, "Staging Dataflow worker jar: %v", opts.WorkerJar)

if err := StageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil {
if _, err := stageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil {
return presult, err
}
log.Infof(ctx, "Staged worker jar: %v", jarURL)
Expand Down
Loading