diff --git a/sdks/go/test/integration/expansions.go b/sdks/go/test/integration/expansions.go new file mode 100644 index 0000000000000..2c9c54a62658b --- /dev/null +++ b/sdks/go/test/integration/expansions.go @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "strconv" + "time" + + "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars" + "github.com/apache/beam/sdks/v2/go/test/integration/internal/ports" +) + +// ExpansionServices is a struct used for getting addresses and starting expansion services, based +// on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this +// instead of accessing the flags directly is to let it handle jar startup and shutdown. +// +// Usage +// +// Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for +// every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or +// simply defer a call to it). +// +// ExpansionServices is not concurrency safe, and so a single instance should not be used within +// multiple individual tests, due to the possibility of those tests being run concurrently. It is +// recommended to only use ExpansionServices in TestMain to avoid this. +// +// Example: +// var retCode int +// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers. +// services := integration.NewExpansionServices() +// defer func() { services.Shutdown() }() +// addr, err := services.GetAddr("example") +// if err != nil { +// retCode = 1 +// panic(err) +// } +// expansionAddr = addr // Save address to a package-level variable used by tests. +// retCode = ptest.MainRet(m) +type ExpansionServices struct { + addrs map[string]string + jars map[string]string + procs []jars.Process + // Callback for running jars, stored this way for testing purposes. + run func(time.Duration, string, ...string) (jars.Process, error) + waitTime time.Duration // Time to sleep after running jar. Tests can adjust this. +} + +// NewExpansionServices creates and initializes an ExpansionServices instance. +func NewExpansionServices() *ExpansionServices { + return &ExpansionServices{ + addrs: GetExpansionAddrs(), + jars: GetExpansionJars(), + procs: make([]jars.Process, 0), + run: jars.Run, + waitTime: 3 * time.Second, + } +} + +// GetAddr gets the address for the expansion service with the given label. The label corresponds to +// the labels used in the --expansion_jar and --expansion_addr flags. If an expansion service is +// provided as a jar, then that jar will be run to retrieve the address, and the jars are not +// guaranteed to be shut down unless Shutdown is called. +// +// Note: If this function starts a jar, it waits a few seconds for it to initialize. Do not use +// this function if the possibility of a few seconds of latency is not acceptable. +func (es *ExpansionServices) GetAddr(label string) (string, error) { + // Always default to existing address before running a jar. + if addr, ok := es.addrs[label]; ok { + return addr, nil + } + jar, ok := es.jars[label] + if !ok { + err := fmt.Errorf("no --expansion_jar or --expansion_addr flag provided with label \"%s\"", label) + return "", fmt.Errorf("expansion service labeled \"%s\" not found: %w", label, err) + } + + // Start jar on open port. + port, err := ports.GetOpenTCP() + if err != nil { + return "", fmt.Errorf("cannot get open port for expansion service labeled \"%s\": %w", label, err) + } + portStr := strconv.Itoa(port) + + // Run jar and cache its info. + proc, err := es.run(*ExpansionTimeout, jar, portStr) + if err != nil { + return "", fmt.Errorf("cannot run jar for expansion service labeled \"%s\": %w", label, err) + } + time.Sleep(es.waitTime) // Wait a bit for the jar to start. + es.procs = append(es.procs, proc) + addr := "localhost:" + portStr + es.addrs[label] = addr + return addr, nil +} + +// Shutdown shuts down any jars started by the ExpansionServices struct and should get called if it +// was used at all. +func (es *ExpansionServices) Shutdown() { + for _, p := range es.procs { + p.Kill() + } + es.jars = nil + es.addrs = nil + es.procs = nil +} diff --git a/sdks/go/test/integration/expansions_test.go b/sdks/go/test/integration/expansions_test.go new file mode 100644 index 0000000000000..99878d0623fdb --- /dev/null +++ b/sdks/go/test/integration/expansions_test.go @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "testing" + "time" + + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark" + "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +type testProcess struct { + killed bool + jar string +} + +func (p *testProcess) Kill() error { + p.killed = true + return nil +} + +func failRun(_ time.Duration, _ string, _ ...string) (jars.Process, error) { + return nil, fmt.Errorf("unexpectedly running a jar, failing") +} + +func succeedRun(_ time.Duration, jar string, _ ...string) (jars.Process, error) { + return &testProcess{jar: jar}, nil +} + +// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses. +func TestExpansionServices_GetAddr_Addresses(t *testing.T) { + addrsMap := map[string]string{ + "label1": "testAddr1", + "label2": "testAddr2", + "label3": "testAddr3", + } + jarsMap := map[string]string{ + "label2": "jarFilepath2", + } + es := &ExpansionServices{ + addrs: addrsMap, + jars: jarsMap, + procs: make([]jars.Process, 0), + run: failRun, + waitTime: 0, + } + + // Ensure we get the same map we put in, and that addresses take priority over jars if + // both are given for the same label. + for label, wantAddr := range addrsMap { + gotAddr, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + if gotAddr != wantAddr { + t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr) + } + } + // Check that nonexistent labels fail. + if _, err := es.GetAddr("nonexistent_label"); err == nil { + t.Errorf("did not receive error when calling GetAddr with nonexistent label") + } +} + +// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars. +func TestExpansionServices_GetAddr_Jars(t *testing.T) { + addrsMap := map[string]string{} + jarsMap := map[string]string{ + "label1": "jarFilepath1", + "label2": "jarFilepath2", + "label3": "jarFilepath3", + } + es := &ExpansionServices{ + addrs: addrsMap, + jars: jarsMap, + procs: make([]jars.Process, 0), + run: succeedRun, + waitTime: 0, + } + + // Call GetAddr on each jar twice, checking that the addresses remain consistent. + gotMap := make(map[string]string) + for label := range jarsMap { + gotAddr, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + gotMap[label] = gotAddr + } + for label, gotAddr := range gotMap { + secondAddr, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + if secondAddr != gotAddr { + t.Errorf("getAddr returned different address when called twice for \"%v\", "+ + "attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr) + } + } + // Check that all jars were run. + gotJars := make([]string, 0) + for _, proc := range es.procs { + testProc := proc.(*testProcess) + gotJars = append(gotJars, testProc.jar) + } + wantJars := make([]string, 0) + for _, jar := range jarsMap { + wantJars = append(wantJars, jar) + } + lessFunc := func(a, b string) bool { return a < b } + if diff := cmp.Diff(wantJars, gotJars, cmpopts.SortSlices(lessFunc)); diff != "" { + t.Errorf("processes in ExpansionServices does not match jars that should be running: diff(-want,+got):\n%v", diff) + } +} + +// TestExpansionServices_Shutdown tests that a shutdown correctly kills all jars started by an +// ExpansionServices. +func TestExpansionServices_Shutdown(t *testing.T) { + addrsMap := map[string]string{} + jarsMap := map[string]string{ + "label1": "jarFilepath1", + "label2": "jarFilepath2", + "label3": "jarFilepath3", + } + es := &ExpansionServices{ + addrs: addrsMap, + jars: jarsMap, + procs: make([]jars.Process, 0), + run: succeedRun, + waitTime: 0, + } + // Call getAddr on each label to run jars. + for label := range addrsMap { + _, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + } + + // Shutdown and confirm that jars are killed and addresses can no longer be retrieved. + procs := es.procs + es.Shutdown() + for _, proc := range procs { + testProc := proc.(*testProcess) + if !testProc.killed { + t.Errorf("process for jar %v was not killed on Shutdown()", testProc.jar) + } + } + for label := range addrsMap { + _, err := es.GetAddr(label) + if err == nil { + t.Errorf("calling GetAddr after Shutdown did not return an error for \"%v\"", label) + } + } +} diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go index d1b56b2a9ad53..6275578900ee1 100644 --- a/sdks/go/test/integration/flags.go +++ b/sdks/go/test/integration/flags.go @@ -15,7 +15,11 @@ package integration -import "flag" +import ( + "flag" + "fmt" + "strings" +) // The following flags are flags used in one or more integration tests, and that // may be used by scripts that execute "go test ./sdks/go/test/integration/...". @@ -53,4 +57,84 @@ var ( KafkaJarTimeout = flag.String("kafka_jar_timeout", "10m", "Sets an auto-shutdown timeout to the Kafka cluster. "+ "Requires the timeout command to be present in Path, unless the value is set to \"\".") + + // ExpansionJars contains elements in the form "label:jar" describing jar + // filepaths for expansion services to use in integration tests, and the + // corresponding labels. Once provided through this flag, those jars can + // be used in tests via the ExpansionServices struct. + ExpansionJars stringSlice + + // ExpansionAddrs contains elements in the form "label:address" describing + // endpoints for expansion services to use in integration tests, and the + // corresponding labels. Once provided through this flag, those addresses + // can be used in tests via the ExpansionServices struct. + ExpansionAddrs stringSlice + + // ExpansionTimeout attempts to apply an auto-shutdown timeout to any + // expansion services started by integration tests. + ExpansionTimeout = flag.Duration("expansion_timeout", 0, + "Sets an auto-shutdown timeout to any started expansion services. "+ + "Requires the timeout command to be present in Path, unless the value is set to 0.") ) + +func init() { + flag.Var(&ExpansionJars, "expansion_jar", + "Define jar locations for expansion services. Each entry consists of "+ + "two values, an arbitrary label and a jar filepath, separated by a "+ + "\":\", in the form \"label:jar\". Jars provided through this flag "+ + "can be started by tests.") + flag.Var(&ExpansionAddrs, "expansion_addr", + "Define addresses for expansion services. Each entry consists of "+ + "two values, an arbitrary label and an address, separated by a "+ + "\":\", in the form \"label:address\". Addresses provided through "+ + "this flag can be used as expansion addresses by tests.") +} + +// GetExpansionJars gets all the jars given to --expansion_jar as a map of label to jar location. +func GetExpansionJars() map[string]string { + ret := make(map[string]string) + for _, jar := range ExpansionJars { + splits := strings.SplitN(jar, ":", 2) + ret[splits[0]] = splits[1] + } + return ret +} + +// GetExpansionAddrs gets all the addresses given to --expansion_addr as a map of label to address. +func GetExpansionAddrs() map[string]string { + ret := make(map[string]string) + for _, addr := range ExpansionAddrs { + splits := strings.SplitN(addr, ":", 2) + ret[splits[0]] = splits[1] + } + return ret +} + +// stringSlice is a flag.Value implementation for string slices, that allows +// multiple strings to be assigned to one flag by specifying multiple instances +// of the flag. +// +// Example: +// var myFlags stringSlice +// flag.Var(&myFlags, "my_flag", "A list of flags") +// With the example above, the slice can be set to contain ["foo", "bar"]: +// cmd -my_flag foo -my_flag bar +type stringSlice []string + +// String implements the String method of flag.Value. This outputs the value +// of the flag as a string. +func (s *stringSlice) String() string { + return fmt.Sprintf("%v", *s) +} + +// Set implements the Set method of flag.Value. This stores a string input to +// the flag into a stringSlice representation. +func (s *stringSlice) Set(value string) error { + *s = append(*s, value) + return nil +} + +// Get returns the instance itself. +func (s stringSlice) Get() interface{} { + return s +} diff --git a/sdks/go/test/integration/internal/jars/jars.go b/sdks/go/test/integration/internal/jars/jars.go new file mode 100644 index 0000000000000..9ac70a2746152 --- /dev/null +++ b/sdks/go/test/integration/internal/jars/jars.go @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jars contains functionality for running jars for integration tests. The main entry point +// for running a jar is the Run function. The Process interface is used to interact with the running +// jars, and most importantly for shutting down the jars once finished with them. +package jars + +import ( + "fmt" + "os/exec" + "time" +) + +type runCallback func(dur time.Duration, jar string, args ...string) (Process, error) + +var runner runCallback // Saves which behavior to use when Run is called. + +func init() { + runner = getRunner() +} + +// getRunner is used to determine the appropriate behavior for run during initialization time, +// based on the OS and installed binaries of the system. This is returned as a runCallback which +// can be called whenever Run is called. If an error prevents Run from being used at all (for +// example, Java is not installed), then the runCallback will return that error. +func getRunner() runCallback { + // First check if we can even run jars. + _, err := exec.LookPath("java") + if err != nil { + err := fmt.Errorf("cannot run jar: 'java' command not installed: %w", err) + return func(_ time.Duration, _ string, _ ...string) (Process, error) { + return nil, err + } + } + + // Defer to OS-specific logic for checking for presence of timeout command. + return getTimeoutRunner() +} + +// Run runs a jar given an optional duration, a path to the jar, and any desired arguments to the +// jar. It returns a Process object which can be used to shut down the jar once finished. +// +// The dur parameter is a duration for the timeout command which can be used to automatically kill +// the jar after a set duration, in order to avoid resource leakage. Timeout is described here: +// https://man7.org/linux/man-pages/man1/timeout.1.html. Durations will be translated from +// time.Duration to a string based on the number of minutes. If a duration is provided but the +// system is unable to use the timeout is unable to use the timeout command, this function will +// return an error. To indicate that a duration isn't needed, pass in 0. +func Run(dur time.Duration, jar string, args ...string) (Process, error) { + return runner(dur, jar, args...) +} diff --git a/sdks/go/test/integration/internal/jars/proc.go b/sdks/go/test/integration/internal/jars/proc.go new file mode 100644 index 0000000000000..fa12beeead9a8 --- /dev/null +++ b/sdks/go/test/integration/internal/jars/proc.go @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jars + +// Process is an interface to allow wrapping os.Process with alternate behavior +// depending on OS. +type Process interface { + Kill() error +} diff --git a/sdks/go/test/integration/internal/jars/proc_unix.go b/sdks/go/test/integration/internal/jars/proc_unix.go new file mode 100644 index 0000000000000..c9f2d9d05bea8 --- /dev/null +++ b/sdks/go/test/integration/internal/jars/proc_unix.go @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Match build constraints of imported package golang.org/x/sys/unix. +//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris || zos +// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris zos + +package jars + +import ( + "golang.org/x/sys/unix" + "os" +) + +// UnixProcess wraps os.Process and changes the Kill function to perform a more +// graceful shutdown, mainly for compatibility with the timeout command. +type UnixProcess struct { + proc *os.Process // The os.Process for the running jar. +} + +// Kill gracefully shuts down the process. It is recommended to use this +// instead of directly killing the process. +func (p *UnixProcess) Kill() error { + // Avoid using SIGKILL. If the jar is wrapped in the timeout command + // SIGKILL will kill the timeout and leave the jar running. + return p.proc.Signal(unix.SIGTERM) +} diff --git a/sdks/go/test/integration/internal/jars/run_nonunix.go b/sdks/go/test/integration/internal/jars/run_nonunix.go new file mode 100644 index 0000000000000..189f2ab27209e --- /dev/null +++ b/sdks/go/test/integration/internal/jars/run_nonunix.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Exclude build constraints of package golang.org/x/sys/unix. +//go:build !(aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris || zos) +// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!solaris,!zos + +package jars + +import ( + "fmt" + "runtime" + "time" +) + +// getTimeoutRunner is an OS-specific branch for determining what behavior to use for Run. This +// non-unix version does not handle timeout durations. +func getTimeoutRunner() runCallback { + // Wrap run with error handling for OS that does not support timeout duration. + return func(dur time.Duration, jar string, args ...string) (*Process, error) { + // Currently, we hard-fail here if a duration is provided but timeout is unsupported. If + // we ever decide to soft-fail instead, this is the code to change. + if dur != 0 { + return nil, fmt.Errorf("cannot run jar: duration parameter provided but timeouts are unsupported on os %s", runtime.GOOS) + } + return run(jar, args...) + } +} + +// run simply starts up a jar and returns the cmd.Process. +func run(jar string, args ...string) (Process, error) { + var cmdArr []string + cmdArr = append(cmdArr, "java", "-jar", jar) + cmdArr = append(cmdArr, args...) + + cmd := exec.Command(cmdArr[0], cmdArr[1:]...) + err := cmd.Start() + if err != nil { + return nil, err + } + return cmd.Process, nil +} diff --git a/sdks/go/test/integration/internal/jars/run_unix.go b/sdks/go/test/integration/internal/jars/run_unix.go new file mode 100644 index 0000000000000..2a401b888ec84 --- /dev/null +++ b/sdks/go/test/integration/internal/jars/run_unix.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Match build constraints of imported package golang.org/x/sys/unix. +//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris || zos +// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris zos + +package jars + +import ( + "fmt" + "os/exec" + "time" +) + +// getTimeoutRunner is an OS-specific branch for determining what behavior to use for Run. This +// Unix specific version handles timeout. +func getTimeoutRunner() runCallback { + _, err := exec.LookPath("timeout") + if err != nil { + // Wrap run with Unix-specific error handling for missing timeout command. + return func(dur time.Duration, jar string, args ...string) (Process, error) { + // Currently, we hard-fail here if a duration is provided but timeout is unsupported. If + // we ever decide to soft-fail instead, this is the code to change. + if dur != 0 { + return nil, fmt.Errorf("cannot run jar: duration parameter provided but 'timeout' command not installed: %w", err) + } + return run(dur, jar, args...) + } + } + + // Path for a supported timeout, just use the default run function. + return run +} + +// run starts up a jar, and wraps it in "timeout" only if a duration is provided. Processes are +// returned wrapped as Unix processes that provide graceful shutdown for unix specifically. +func run(dur time.Duration, jar string, args ...string) (Process, error) { + var cmdArr []string + if dur != 0 { + durStr := fmt.Sprintf("%.2fm", dur.Minutes()) + cmdArr = append(cmdArr, "timeout", durStr) + } + cmdArr = append(cmdArr, "java", "-jar", jar) + cmdArr = append(cmdArr, args...) + + cmd := exec.Command(cmdArr[0], cmdArr[1:]...) + err := cmd.Start() + if err != nil { + return nil, err + } + return &UnixProcess{proc: cmd.Process}, nil +} diff --git a/sdks/go/test/integration/internal/ports/ports.go b/sdks/go/test/integration/internal/ports/ports.go new file mode 100644 index 0000000000000..895824edf7763 --- /dev/null +++ b/sdks/go/test/integration/internal/ports/ports.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ports contains utilities for handling ports needed for integration +// tests. +package ports + +import "net" + +// GetOpenTCP gets an open TCP port and returns it, or an error on failure. +func GetOpenTCP() (int, error) { + listener, err := net.Listen("tcp", ":0") + if err != nil { + return 0, err + } + defer listener.Close() + return listener.Addr().(*net.TCPAddr).Port, nil +}