diff --git a/common/persistence/sql/sqlplugin/mysql/visibility.go b/common/persistence/sql/sqlplugin/mysql/visibility.go index 247d996f3c4..bfcbaa26770 100644 --- a/common/persistence/sql/sqlplugin/mysql/visibility.go +++ b/common/persistence/sql/sqlplugin/mysql/visibility.go @@ -273,12 +273,7 @@ func (mdb *db) SelectFromVisibility( return nil, err } for i := range rows { - rows[i].StartTime = mdb.converter.FromMySQLDateTime(rows[i].StartTime) - rows[i].ExecutionTime = mdb.converter.FromMySQLDateTime(rows[i].ExecutionTime) - if rows[i].CloseTime != nil { - closeTime := mdb.converter.FromMySQLDateTime(*rows[i].CloseTime) - rows[i].CloseTime = &closeTime - } + mdb.processRowFromDB(&rows[i]) } return rows, nil } @@ -298,5 +293,15 @@ func (mdb *db) GetFromVisibility( if err != nil { return nil, err } + mdb.processRowFromDB(&row) return &row, nil } + +func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) { + row.StartTime = mdb.converter.FromMySQLDateTime(row.StartTime) + row.ExecutionTime = mdb.converter.FromMySQLDateTime(row.ExecutionTime) + if row.CloseTime != nil { + closeTime := mdb.converter.FromMySQLDateTime(*row.CloseTime) + row.CloseTime = &closeTime + } +} diff --git a/common/persistence/sql/sqlplugin/postgresql/visibility.go b/common/persistence/sql/sqlplugin/postgresql/visibility.go index 56807bd1c40..ec3362d18a7 100644 --- a/common/persistence/sql/sqlplugin/postgresql/visibility.go +++ b/common/persistence/sql/sqlplugin/postgresql/visibility.go @@ -290,26 +290,18 @@ func (pdb *db) SelectFromVisibility( return nil, err } for i := range rows { - rows[i].StartTime = pdb.converter.FromPostgreSQLDateTime(rows[i].StartTime) - rows[i].ExecutionTime = pdb.converter.FromPostgreSQLDateTime(rows[i].ExecutionTime) - if rows[i].CloseTime != nil { - closeTime := pdb.converter.FromPostgreSQLDateTime(*rows[i].CloseTime) - rows[i].CloseTime = &closeTime - } - // need to trim the run ID, or otherwise the returned value will - // come with lots of trailing spaces, probably due to the CHAR(64) type - rows[i].RunID = strings.TrimSpace(rows[i].RunID) + pdb.processRowFromDB(&rows[i]) } return rows, nil } // GetFromVisibility reads one row from visibility table -func (mdb *db) GetFromVisibility( +func (pdb *db) GetFromVisibility( ctx context.Context, filter sqlplugin.VisibilityGetFilter, ) (*sqlplugin.VisibilityRow, error) { var row sqlplugin.VisibilityRow - err := mdb.conn.GetContext(ctx, + err := pdb.conn.GetContext(ctx, &row, templateGetWorkflowExecution, filter.NamespaceID, @@ -318,5 +310,18 @@ func (mdb *db) GetFromVisibility( if err != nil { return nil, err } + pdb.processRowFromDB(&row) return &row, nil } + +func (pdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) { + row.StartTime = pdb.converter.FromPostgreSQLDateTime(row.StartTime) + row.ExecutionTime = pdb.converter.FromPostgreSQLDateTime(row.ExecutionTime) + if row.CloseTime != nil { + closeTime := pdb.converter.FromPostgreSQLDateTime(*row.CloseTime) + row.CloseTime = &closeTime + } + // need to trim the run ID, or otherwise the returned value will + // come with lots of trailing spaces, probably due to the CHAR(64) type + row.RunID = strings.TrimSpace(row.RunID) +} diff --git a/common/persistence/sql/sqlplugin/sqlite/visibility.go b/common/persistence/sql/sqlplugin/sqlite/visibility.go index ce6e287a55c..c422a1a841a 100644 --- a/common/persistence/sql/sqlplugin/sqlite/visibility.go +++ b/common/persistence/sql/sqlplugin/sqlite/visibility.go @@ -270,12 +270,7 @@ func (mdb *db) SelectFromVisibility( return nil, err } for i := range rows { - rows[i].StartTime = mdb.converter.FromSQLiteDateTime(rows[i].StartTime) - rows[i].ExecutionTime = mdb.converter.FromSQLiteDateTime(rows[i].ExecutionTime) - if rows[i].CloseTime != nil { - closeTime := mdb.converter.FromSQLiteDateTime(*rows[i].CloseTime) - rows[i].CloseTime = &closeTime - } + mdb.processRowFromDB(&rows[i]) } return rows, nil } @@ -295,5 +290,15 @@ func (mdb *db) GetFromVisibility( if err != nil { return nil, err } + mdb.processRowFromDB(&row) return &row, nil } + +func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) { + row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime) + row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime) + if row.CloseTime != nil { + closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime) + row.CloseTime = &closeTime + } +} diff --git a/common/persistence/tests/visibility_persistence_suite_test.go b/common/persistence/tests/visibility_persistence_suite_test.go index ffea7043eab..9f98eaad2b2 100644 --- a/common/persistence/tests/visibility_persistence_suite_test.go +++ b/common/persistence/tests/visibility_persistence_suite_test.go @@ -613,6 +613,7 @@ func (s *VisibilityPersistenceSuite) TestDeleteWorkflow() { // TestUpsertWorkflowExecution test func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() { + temporalChangeVersionPayload, _ := payload.Encode([]string{"dummy"}) tests := []struct { request *manager.UpsertWorkflowExecutionRequest expected error @@ -630,7 +631,7 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() { Memo: nil, SearchAttributes: &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - searchattribute.TemporalChangeVersion: payload.EncodeBytes([]byte("dummy")), + searchattribute.TemporalChangeVersion: temporalChangeVersionPayload, }, }, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, @@ -665,6 +666,56 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() { } } +// TestGetWorkflowExecution test +func (s *VisibilityPersistenceSuite) TestGetWorkflowExecution() { + testNamespaceUUID := namespace.ID(uuid.New()) + closeTime := time.Now().UTC() + startTime := closeTime.Add(-5 * time.Second) + + var startRequests []*manager.RecordWorkflowExecutionStartedRequest + for i := 0; i < 5; i++ { + startRequests = append( + startRequests, + s.createOpenWorkflowRecord( + testNamespaceUUID, + "visibility-workflow-test", + "visibility-workflow", + startTime, + "test-queue", + ), + ) + } + for _, req := range startRequests { + resp, err := s.VisibilityMgr.GetWorkflowExecution( + s.ctx, + &manager.GetWorkflowExecutionRequest{ + NamespaceID: testNamespaceUUID, + RunID: req.Execution.RunId, + StartTime: &startTime, + }, + ) + s.NoError(err) + s.assertOpenExecutionEquals(req, resp.Execution) + } + + var closeRequests []*manager.RecordWorkflowExecutionClosedRequest + for _, startReq := range startRequests { + closeRequests = append(closeRequests, s.createClosedWorkflowRecord(startReq, closeTime)) + } + for _, req := range closeRequests { + resp, err := s.VisibilityMgr.GetWorkflowExecution( + s.ctx, + &manager.GetWorkflowExecutionRequest{ + NamespaceID: testNamespaceUUID, + RunID: req.Execution.RunId, + CloseTime: &closeTime, + }, + ) + s.NoError(err) + s.assertClosedExecutionEquals(req, resp.Execution) + } +} + // TestAdvancedVisibilityPagination test func (s *VisibilityPersistenceSuite) TestAdvancedVisibilityPagination() { testNamespaceUUID := namespace.ID(uuid.New())