Skip to content

Commit

Permalink
[BEAM-13857] Add K:V flags for expansion service jars and addresses t…
Browse files Browse the repository at this point in the history
…o Go ITs. (#16908)

Adds functionality for running jars to the Go integration test framework, and uses this functionality to implement handling of K:V flags for providing expansion service jars and addresses to the test framework. This means that tests can simply get the address of an expansion service with the appropriate label, and this feature will handle running a jar if necessary, or just using the passed in endpoint otherwise.
  • Loading branch information
youngoli committed Mar 3, 2022
1 parent 117123c commit 2aa4da0
Show file tree
Hide file tree
Showing 9 changed files with 657 additions and 1 deletion.
119 changes: 119 additions & 0 deletions sdks/go/test/integration/expansions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"fmt"
"strconv"
"time"

"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/ports"
)

// ExpansionServices is a struct used for getting addresses and starting expansion services, based
// on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this
// instead of accessing the flags directly is to let it handle jar startup and shutdown.
//
// Usage
//
// Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for
// every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or
// simply defer a call to it).
//
// ExpansionServices is not concurrency safe, and so a single instance should not be used within
// multiple individual tests, due to the possibility of those tests being run concurrently. It is
// recommended to only use ExpansionServices in TestMain to avoid this.
//
// Example:
// var retCode int
// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers.
// services := integration.NewExpansionServices()
// defer func() { services.Shutdown() }()
// addr, err := services.GetAddr("example")
// if err != nil {
// retCode = 1
// panic(err)
// }
// expansionAddr = addr // Save address to a package-level variable used by tests.
// retCode = ptest.MainRet(m)
type ExpansionServices struct {
addrs map[string]string
jars map[string]string
procs []jars.Process
// Callback for running jars, stored this way for testing purposes.
run func(time.Duration, string, ...string) (jars.Process, error)
waitTime time.Duration // Time to sleep after running jar. Tests can adjust this.
}

// NewExpansionServices creates and initializes an ExpansionServices instance.
func NewExpansionServices() *ExpansionServices {
return &ExpansionServices{
addrs: GetExpansionAddrs(),
jars: GetExpansionJars(),
procs: make([]jars.Process, 0),
run: jars.Run,
waitTime: 3 * time.Second,
}
}

// GetAddr gets the address for the expansion service with the given label. The label corresponds to
// the labels used in the --expansion_jar and --expansion_addr flags. If an expansion service is
// provided as a jar, then that jar will be run to retrieve the address, and the jars are not
// guaranteed to be shut down unless Shutdown is called.
//
// Note: If this function starts a jar, it waits a few seconds for it to initialize. Do not use
// this function if the possibility of a few seconds of latency is not acceptable.
func (es *ExpansionServices) GetAddr(label string) (string, error) {
// Always default to existing address before running a jar.
if addr, ok := es.addrs[label]; ok {
return addr, nil
}
jar, ok := es.jars[label]
if !ok {
err := fmt.Errorf("no --expansion_jar or --expansion_addr flag provided with label \"%s\"", label)
return "", fmt.Errorf("expansion service labeled \"%s\" not found: %w", label, err)
}

// Start jar on open port.
port, err := ports.GetOpenTCP()
if err != nil {
return "", fmt.Errorf("cannot get open port for expansion service labeled \"%s\": %w", label, err)
}
portStr := strconv.Itoa(port)

// Run jar and cache its info.
proc, err := es.run(*ExpansionTimeout, jar, portStr)
if err != nil {
return "", fmt.Errorf("cannot run jar for expansion service labeled \"%s\": %w", label, err)
}
time.Sleep(es.waitTime) // Wait a bit for the jar to start.
es.procs = append(es.procs, proc)
addr := "localhost:" + portStr
es.addrs[label] = addr
return addr, nil
}

// Shutdown shuts down any jars started by the ExpansionServices struct and should get called if it
// was used at all.
func (es *ExpansionServices) Shutdown() {
for _, p := range es.procs {
p.Kill()
}
es.jars = nil
es.addrs = nil
es.procs = nil
}
179 changes: 179 additions & 0 deletions sdks/go/test/integration/expansions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"fmt"
"testing"
"time"

_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

type testProcess struct {
killed bool
jar string
}

func (p *testProcess) Kill() error {
p.killed = true
return nil
}

func failRun(_ time.Duration, _ string, _ ...string) (jars.Process, error) {
return nil, fmt.Errorf("unexpectedly running a jar, failing")
}

func succeedRun(_ time.Duration, jar string, _ ...string) (jars.Process, error) {
return &testProcess{jar: jar}, nil
}

// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses.
func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
addrsMap := map[string]string{
"label1": "testAddr1",
"label2": "testAddr2",
"label3": "testAddr3",
}
jarsMap := map[string]string{
"label2": "jarFilepath2",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: failRun,
waitTime: 0,
}

// Ensure we get the same map we put in, and that addresses take priority over jars if
// both are given for the same label.
for label, wantAddr := range addrsMap {
gotAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
if gotAddr != wantAddr {
t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr)
}
}
// Check that nonexistent labels fail.
if _, err := es.GetAddr("nonexistent_label"); err == nil {
t.Errorf("did not receive error when calling GetAddr with nonexistent label")
}
}

// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars.
func TestExpansionServices_GetAddr_Jars(t *testing.T) {
addrsMap := map[string]string{}
jarsMap := map[string]string{
"label1": "jarFilepath1",
"label2": "jarFilepath2",
"label3": "jarFilepath3",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
}

// Call GetAddr on each jar twice, checking that the addresses remain consistent.
gotMap := make(map[string]string)
for label := range jarsMap {
gotAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
gotMap[label] = gotAddr
}
for label, gotAddr := range gotMap {
secondAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
if secondAddr != gotAddr {
t.Errorf("getAddr returned different address when called twice for \"%v\", "+
"attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr)
}
}
// Check that all jars were run.
gotJars := make([]string, 0)
for _, proc := range es.procs {
testProc := proc.(*testProcess)
gotJars = append(gotJars, testProc.jar)
}
wantJars := make([]string, 0)
for _, jar := range jarsMap {
wantJars = append(wantJars, jar)
}
lessFunc := func(a, b string) bool { return a < b }
if diff := cmp.Diff(wantJars, gotJars, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("processes in ExpansionServices does not match jars that should be running: diff(-want,+got):\n%v", diff)
}
}

// TestExpansionServices_Shutdown tests that a shutdown correctly kills all jars started by an
// ExpansionServices.
func TestExpansionServices_Shutdown(t *testing.T) {
addrsMap := map[string]string{}
jarsMap := map[string]string{
"label1": "jarFilepath1",
"label2": "jarFilepath2",
"label3": "jarFilepath3",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
}
// Call getAddr on each label to run jars.
for label := range addrsMap {
_, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
}

// Shutdown and confirm that jars are killed and addresses can no longer be retrieved.
procs := es.procs
es.Shutdown()
for _, proc := range procs {
testProc := proc.(*testProcess)
if !testProc.killed {
t.Errorf("process for jar %v was not killed on Shutdown()", testProc.jar)
}
}
for label := range addrsMap {
_, err := es.GetAddr(label)
if err == nil {
t.Errorf("calling GetAddr after Shutdown did not return an error for \"%v\"", label)
}
}
}
86 changes: 85 additions & 1 deletion sdks/go/test/integration/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

package integration

import "flag"
import (
"flag"
"fmt"
"strings"
)

// The following flags are flags used in one or more integration tests, and that
// may be used by scripts that execute "go test ./sdks/go/test/integration/...".
Expand Down Expand Up @@ -53,4 +57,84 @@ var (
KafkaJarTimeout = flag.String("kafka_jar_timeout", "10m",
"Sets an auto-shutdown timeout to the Kafka cluster. "+
"Requires the timeout command to be present in Path, unless the value is set to \"\".")

// ExpansionJars contains elements in the form "label:jar" describing jar
// filepaths for expansion services to use in integration tests, and the
// corresponding labels. Once provided through this flag, those jars can
// be used in tests via the ExpansionServices struct.
ExpansionJars stringSlice

// ExpansionAddrs contains elements in the form "label:address" describing
// endpoints for expansion services to use in integration tests, and the
// corresponding labels. Once provided through this flag, those addresses
// can be used in tests via the ExpansionServices struct.
ExpansionAddrs stringSlice

// ExpansionTimeout attempts to apply an auto-shutdown timeout to any
// expansion services started by integration tests.
ExpansionTimeout = flag.Duration("expansion_timeout", 0,
"Sets an auto-shutdown timeout to any started expansion services. "+
"Requires the timeout command to be present in Path, unless the value is set to 0.")
)

func init() {
flag.Var(&ExpansionJars, "expansion_jar",
"Define jar locations for expansion services. Each entry consists of "+
"two values, an arbitrary label and a jar filepath, separated by a "+
"\":\", in the form \"label:jar\". Jars provided through this flag "+
"can be started by tests.")
flag.Var(&ExpansionAddrs, "expansion_addr",
"Define addresses for expansion services. Each entry consists of "+
"two values, an arbitrary label and an address, separated by a "+
"\":\", in the form \"label:address\". Addresses provided through "+
"this flag can be used as expansion addresses by tests.")
}

// GetExpansionJars gets all the jars given to --expansion_jar as a map of label to jar location.
func GetExpansionJars() map[string]string {
ret := make(map[string]string)
for _, jar := range ExpansionJars {
splits := strings.SplitN(jar, ":", 2)
ret[splits[0]] = splits[1]
}
return ret
}

// GetExpansionAddrs gets all the addresses given to --expansion_addr as a map of label to address.
func GetExpansionAddrs() map[string]string {
ret := make(map[string]string)
for _, addr := range ExpansionAddrs {
splits := strings.SplitN(addr, ":", 2)
ret[splits[0]] = splits[1]
}
return ret
}

// stringSlice is a flag.Value implementation for string slices, that allows
// multiple strings to be assigned to one flag by specifying multiple instances
// of the flag.
//
// Example:
// var myFlags stringSlice
// flag.Var(&myFlags, "my_flag", "A list of flags")
// With the example above, the slice can be set to contain ["foo", "bar"]:
// cmd -my_flag foo -my_flag bar
type stringSlice []string

// String implements the String method of flag.Value. This outputs the value
// of the flag as a string.
func (s *stringSlice) String() string {
return fmt.Sprintf("%v", *s)
}

// Set implements the Set method of flag.Value. This stores a string input to
// the flag into a stringSlice representation.
func (s *stringSlice) Set(value string) error {
*s = append(*s, value)
return nil
}

// Get returns the instance itself.
func (s stringSlice) Get() interface{} {
return s
}
Loading

0 comments on commit 2aa4da0

Please sign in to comment.