diff --git a/client/matching/loadbalancer.go b/client/matching/loadbalancer.go
index e40a574d187..f739472507c 100644
--- a/client/matching/loadbalancer.go
+++ b/client/matching/loadbalancer.go
@@ -25,15 +25,15 @@
package matching
import (
- "fmt"
"math/rand"
- "strings"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
+ "go.temporal.io/server/common/tqname"
+ "go.temporal.io/server/common/util"
)
type (
@@ -72,10 +72,6 @@ type (
}
)
-const (
- taskQueuePartitionPrefix = "/_sys/"
-)
-
// NewLoadBalancer returns an instance of matching load balancer that
// can help distribute api calls across task queue partitions
func NewLoadBalancer(
@@ -114,30 +110,22 @@ func (lb *defaultLoadBalancer) pickPartition(
forwardedFrom string,
nPartitions dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters,
) string {
-
if forwardedFrom != "" || taskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY {
return taskQueue.GetName()
}
- if strings.HasPrefix(taskQueue.GetName(), taskQueuePartitionPrefix) {
- // this should never happen when forwardedFrom is empty
- return taskQueue.GetName()
- }
+ tqName, err := tqname.FromBaseName(taskQueue.GetName())
- namespace, err := lb.namespaceIDToName(namespaceID)
+ // this should never happen when forwardedFrom is empty
if err != nil {
return taskQueue.GetName()
}
- n := nPartitions(namespace.String(), taskQueue.GetName(), taskQueueType)
- if n <= 0 {
- return taskQueue.GetName()
- }
-
- p := rand.Intn(n)
- if p == 0 {
+ nsName, err := lb.namespaceIDToName(namespaceID)
+ if err != nil {
return taskQueue.GetName()
}
- return fmt.Sprintf("%v%v/%v", taskQueuePartitionPrefix, taskQueue.GetName(), p)
+ n := util.Min(1, nPartitions(nsName.String(), tqName.BaseNameString(), taskQueueType))
+ return tqName.WithPartition(rand.Intn(n)).FullName()
}
diff --git a/client/matching/metric_client.go b/client/matching/metric_client.go
index 3d36ebb4021..12ee978df50 100644
--- a/client/matching/metric_client.go
+++ b/client/matching/metric_client.go
@@ -26,20 +26,19 @@ package matching
import (
"context"
- "strings"
"time"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"google.golang.org/grpc"
- "go.temporal.io/server/common/headers"
- serviceerrors "go.temporal.io/server/common/serviceerror"
-
"go.temporal.io/server/api/matchingservice/v1"
+ "go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
+ serviceerrors "go.temporal.io/server/common/serviceerror"
+ "go.temporal.io/server/common/tqname"
)
var _ matchingservice.MatchingServiceClient = (*metricClient)(nil)
@@ -179,12 +178,12 @@ func (c *metricClient) emitForwardedSourceStats(
return
}
- isChildPartition := strings.HasPrefix(taskQueue.GetName(), taskQueuePartitionPrefix)
switch {
case forwardedFrom != "":
metricsHandler.Counter(metrics.MatchingClientForwardedCounter.GetMetricName()).Record(1)
default:
- if isChildPartition {
+ _, err := tqname.FromBaseName(taskQueue.GetName())
+ if err != nil {
metricsHandler.Counter(metrics.MatchingClientInvalidTaskQueueName.GetMetricName()).Record(1)
}
}
diff --git a/common/tqname/tqname.go b/common/tqname/tqname.go
new file mode 100644
index 00000000000..476625c80a0
--- /dev/null
+++ b/common/tqname/tqname.go
@@ -0,0 +1,190 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package tqname
+
+import (
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+const (
+ // mangledTaskQueuePrefix is the prefix for all mangled task queue names.
+ mangledTaskQueuePrefix = "/_sys/"
+
+ suffixDelimiter = "/"
+ versionSetDelimiter = ":"
+)
+
+type (
+ // Name encapsulates all the name mangling we do for task queues.
+ //
+ // Users work with "high-level task queues" and can give them arbitrary names (except for
+ // our prefix).
+ //
+ // Each high-level task queue corresponds to one or more "low-level task queues", each of
+ // which has a distinct task queue manager in memory in matching service, as well as a
+ // distinct identity in persistence.
+ //
+ // There are two pieces of identifying information for low-level task queues: a partition
+ // index, and a version set identifier. All low-level task queues have a partition index,
+ // which may be 0. Partition 0 is called the "root". The version set identifier is
+ // optional: task queues with it are called "versioned" and task queues without it are
+ // called "unversioned".
+ //
+ // All versioned low-level task queues use mangled names. All unversioned low-level task
+ // queues with non-zero partition also use mangled names.
+ //
+ // Mangled names look like this:
+ //
+ // /_sys//[:]
+ //
+ // The partition id is required, and the version set id is optional. If present, it is
+ // separated from the partition id by a colon. This scheme lets users use anything they
+ // like for a base name, except for strings starting with "/_sys/", without ambiguity.
+ //
+ // For backward compatibility, unversioned low-level task queues with partition 0 do not
+ // use mangled names, they use the bare base name.
+ Name struct {
+ baseName string // base name of the task queue as specified by user
+ partition int // partition of task queue
+ versionSet string // version set id
+ }
+)
+
+var (
+ ErrNoParent = errors.New("root task queue partition has no parent")
+ ErrInvalidDegree = errors.New("invalid task queue partition branching degree")
+)
+
+// Parse takes a mangled low-level task queue name and returns a Name. Returns an error if the
+// given name is not a valid mangled name.
+func Parse(name string) (Name, error) {
+ baseName := name
+ partition := 0
+ versionSet := ""
+
+ if strings.HasPrefix(name, mangledTaskQueuePrefix) {
+ suffixOff := strings.LastIndex(name, suffixDelimiter)
+ if suffixOff <= len(mangledTaskQueuePrefix) {
+ return Name{}, fmt.Errorf("invalid task queue name %q", name)
+ }
+ baseName = name[len(mangledTaskQueuePrefix):suffixOff]
+
+ suffix := name[suffixOff+1:]
+ if partitionOff := strings.LastIndex(suffix, versionSetDelimiter); partitionOff == 0 {
+ return Name{}, fmt.Errorf("invalid task queue name %q", name)
+ } else if partitionOff > 0 {
+ // pull out version set
+ versionSet, suffix = suffix[:partitionOff], suffix[partitionOff+1:]
+ }
+
+ var err error
+ partition, err = strconv.Atoi(suffix)
+ if err != nil || partition < 0 || partition == 0 && len(versionSet) == 0 {
+ return Name{}, fmt.Errorf("invalid task queue name %q", name)
+ }
+ }
+
+ return Name{
+ baseName: baseName,
+ partition: partition,
+ versionSet: versionSet,
+ }, nil
+}
+
+// FromBaseName takes a base name and returns a Name. Returns an error if name looks like a
+// mangled name.
+func FromBaseName(name string) (Name, error) {
+ if strings.HasPrefix(name, mangledTaskQueuePrefix) {
+ return Name{}, fmt.Errorf("base name %q must not have prefix %q", name, mangledTaskQueuePrefix)
+ }
+ return Name{baseName: name}, nil
+}
+
+// IsRoot returns true if this task queue is a root partition.
+func (tn Name) IsRoot() bool {
+ return tn.partition == 0
+}
+
+// WithPartition returns a new Name with the same base and version set but a different partition.
+func (tn Name) WithPartition(partition int) Name {
+ nn := tn
+ nn.partition = partition
+ return nn
+}
+
+// Root is shorthand for WithPartition(0).
+func (tn Name) Root() Name {
+ return tn.WithPartition(0)
+}
+
+// WithVersionSet returns a new Name with the same base and partition but a different version set.
+func (tn Name) WithVersionSet(versionSet string) Name {
+ nn := tn
+ nn.versionSet = versionSet
+ return nn
+}
+
+// BaseNameString returns the base name for a task queue. This should be used when looking up
+// settings in dynamic config, and pretty much nowhere else. To get the name of the root
+// partition, use tn.Root().FullName().
+func (tn Name) BaseNameString() string {
+ return tn.baseName
+}
+
+// Partition returns the partition number for a task queue.
+func (tn Name) Partition() int {
+ return tn.partition
+}
+
+// VersionSet returns the version set for a task queue.
+func (tn Name) VersionSet() string {
+ return tn.versionSet
+}
+
+// Parent returns a Name for the parent partition, using the given branching degree.
+func (tn Name) Parent(degree int) (Name, error) {
+ if tn.IsRoot() {
+ return Name{}, ErrNoParent
+ } else if degree < 1 {
+ return Name{}, ErrInvalidDegree
+ }
+ parent := (tn.partition+degree-1)/degree - 1
+ return tn.WithPartition(parent), nil
+}
+
+// FullName returns the mangled name of the task queue, to be used in RPCs and persistence.
+func (tn Name) FullName() string {
+ if len(tn.versionSet) == 0 {
+ if tn.partition == 0 {
+ return tn.baseName
+ }
+ return fmt.Sprintf("%s%s%s%d", mangledTaskQueuePrefix, tn.baseName, suffixDelimiter, tn.partition)
+ }
+ // versioned always use prefix
+ return fmt.Sprintf("%s%s%s%s%s%d", mangledTaskQueuePrefix, tn.baseName, suffixDelimiter, tn.versionSet, versionSetDelimiter, tn.partition)
+}
diff --git a/service/matching/taskqueue_test.go b/common/tqname/tqname_test.go
similarity index 55%
rename from service/matching/taskqueue_test.go
rename to common/tqname/tqname_test.go
index 075b74d1a30..d5069a83a63 100644
--- a/service/matching/taskqueue_test.go
+++ b/common/tqname/tqname_test.go
@@ -22,15 +22,95 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-package matching
+package tqname
import (
"strconv"
"testing"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
+func TestParse(t *testing.T) {
+ a := assert.New(t)
+
+ n, err := Parse("my-basic-tq-name")
+ a.NoError(err)
+ a.Equal("my-basic-tq-name", n.BaseNameString())
+ a.Equal(0, n.Partition())
+ a.Equal("", n.VersionSet())
+ a.Equal("my-basic-tq-name", n.FullName())
+ a.True(n.IsRoot())
+ _, err = n.Parent(5)
+ a.Equal(ErrNoParent, err)
+
+ n, err = Parse("/_sys/my-basic-tq-name/23")
+ a.NoError(err)
+ a.Equal("my-basic-tq-name", n.BaseNameString())
+ a.Equal(23, n.Partition())
+ a.Equal("", n.VersionSet())
+ a.Equal("/_sys/my-basic-tq-name/23", n.FullName())
+ a.False(n.IsRoot())
+ a.Equal(4, mustParent(n, 5).Partition())
+ a.Equal(0, mustParent(n, 32).Partition())
+
+ n, err = Parse("/_sys/my-basic-tq-name/verxyz:23")
+ a.NoError(err)
+ a.Equal("my-basic-tq-name", n.BaseNameString())
+ a.Equal(23, n.Partition())
+ a.Equal("verxyz", n.VersionSet())
+ a.Equal("/_sys/my-basic-tq-name/verxyz:23", n.FullName())
+}
+
+func TestFromBaseName(t *testing.T) {
+ a := assert.New(t)
+
+ n, err := FromBaseName("my-basic-tq-name")
+ a.NoError(err)
+ a.Equal("my-basic-tq-name", n.BaseNameString())
+ a.Equal(0, n.Partition())
+ a.Equal("", n.VersionSet())
+
+ _, err = FromBaseName("/_sys/my-basic-tq-name/23")
+ a.Error(err)
+}
+
+func TestWithPartition(t *testing.T) {
+ a := assert.New(t)
+
+ n, err := FromBaseName("tq")
+ a.NoError(err)
+ n = n.WithPartition(23)
+ a.Equal("tq", n.BaseNameString())
+ a.Equal(23, n.Partition())
+ a.Equal("/_sys/tq/23", n.FullName())
+ a.False(n.IsRoot())
+}
+
+func TestWithVersionSet(t *testing.T) {
+ a := assert.New(t)
+
+ n, err := FromBaseName("tq")
+ a.NoError(err)
+ n = n.WithVersionSet("abc3")
+ a.Equal("tq", n.BaseNameString())
+ a.Equal(0, n.Partition())
+ a.Equal("/_sys/tq/abc3:0", n.FullName())
+}
+
+func TestWithPartitionAndVersionSet(t *testing.T) {
+ a := assert.New(t)
+
+ n, err := FromBaseName("tq")
+ a.NoError(err)
+ n = n.WithPartition(11).WithVersionSet("abc3")
+ a.Equal("tq", n.BaseNameString())
+ a.Equal(11, n.Partition())
+ a.Equal("abc3", n.VersionSet())
+ a.Equal("/_sys/tq/abc3:11", n.FullName())
+}
+
func TestValidTaskQueueNames(t *testing.T) {
testCases := []struct {
input string
@@ -51,31 +131,32 @@ func TestValidTaskQueueNames(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.input, func(t *testing.T) {
- tn, err := newTaskQueueName(tc.input)
+ tn, err := Parse(tc.input)
require.NoError(t, err)
require.Equal(t, tc.partition, tn.partition)
require.Equal(t, tc.partition == 0, tn.IsRoot())
require.Equal(t, tc.baseName, tn.baseName)
- require.Equal(t, tc.baseName, tn.GetRoot())
- require.Equal(t, tc.input, tn.name)
+ require.Equal(t, tc.baseName, tn.BaseNameString())
+ require.Equal(t, tc.input, tn.FullName())
})
}
}
func TestTaskQueueParentName(t *testing.T) {
+ const invalid = "__invalid__"
testCases := []struct {
name string
degree int
output string
}{
/* unexpected input */
- {"list0", 0, ""},
+ {"list0", 0, invalid},
/* 1-ary tree */
- {"list0", 1, ""},
+ {"list0", 1, invalid},
{"/_sys/list0/1", 1, "list0"},
{"/_sys/list0/2", 1, "/_sys/list0/1"},
/* 2-ary tree */
- {"list0", 2, ""},
+ {"list0", 2, invalid},
{"/_sys/list0/1", 2, "list0"},
{"/_sys/list0/2", 2, "list0"},
{"/_sys/list0/3", 2, "/_sys/list0/1"},
@@ -95,9 +176,14 @@ func TestTaskQueueParentName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name+"#"+strconv.Itoa(tc.degree), func(t *testing.T) {
- tn, err := newTaskQueueName(tc.name)
+ tn, err := Parse(tc.name)
require.NoError(t, err)
- require.Equal(t, tc.output, tn.Parent(tc.degree))
+ parent, err := tn.Parent(tc.degree)
+ if tc.output == invalid {
+ require.Equal(t, ErrNoParent, err)
+ } else {
+ require.Equal(t, tc.output, parent.FullName())
+ }
})
}
}
@@ -111,31 +197,21 @@ func TestInvalidTaskqueueNames(t *testing.T) {
"/_sys/list0",
"/_sys/list0/0",
"/_sys/list0/-1",
+ "/_sys/list0/abc",
+ "/_sys/list0:verxyz:23",
}
for _, name := range inputs {
t.Run(name, func(t *testing.T) {
- _, err := newTaskQueueName(name)
+ _, err := Parse(name)
require.Error(t, err)
})
}
}
-func TestTaskQueueCreateNameWIthPartition(t *testing.T) {
- testCases := []struct {
- name string
- part int
- output string
- }{
- {"foo", 0, "foo"},
- {"foo", 1, "/_sys/foo/1"},
- {"foo", 2, "/_sys/foo/2"},
- }
-
- for _, tc := range testCases {
- t.Run(tc.name+"#"+strconv.Itoa(tc.part), func(t *testing.T) {
- tn, err := NewTaskQueueNameWithPartition(tc.name, tc.part)
- require.NoError(t, err)
- require.Equal(t, tc.output, tn.name)
- })
+func mustParent(tn Name, n int) Name {
+ parent, err := tn.Parent(n)
+ if err != nil {
+ panic(err)
}
+ return parent
}
diff --git a/service/matching/config.go b/service/matching/config.go
index 3079ee3b837..b311a1f89c2 100644
--- a/service/matching/config.go
+++ b/service/matching/config.go
@@ -156,7 +156,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config {
}
func newTaskQueueConfig(id *taskQueueID, config *Config, namespace namespace.Name) *taskQueueConfig {
- taskQueueName := id.name
+ taskQueueName := id.FullName()
taskType := id.taskType
return &taskQueueConfig{
diff --git a/service/matching/db.go b/service/matching/db.go
index f4ad40f885d..0ffde9a381e 100644
--- a/service/matching/db.go
+++ b/service/matching/db.go
@@ -122,7 +122,7 @@ func (db *taskQueueDB) takeOverTaskQueueLocked(
) error {
response, err := db.store.GetTaskQueue(ctx, &persistence.GetTaskQueueRequest{
NamespaceID: db.namespaceID.String(),
- TaskQueue: db.taskQueue.name,
+ TaskQueue: db.taskQueue.FullName(),
TaskType: db.taskQueue.taskType,
})
switch err.(type) {
@@ -221,7 +221,7 @@ func (db *taskQueueDB) GetTasks(
) (*persistence.GetTasksResponse, error) {
return db.store.GetTasks(ctx, &persistence.GetTasksRequest{
NamespaceID: db.namespaceID.String(),
- TaskQueue: db.taskQueue.name,
+ TaskQueue: db.taskQueue.FullName(),
TaskType: db.taskQueue.taskType,
PageSize: batchSize,
InclusiveMinTaskID: inclusiveMinTaskID,
@@ -237,7 +237,7 @@ func (db *taskQueueDB) CompleteTask(
err := db.store.CompleteTask(ctx, &persistence.CompleteTaskRequest{
TaskQueue: &persistence.TaskQueueKey{
NamespaceID: db.namespaceID.String(),
- TaskQueueName: db.taskQueue.name,
+ TaskQueueName: db.taskQueue.FullName(),
TaskQueueType: db.taskQueue.taskType,
},
TaskID: taskID,
@@ -248,7 +248,8 @@ func (db *taskQueueDB) CompleteTask(
tag.Error(err),
tag.TaskID(taskID),
tag.WorkflowTaskQueueType(db.taskQueue.taskType),
- tag.WorkflowTaskQueueName(db.taskQueue.name))
+ tag.WorkflowTaskQueueName(db.taskQueue.FullName()),
+ )
}
return err
}
@@ -263,7 +264,7 @@ func (db *taskQueueDB) CompleteTasksLessThan(
) (int, error) {
n, err := db.store.CompleteTasksLessThan(ctx, &persistence.CompleteTasksLessThanRequest{
NamespaceID: db.namespaceID.String(),
- TaskQueueName: db.taskQueue.name,
+ TaskQueueName: db.taskQueue.FullName(),
TaskType: db.taskQueue.taskType,
ExclusiveMaxTaskID: exclusiveMaxTaskID,
Limit: limit,
@@ -274,7 +275,8 @@ func (db *taskQueueDB) CompleteTasksLessThan(
tag.Error(err),
tag.TaskID(exclusiveMaxTaskID),
tag.WorkflowTaskQueueType(db.taskQueue.taskType),
- tag.WorkflowTaskQueueName(db.taskQueue.name))
+ tag.WorkflowTaskQueueName(db.taskQueue.FullName()),
+ )
}
return n, err
}
@@ -303,7 +305,7 @@ func (db *taskQueueDB) getVersioningDataLocked(
tqInfo, err := db.store.GetTaskQueue(ctx, &persistence.GetTaskQueueRequest{
NamespaceID: db.namespaceID.String(),
- TaskQueue: db.taskQueue.name,
+ TaskQueue: db.taskQueue.FullName(),
TaskType: db.taskQueue.taskType,
})
if err != nil {
@@ -388,7 +390,7 @@ func (db *taskQueueDB) expiryTime() *time.Time {
func (db *taskQueueDB) cachedQueueInfo() *persistencespb.TaskQueueInfo {
return &persistencespb.TaskQueueInfo{
NamespaceId: db.namespaceID.String(),
- Name: db.taskQueue.name,
+ Name: db.taskQueue.FullName(),
TaskType: db.taskQueue.taskType,
Kind: db.taskQueueKind,
AckLevel: db.ackLevel,
diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go
index d7c7759563d..bc7946ab98c 100644
--- a/service/matching/forwarder.go
+++ b/service/matching/forwarder.go
@@ -76,7 +76,6 @@ type (
)
var (
- errNoParent = errors.New("cannot find parent task queue for forwarding")
errTaskQueueKind = errors.New("forwarding is not supported on sticky task queue")
errInvalidTaskQueueType = errors.New("unrecognized task queue type")
errForwarderSlowDown = errors.New("limit exceeded")
@@ -88,7 +87,7 @@ var (
// forwarder is tied to a single task queue. All of the exposed
// methods can return the following errors:
// Returns following errors:
-// - errNoParent: If this task queue doesn't have a parent to forward to
+// - tqname.ErrNoParent, tqname.ErrInvalidDegree: If this task queue doesn't have a parent to forward to
// - errTaskQueueKind: If the task queue is a sticky task queue. Sticky task queues are never partitioned
// - errForwarderSlowDown: When the rate limit is exceeded
// - errInvalidTaskType: If the task queue type is invalid
@@ -120,17 +119,16 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro
return errTaskQueueKind
}
- name := fwdr.taskQueueID.Parent(fwdr.cfg.ForwarderMaxChildrenPerNode())
- if name == "" {
- return errNoParent
+ degree := fwdr.cfg.ForwarderMaxChildrenPerNode()
+ target, err := fwdr.taskQueueID.Parent(degree)
+ if err != nil {
+ return err
}
if !fwdr.limiter.Allow() {
return errForwarderSlowDown
}
- var err error
-
var expirationDuration time.Duration
expirationTime := timestamp.TimeValue(task.event.Data.ExpiryTime)
if expirationTime.IsZero() {
@@ -148,28 +146,28 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro
NamespaceId: task.event.Data.GetNamespaceId(),
Execution: task.workflowExecution(),
TaskQueue: &taskqueuepb.TaskQueue{
- Name: name,
+ Name: target.FullName(),
Kind: fwdr.taskQueueKind,
},
ScheduledEventId: task.event.Data.GetScheduledEventId(),
Clock: task.event.Data.GetClock(),
Source: task.source,
ScheduleToStartTimeout: &expirationDuration,
- ForwardedSource: fwdr.taskQueueID.name,
+ ForwardedSource: fwdr.taskQueueID.FullName(),
})
case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
_, err = fwdr.client.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{
NamespaceId: task.event.Data.GetNamespaceId(),
Execution: task.workflowExecution(),
TaskQueue: &taskqueuepb.TaskQueue{
- Name: name,
+ Name: target.FullName(),
Kind: fwdr.taskQueueKind,
},
ScheduledEventId: task.event.Data.GetScheduledEventId(),
Clock: task.event.Data.GetClock(),
Source: task.source,
ScheduleToStartTimeout: &expirationDuration,
- ForwardedSource: fwdr.taskQueueID.name,
+ ForwardedSource: fwdr.taskQueueID.FullName(),
})
default:
return errInvalidTaskQueueType
@@ -188,19 +186,20 @@ func (fwdr *Forwarder) ForwardQueryTask(
return nil, errTaskQueueKind
}
- name := fwdr.taskQueueID.Parent(fwdr.cfg.ForwarderMaxChildrenPerNode())
- if name == "" {
- return nil, errNoParent
+ degree := fwdr.cfg.ForwarderMaxChildrenPerNode()
+ target, err := fwdr.taskQueueID.Parent(degree)
+ if err != nil {
+ return nil, err
}
resp, err := fwdr.client.QueryWorkflow(ctx, &matchingservice.QueryWorkflowRequest{
NamespaceId: task.query.request.GetNamespaceId(),
TaskQueue: &taskqueuepb.TaskQueue{
- Name: name,
+ Name: target.FullName(),
Kind: fwdr.taskQueueKind,
},
QueryRequest: task.query.request.QueryRequest,
- ForwardedSource: fwdr.taskQueueID.name,
+ ForwardedSource: fwdr.taskQueueID.FullName(),
})
return resp, fwdr.handleErr(err)
@@ -212,9 +211,10 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*internalTask, error) {
return nil, errTaskQueueKind
}
- name := fwdr.taskQueueID.Parent(fwdr.cfg.ForwarderMaxChildrenPerNode())
- if name == "" {
- return nil, errNoParent
+ degree := fwdr.cfg.ForwarderMaxChildrenPerNode()
+ target, err := fwdr.taskQueueID.Parent(degree)
+ if err != nil {
+ return nil, err
}
pollerID, _ := ctx.Value(pollerIDKey).(string)
@@ -227,12 +227,12 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*internalTask, error) {
PollerId: pollerID,
PollRequest: &workflowservice.PollWorkflowTaskQueueRequest{
TaskQueue: &taskqueuepb.TaskQueue{
- Name: name,
+ Name: target.FullName(),
Kind: fwdr.taskQueueKind,
},
Identity: identity,
},
- ForwardedSource: fwdr.taskQueueID.name,
+ ForwardedSource: fwdr.taskQueueID.FullName(),
})
if err != nil {
return nil, fwdr.handleErr(err)
@@ -244,12 +244,12 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*internalTask, error) {
PollerId: pollerID,
PollRequest: &workflowservice.PollActivityTaskQueueRequest{
TaskQueue: &taskqueuepb.TaskQueue{
- Name: name,
+ Name: target.FullName(),
Kind: fwdr.taskQueueKind,
},
Identity: identity,
},
- ForwardedSource: fwdr.taskQueueID.name,
+ ForwardedSource: fwdr.taskQueueID.FullName(),
})
if err != nil {
return nil, fwdr.handleErr(err)
diff --git a/service/matching/forwarder_test.go b/service/matching/forwarder_test.go
index dd1b4e18b35..052faca3996 100644
--- a/service/matching/forwarder_test.go
+++ b/service/matching/forwarder_test.go
@@ -43,6 +43,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/namespace"
+ "go.temporal.io/server/common/tqname"
)
type ForwarderTestSuite struct {
@@ -77,7 +78,7 @@ func (t *ForwarderTestSuite) TearDownTest() {
func (t *ForwarderTestSuite) TestForwardTaskError() {
task := newInternalTask(&persistencespb.AllocatedTaskInfo{}, nil, enumsspb.TASK_SOURCE_HISTORY, "", false)
- t.Equal(errNoParent, t.fwdr.ForwardTask(context.Background(), task))
+ t.Equal(tqname.ErrNoParent, t.fwdr.ForwardTask(context.Background(), task))
t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_ACTIVITY)
t.fwdr.taskQueueKind = enumspb.TASK_QUEUE_KIND_STICKY
@@ -98,7 +99,7 @@ func (t *ForwarderTestSuite) TestForwardWorkflowTask() {
task := newInternalTask(taskInfo, nil, enumsspb.TASK_SOURCE_HISTORY, "", false)
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
t.NotNil(request)
- t.Equal(t.taskQueue.Parent(20), request.TaskQueue.GetName())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.TaskQueue.GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.TaskQueue.GetKind())
t.Equal(taskInfo.Data.GetNamespaceId(), request.GetNamespaceId())
t.Equal(taskInfo.Data.GetWorkflowId(), request.GetExecution().GetWorkflowId())
@@ -108,7 +109,7 @@ func (t *ForwarderTestSuite) TestForwardWorkflowTask() {
schedToStart := int32(request.GetScheduleToStartTimeout().Seconds())
rewritten := convert.Int32Ceil(time.Until(*taskInfo.Data.ExpiryTime).Seconds())
t.EqualValues(schedToStart, rewritten)
- t.Equal(t.taskQueue.name, request.GetForwardedSource())
+ t.Equal(t.taskQueue.FullName(), request.GetForwardedSource())
}
func (t *ForwarderTestSuite) TestForwardActivityTask() {
@@ -125,7 +126,7 @@ func (t *ForwarderTestSuite) TestForwardActivityTask() {
task := newInternalTask(taskInfo, nil, enumsspb.TASK_SOURCE_HISTORY, "", false)
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
t.NotNil(request)
- t.Equal(t.taskQueue.Parent(20), request.TaskQueue.GetName())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.TaskQueue.GetName())
t.Equal(t.fwdr.taskQueueKind, request.TaskQueue.GetKind())
t.Equal(taskInfo.Data.GetNamespaceId(), request.GetNamespaceId())
t.Equal(taskInfo.Data.GetWorkflowId(), request.GetExecution().GetWorkflowId())
@@ -133,7 +134,7 @@ func (t *ForwarderTestSuite) TestForwardActivityTask() {
t.Equal(taskInfo.Data.GetScheduledEventId(), request.GetScheduledEventId())
t.EqualValues(convert.Int32Ceil(time.Until(*taskInfo.Data.ExpiryTime).Seconds()),
int32(request.GetScheduleToStartTimeout().Seconds()))
- t.Equal(t.taskQueue.name, request.GetForwardedSource())
+ t.Equal(t.taskQueue.FullName(), request.GetForwardedSource())
}
func (t *ForwarderTestSuite) TestForwardTaskRateExceeded() {
@@ -152,7 +153,7 @@ func (t *ForwarderTestSuite) TestForwardTaskRateExceeded() {
func (t *ForwarderTestSuite) TestForwardQueryTaskError() {
task := newInternalQueryTask("id1", &matchingservice.QueryWorkflowRequest{})
_, err := t.fwdr.ForwardQueryTask(context.Background(), task)
- t.Equal(errNoParent, err)
+ t.Equal(tqname.ErrNoParent, err)
t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_WORKFLOW)
t.fwdr.taskQueueKind = enumspb.TASK_QUEUE_KIND_STICKY
@@ -173,7 +174,7 @@ func (t *ForwarderTestSuite) TestForwardQueryTask() {
gotResp, err := t.fwdr.ForwardQueryTask(context.Background(), task)
t.NoError(err)
- t.Equal(t.taskQueue.Parent(20), request.TaskQueue.GetName())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.TaskQueue.GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.TaskQueue.GetKind())
t.Equal(task.query.request.QueryRequest, request.QueryRequest)
t.Equal(resp, gotResp)
@@ -195,7 +196,7 @@ func (t *ForwarderTestSuite) TestForwardQueryTaskRateNotEnforced() {
func (t *ForwarderTestSuite) TestForwardPollError() {
_, err := t.fwdr.ForwardPoll(context.Background())
- t.Equal(errNoParent, err)
+ t.Equal(tqname.ErrNoParent, err)
t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_ACTIVITY)
t.fwdr.taskQueueKind = enumspb.TASK_QUEUE_KIND_STICKY
@@ -226,7 +227,7 @@ func (t *ForwarderTestSuite) TestForwardPollWorkflowTaskQueue() {
t.Equal(pollerID, request.GetPollerId())
t.Equal(t.taskQueue.namespaceID, namespace.ID(request.GetNamespaceId()))
t.Equal("id1", request.GetPollRequest().GetIdentity())
- t.Equal(t.taskQueue.Parent(20), request.GetPollRequest().GetTaskQueue().GetName())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.GetPollRequest().GetTaskQueue().GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.GetPollRequest().GetTaskQueue().GetKind())
t.Equal(resp, task.pollWorkflowTaskQueueResponse())
t.Nil(task.pollActivityTaskQueueResponse())
@@ -254,7 +255,7 @@ func (t *ForwarderTestSuite) TestForwardPollForActivity() {
t.Equal(pollerID, request.GetPollerId())
t.Equal(t.taskQueue.namespaceID, namespace.ID(request.GetNamespaceId()))
t.Equal("id1", request.GetPollRequest().GetIdentity())
- t.Equal(t.taskQueue.Parent(20), request.GetPollRequest().GetTaskQueue().GetName())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.GetPollRequest().GetTaskQueue().GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.GetPollRequest().GetTaskQueue().GetKind())
t.Equal(resp, task.pollActivityTaskQueueResponse())
t.Nil(task.pollWorkflowTaskQueueResponse())
@@ -342,6 +343,24 @@ func (t *ForwarderTestSuite) TestMaxOutstandingConfigUpdate() {
}
func (t *ForwarderTestSuite) usingTaskqueuePartition(taskType enumspb.TaskQueueType) {
- t.taskQueue = newTestTaskQueueID("fwdr", taskQueuePartitionPrefix+"tl0/1", taskType)
+ n := mustFromBaseName("tl0").WithPartition(1)
+ t.taskQueue = newTestTaskQueueID("fwdr", n.FullName(), taskType)
t.fwdr.taskQueueID = t.taskQueue
}
+
+// Wrappers for tqname functions to make tests read better:
+func mustFromBaseName(name string) tqname.Name {
+ n, err := tqname.FromBaseName(name)
+ if err != nil {
+ panic(err)
+ }
+ return n
+}
+
+func mustParent(tn tqname.Name, n int) tqname.Name {
+ parent, err := tn.Parent(n)
+ if err != nil {
+ panic(err)
+ }
+ return parent
+}
diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go
index 594a51a785f..9501f59db8d 100644
--- a/service/matching/matcher_test.go
+++ b/service/matching/matcher_test.go
@@ -70,7 +70,9 @@ func (t *MatcherTestSuite) SetupTest() {
t.controller = gomock.NewController(t.T())
t.client = matchingservicemock.NewMockMatchingServiceClient(t.controller)
cfg := NewConfig(dynamicconfig.NewNoopCollection())
- t.taskQueue = newTestTaskQueueID(namespace.ID(uuid.New()), taskQueuePartitionPrefix+"tl0/1", enumspb.TASK_QUEUE_TYPE_WORKFLOW)
+
+ n := mustFromBaseName("tl0").WithPartition(1)
+ t.taskQueue = newTestTaskQueueID(namespace.ID(uuid.New()), n.FullName(), enumspb.TASK_QUEUE_TYPE_WORKFLOW)
tlCfg := newTaskQueueConfig(t.taskQueue, cfg, "test-namespace")
tlCfg.forwarderConfig = forwarderConfig{
ForwarderMaxOutstandingPolls: func() int { return 1 },
@@ -82,7 +84,7 @@ func (t *MatcherTestSuite) SetupTest() {
t.fwdr = newForwarder(&t.cfg.forwarderConfig, t.taskQueue, enumspb.TASK_QUEUE_KIND_NORMAL, t.client)
t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopMetricsHandler)
- rootTaskQueue := newTestTaskQueueID(t.taskQueue.namespaceID, t.taskQueue.Parent(20), enumspb.TASK_QUEUE_TYPE_WORKFLOW)
+ rootTaskQueue := newTestTaskQueueID(t.taskQueue.namespaceID, mustParent(t.taskQueue.Name, 20).FullName(), enumspb.TASK_QUEUE_TYPE_WORKFLOW)
rootTaskqueueCfg := newTaskQueueConfig(rootTaskQueue, cfg, "test-namespace")
t.rootMatcher = newTaskMatcher(rootTaskqueueCfg, nil, metrics.NoopMetricsHandler)
}
@@ -187,8 +189,8 @@ func (t *MatcherTestSuite) testRemoteSyncMatch(taskSource enumsspb.TaskSource) {
t.NotNil(req)
t.NoError(err)
t.True(remoteSyncMatch)
- t.Equal(t.taskQueue.name, req.GetForwardedSource())
- t.Equal(t.taskQueue.Parent(20), req.GetTaskQueue().GetName())
+ t.Equal(t.taskQueue.FullName(), req.GetForwardedSource())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), req.GetTaskQueue().GetName())
}
func (t *MatcherTestSuite) TestSyncMatchFailure() {
@@ -293,8 +295,8 @@ func (t *MatcherTestSuite) TestQueryRemoteSyncMatch() {
err = payloads.Decode(result.GetQueryResult(), &answer)
t.NoError(err)
t.Equal("answer", answer)
- t.Equal(t.taskQueue.name, req.GetForwardedSource())
- t.Equal(t.taskQueue.Parent(20), req.GetTaskQueue().GetName())
+ t.Equal(t.taskQueue.FullName(), req.GetForwardedSource())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), req.GetTaskQueue().GetName())
}
func (t *MatcherTestSuite) TestQueryRemoteSyncMatchError() {
@@ -425,8 +427,8 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
t.NoError(err)
t.True(remoteSyncMatch)
t.True(taskCompleted)
- t.Equal(t.taskQueue.name, req.GetForwardedSource())
- t.Equal(t.taskQueue.Parent(20), req.GetTaskQueue().GetName())
+ t.Equal(t.taskQueue.FullName(), req.GetForwardedSource())
+ t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), req.GetTaskQueue().GetName())
}
func (t *MatcherTestSuite) TestRemotePoll() {
diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go
index a240285bda0..aa9b240c208 100644
--- a/service/matching/matchingEngine.go
+++ b/service/matching/matchingEngine.go
@@ -835,18 +835,10 @@ func (e *matchingEngineImpl) getAllPartitions(
if err != nil {
return partitionKeys, err
}
- rootPartition := taskQueueID.GetRoot()
- partitionKeys = append(partitionKeys, rootPartition)
-
- nWritePartitions := e.config.NumTaskqueueWritePartitions
- n := nWritePartitions(namespace.String(), rootPartition, taskQueueType)
- if n <= 0 {
- return partitionKeys, nil
- }
-
- for i := 1; i < n; i++ {
- partitionKeys = append(partitionKeys, fmt.Sprintf("%v%v/%v", taskQueuePartitionPrefix, rootPartition, i))
+ n := e.config.NumTaskqueueWritePartitions(namespace.String(), taskQueueID.BaseNameString(), taskQueueType)
+ for i := 0; i < n; i++ {
+ partitionKeys = append(partitionKeys, taskQueueID.WithPartition(i).FullName())
}
return partitionKeys, nil
diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go
index 1ff0e990639..79fb5317d8e 100644
--- a/service/matching/matchingEngine_test.go
+++ b/service/matching/matchingEngine_test.go
@@ -2434,7 +2434,7 @@ func (m *testTaskManager) String() string {
} else {
result += "Workflow"
}
- result += " task queue " + id.name
+ result += " task queue " + id.FullName()
result += "\n"
result += fmt.Sprintf("AckLevel=%v\n", tl.ackLevel)
result += fmt.Sprintf("CreateTaskCount=%v\n", tl.createTaskCount)
diff --git a/service/matching/taskQueueManager.go b/service/matching/taskQueueManager.go
index c16f2891485..d64826e240e 100644
--- a/service/matching/taskQueueManager.go
+++ b/service/matching/taskQueueManager.go
@@ -204,13 +204,13 @@ func newTaskQueueManager(
db := newTaskQueueDB(e.taskManager, taskQueue.namespaceID, taskQueue, taskQueueKind, e.logger)
logger := log.With(e.logger,
- tag.WorkflowTaskQueueName(taskQueue.name),
+ tag.WorkflowTaskQueueName(taskQueue.FullName()),
tag.WorkflowTaskQueueType(taskQueue.taskType),
tag.WorkflowNamespace(nsName.String()))
taggedMetricsHandler := metrics.GetPerTaskQueueScope(
e.metricsHandler.WithTags(metrics.OperationTag(metrics.MatchingTaskQueueMgrScope), metrics.TaskQueueTypeTag(taskQueue.taskType)),
nsName.String(),
- taskQueue.name,
+ taskQueue.FullName(),
taskQueueKind,
)
tlMgr := &taskQueueManagerImpl{
@@ -495,7 +495,7 @@ func (c *taskQueueManagerImpl) MutateVersioningData(ctx context.Context, mutator
}
wg.Add(1)
go func(i int, tqt enumspb.TaskQueueType) {
- tq := c.taskQueueID.mkName(i)
+ tq := c.taskQueueID.WithPartition(i).FullName()
_, err := c.matchingClient.InvalidateTaskQueueMetadata(ctx,
&matchingservice.InvalidateTaskQueueMetadataRequest{
NamespaceId: c.taskQueueID.namespaceID.String(),
@@ -587,7 +587,7 @@ func (c *taskQueueManagerImpl) String() string {
buf.WriteString("Workflow")
}
rangeID := c.db.RangeID()
- _, _ = fmt.Fprintf(buf, " task queue %v\n", c.taskQueueID.name)
+ _, _ = fmt.Fprintf(buf, " task queue %v\n", c.taskQueueID.FullName())
_, _ = fmt.Fprintf(buf, "RangeID=%v\n", rangeID)
_, _ = fmt.Fprintf(buf, "TaskIDBlock=%+v\n", rangeIDToTaskIDBlock(rangeID, c.config.RangeSize))
_, _ = fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.ackLevel)
@@ -622,7 +622,7 @@ func (c *taskQueueManagerImpl) completeTask(task *persistencespb.AllocatedTaskIn
c.logger.Error("Persistent store operation failure",
tag.StoreOperationStopTaskQueue,
tag.Error(err),
- tag.WorkflowTaskQueueName(c.taskQueueID.name),
+ tag.WorkflowTaskQueueName(c.taskQueueID.FullName()),
tag.WorkflowTaskQueueType(c.taskQueueID.taskType))
c.signalFatalProblem(c)
return
@@ -748,7 +748,7 @@ func (c *taskQueueManagerImpl) fetchMetadataFromRootPartition(ctx context.Contex
}
curHash := HashVersioningData(curDat)
- rootTqName := c.taskQueueID.GetRoot()
+ rootTqName := c.taskQueueID.Root().FullName()
if len(curHash) == 0 {
// if we have no data, make sure we send a sigil value, so it's known we desire versioning data
curHash = []byte{0}
diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go
index 68027391b38..8b47a58e073 100644
--- a/service/matching/taskWriter.go
+++ b/service/matching/taskWriter.go
@@ -210,7 +210,7 @@ func (w *taskWriter) appendTasks(
w.logger.Error("Persistent store operation failure",
tag.StoreOperationCreateTask,
tag.Error(err),
- tag.WorkflowTaskQueueName(w.taskQueueID.name),
+ tag.WorkflowTaskQueueName(w.taskQueueID.FullName()),
tag.WorkflowTaskQueueType(w.taskQueueID.taskType))
return nil, err
}
diff --git a/service/matching/taskqueue.go b/service/matching/taskqueue.go
index f750964a35a..dc8a5d6b8f4 100644
--- a/service/matching/taskqueue.go
+++ b/service/matching/taskqueue.go
@@ -26,158 +26,58 @@ package matching
import (
"bytes"
- "fmt"
- "strconv"
- "strings"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/server/common/namespace"
+ "go.temporal.io/server/common/tqname"
)
-// TODO: This file ought to be moved into common. Frontend is duplicating this name mangling scheme
-// and if it ever changes, it's likely to break.
-
type (
// taskQueueID is the key that uniquely identifies a task queue
taskQueueID struct {
- QualifiedTaskQueueName
+ tqname.Name
namespaceID namespace.ID
taskType enumspb.TaskQueueType
}
- // QualifiedTaskQueueName refers to the fully qualified task queue name
- QualifiedTaskQueueName struct {
- name string // internal name of the tasks list
- baseName string // original name of the task queue as specified by user
- partition int // partitionID of task queue
- }
)
-const (
- // taskQueuePartitionPrefix is the required naming prefix for any task queue partition other than partition 0
- taskQueuePartitionPrefix = "/_sys/"
-)
-
-// newTaskQueueName returns a fully qualified task queue name.
-// Fully qualified names contain additional metadata about task queue
-// derived from their given name. The additional metadata only makes sense
-// when a task queue has more than one partition. When there is more than
-// one partition for a user specified task queue, each of the
-// individual partitions have an internal name of the form
-//
-// /_sys/[original-name]/[partitionID]
-//
-// The name of the root partition is always the same as the user specified name. Rest of
-// the partitions follow the naming convention above. In addition, the task queues partitions
-// logically form a N-ary tree where N is configurable dynamically. The tree formation is an
-// optimization to allow for partitioned task queues to dispatch tasks with low latency when
-// throughput is low - See https://github.com/uber/cadence/issues/2098
-//
-// Returns error if the given name is non-compliant with the required format
-// for task queue names
-func newTaskQueueName(name string) (QualifiedTaskQueueName, error) {
- tn := QualifiedTaskQueueName{
- name: name,
- baseName: name,
- }
- if err := tn.init(); err != nil {
- return QualifiedTaskQueueName{}, err
- }
- return tn, nil
-}
-
-// NewTaskQueueNameWithPartition can be used to create root and non-root taskqueue names easily without needing to
-// manually craft the correct string. See newTaskQueueName for more details.
-func NewTaskQueueNameWithPartition(baseName string, partition int) (QualifiedTaskQueueName, error) {
- tqName, err := newTaskQueueName(baseName)
- if partition == 0 {
- return tqName, err
- }
- partName := tqName.mkName(partition)
- tqName.partition = partition
- tqName.name = partName
- return tqName, err
-}
-
-// IsRoot returns true if this task queue is a root partition
-func (tn *QualifiedTaskQueueName) IsRoot() bool {
- return tn.partition == 0
-}
-
-// GetRoot returns the root name for a task queue
-func (tn *QualifiedTaskQueueName) GetRoot() string {
- return tn.baseName
-}
-
-// Parent returns the name of the parent task queue
-// input:
-//
-// degree: Number of children at each level of the tree
-//
-// Returns empty string if this task queue is the root
-func (tn *QualifiedTaskQueueName) Parent(degree int) string {
- if tn.IsRoot() || degree == 0 {
- return ""
- }
- pid := (tn.partition+degree-1)/degree - 1
- return tn.mkName(pid)
-}
-
-func (tn *QualifiedTaskQueueName) mkName(partition int) string {
- if partition == 0 {
- return tn.baseName
- }
- return fmt.Sprintf("%v%v/%v", taskQueuePartitionPrefix, tn.baseName, partition)
-}
-
-func (tn *QualifiedTaskQueueName) init() error {
- if !strings.HasPrefix(tn.name, taskQueuePartitionPrefix) {
- return nil
- }
-
- suffixOff := strings.LastIndex(tn.name, "/")
- if suffixOff <= len(taskQueuePartitionPrefix) {
- return fmt.Errorf("invalid partitioned task queue name %v", tn.name)
- }
-
- p, err := strconv.Atoi(tn.name[suffixOff+1:])
- if err != nil || p <= 0 {
- return fmt.Errorf("invalid partitioned task queue name %v", tn.name)
- }
-
- tn.partition = p
- tn.baseName = tn.name[len(taskQueuePartitionPrefix):suffixOff]
- return nil
-}
-
-func (tn *QualifiedTaskQueueName) String() string {
- return tn.mkName(tn.partition)
-}
-
// newTaskQueueID returns taskQueueID which uniquely identfies as task queue
func newTaskQueueID(namespaceID namespace.ID, taskQueueName string, taskType enumspb.TaskQueueType) (*taskQueueID, error) {
- return newTaskQueueIDWithPartition(namespaceID, taskQueueName, taskType, 0)
+ return newTaskQueueIDWithPartition(namespaceID, taskQueueName, taskType, -1)
}
func newTaskQueueIDWithPartition(
namespaceID namespace.ID, taskQueueName string, taskType enumspb.TaskQueueType, partition int,
) (*taskQueueID, error) {
- name, err := NewTaskQueueNameWithPartition(taskQueueName, partition)
+ name, err := tqname.Parse(taskQueueName)
if err != nil {
return nil, err
}
+ if partition >= 0 {
+ name = name.WithPartition(partition)
+ }
return &taskQueueID{
- QualifiedTaskQueueName: name,
- namespaceID: namespaceID,
- taskType: taskType,
+ Name: name,
+ namespaceID: namespaceID,
+ taskType: taskType,
}, nil
}
+// To be used in a later versioning PR:
+// func newTaskQueueIDWithVersionSet(id *taskQueueID, versionSet string) *taskQueueID {
+// return &taskQueueID{
+// Name: id.Name.WithVersionSet(versionSet),
+// namespaceID: id.namespaceID,
+// taskType: id.taskType,
+// }
+// }
+
func (tid *taskQueueID) String() string {
var b bytes.Buffer
b.WriteString("[")
b.WriteString("name=")
- b.WriteString(tid.name)
+ b.WriteString(tid.FullName())
b.WriteString("type=")
if tid.taskType == enumspb.TASK_QUEUE_TYPE_ACTIVITY {
b.WriteString("activity")
diff --git a/tests/versioning_test.go b/tests/versioning_test.go
index d89e8d381c9..988d308694b 100644
--- a/tests/versioning_test.go
+++ b/tests/versioning_test.go
@@ -31,8 +31,6 @@ import (
"testing"
"time"
- "go.temporal.io/server/service/matching"
-
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@@ -45,6 +43,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
+ "go.temporal.io/server/common/tqname"
)
type versioningIntegSuite struct {
@@ -243,11 +242,12 @@ func (s *versioningIntegSuite) TestVersioningChangesPropagatedToSubPartitions()
}
for i := 1; i < partCount; i++ {
- subPartName, err := matching.NewTaskQueueNameWithPartition(tq, i)
+ subPartName, err := tqname.FromBaseName(tq)
s.NoError(err)
+ subPartName = subPartName.WithPartition(i)
res, err := s.engine.GetWorkerBuildIdOrdering(ctx, &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: s.namespace,
- TaskQueue: subPartName.String(),
+ TaskQueue: subPartName.FullName(),
})
s.NoError(err)
s.NotNil(res)
@@ -265,11 +265,12 @@ func (s *versioningIntegSuite) TestVersioningChangesPropagatedToSubPartitions()
s.NotNil(res)
for i := 1; i < partCount; i++ {
- subPartName, err := matching.NewTaskQueueNameWithPartition(tq, i)
+ subPartName, err := tqname.FromBaseName(tq)
s.NoError(err)
+ subPartName = subPartName.WithPartition(i)
res, err := s.engine.GetWorkerBuildIdOrdering(ctx, &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: s.namespace,
- TaskQueue: subPartName.String(),
+ TaskQueue: subPartName.FullName(),
})
s.NoError(err)
s.NotNil(res)