Skip to content

Commit

Permalink
fix: [TKC-2583] tags OR operator for twe (#5861)
Browse files Browse the repository at this point in the history
* fix: tags OR operator

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

* fix: add streaming logs

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

* fix: add id to logs

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

* fix: or for same tags, and for different tags

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

* fix: unit tests

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

* fix: unit test

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

* fix: integration test

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>

---------

Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin authored Sep 24, 2024
1 parent 9f29f9f commit 4567a1a
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 6 deletions.
23 changes: 19 additions & 4 deletions internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,29 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionNotificationsHandler() fiber.Ha

// Stream the notifications
ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
_ = w.Flush()
err := w.Flush()
if err != nil {
s.Log.Errorw("could not flush stream body", "error", err, "id", id)
}

enc := json.NewEncoder(w)

for n := range ctrl.Watch(ctx) {
if n.Error == nil {
_ = enc.Encode(n.Value)
_, _ = fmt.Fprintf(w, "\n")
_ = w.Flush()
err := enc.Encode(n.Value)
if err != nil {
s.Log.Errorw("could not encode value", "error", err, "id", id)
}

_, err = fmt.Fprintf(w, "\n")
if err != nil {
s.Log.Errorw("could not print new line", "error", err, "id", id)
}

err = w.Flush()
if err != nil {
s.Log.Errorw("could not flush stream body", "error", err, "id", id)
}
}
}
})
Expand Down
31 changes: 29 additions & 2 deletions pkg/repository/testworkflow/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,41 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) {

if filter.TagSelector() != "" {
items := strings.Split(filter.TagSelector(), ",")
inValues := make(map[string][]string)
existsValues := make(map[string]struct{})
for _, item := range items {
elements := strings.Split(item, "=")
if len(elements) == 2 {
query["tags."+elements[0]] = elements[1]
inValues["tags."+utils.EscapeDots(elements[0])] = append(inValues["tags."+utils.EscapeDots(elements[0])], elements[1])
} else if len(elements) == 1 {
query["tags."+elements[0]] = bson.M{"$exists": true}
existsValues["tags."+utils.EscapeDots(elements[0])] = struct{}{}
}
}
subquery := bson.A{}
for tag, values := range inValues {
if _, ok := existsValues[tag]; ok {
subquery = append(subquery, bson.M{tag: bson.M{"$exists": true}})
delete(existsValues, tag)
continue
}

tagValues := bson.A{}
for _, value := range values {
tagValues = append(tagValues, value)
}

if len(tagValues) > 0 {
subquery = append(subquery, bson.M{tag: bson.M{"$in": tagValues}})
}
}

for tag := range existsValues {
subquery = append(subquery, bson.M{tag: bson.M{"$exists": true}})
}

if len(subquery) > 0 {
query["$and"] = subquery
}
}

if filter.LabelSelector() != nil && len(filter.LabelSelector().Or) > 0 {
Expand Down
112 changes: 112 additions & 0 deletions pkg/repository/testworkflow/mongo_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,115 @@ func strPtr(s string) *string {
func boolPtr(b bool) *bool {
return &b
}

func TestNewMongoRepository_GetExecutions_Tags_Integration(t *testing.T) {
test.IntegrationTest(t)

ctx := context.Background()

client, err := mongo.Connect(ctx, options.Client().ApplyURI(cfg.APIMongoDSN))
if err != nil {
t.Fatalf("error connecting to mongo: %v", err)
}
db := client.Database("testworkflow-executions-tags-mongo-repository-test")
t.Cleanup(func() {
db.Drop(ctx)
})

repo := NewMongoRepository(db, false)

execution := testkube.TestWorkflowExecution{
Id: "test-id-1",
Name: "test-name-1",
Workflow: &testkube.TestWorkflow{
Name: "test-name-1",
Spec: &testkube.TestWorkflowSpec{},
},
Tags: map[string]string{
"my.key1": "value1",
},
}
if err := repo.Insert(ctx, execution); err != nil {
t.Fatalf("error inserting execution: %v", err)
}

execution = testkube.TestWorkflowExecution{
Id: "test-id-2",
Name: "test-name-2",
Workflow: &testkube.TestWorkflow{
Name: "test-name-2",
Spec: &testkube.TestWorkflowSpec{},
},
Tags: map[string]string{
"key2": "value2",
},
}
if err := repo.Insert(ctx, execution); err != nil {
t.Fatalf("error inserting execution: %v", err)
}

execution = testkube.TestWorkflowExecution{
Id: "test-id-3",
Name: "test-name-3",
Workflow: &testkube.TestWorkflow{
Name: "test-name-3",
Spec: &testkube.TestWorkflowSpec{},
},
Tags: map[string]string{
"my.key1": "value3",
"key2": "",
},
}
if err := repo.Insert(ctx, execution); err != nil {
t.Fatalf("error inserting execution: %v", err)
}

res, err := repo.GetExecutions(ctx, NewExecutionsFilter())
if err != nil {
t.Fatalf("error getting executions: %v", err)
}

assert.Len(t, res, 3)

tagSelector := "my.key1=value1"
res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector))
if err != nil {
t.Fatalf("error getting executions: %v", err)
}

assert.Len(t, res, 1)
assert.Equal(t, "test-name-1", res[0].Name)

tagSelector = "my.key1"
res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector))
if err != nil {
t.Fatalf("error getting executions: %v", err)
}

assert.Len(t, res, 2)

tagSelector = "my.key1=value3,key2"
res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector))
if err != nil {
t.Fatalf("error getting executions: %v", err)
}

assert.Len(t, res, 1)
assert.Equal(t, "test-name-3", res[0].Name)

tagSelector = "my.key1=value1,key2=value2"
res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector))
if err != nil {
t.Fatalf("error getting executions: %v", err)
}

assert.Len(t, res, 0)

tagSelector = "my.key1=value1,my.key1=value3"
res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector))
if err != nil {
t.Fatalf("error getting executions: %v", err)
}

assert.Len(t, res, 2)
}

0 comments on commit 4567a1a

Please sign in to comment.