Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GetWorkflowExecution in PostgreSQL #3816

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,7 @@ func (mdb *db) SelectFromVisibility(
return nil, err
}
for i := range rows {
rows[i].StartTime = mdb.converter.FromMySQLDateTime(rows[i].StartTime)
rows[i].ExecutionTime = mdb.converter.FromMySQLDateTime(rows[i].ExecutionTime)
if rows[i].CloseTime != nil {
closeTime := mdb.converter.FromMySQLDateTime(*rows[i].CloseTime)
rows[i].CloseTime = &closeTime
}
mdb.processRowFromDB(&rows[i])
}
return rows, nil
}
Expand All @@ -298,5 +293,15 @@ func (mdb *db) GetFromVisibility(
if err != nil {
return nil, err
}
mdb.processRowFromDB(&row)
return &row, nil
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
row.StartTime = mdb.converter.FromMySQLDateTime(row.StartTime)
row.ExecutionTime = mdb.converter.FromMySQLDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := mdb.converter.FromMySQLDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
}
27 changes: 16 additions & 11 deletions common/persistence/sql/sqlplugin/postgresql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,26 +290,18 @@ func (pdb *db) SelectFromVisibility(
return nil, err
}
for i := range rows {
rows[i].StartTime = pdb.converter.FromPostgreSQLDateTime(rows[i].StartTime)
rows[i].ExecutionTime = pdb.converter.FromPostgreSQLDateTime(rows[i].ExecutionTime)
if rows[i].CloseTime != nil {
closeTime := pdb.converter.FromPostgreSQLDateTime(*rows[i].CloseTime)
rows[i].CloseTime = &closeTime
}
// need to trim the run ID, or otherwise the returned value will
// come with lots of trailing spaces, probably due to the CHAR(64) type
rows[i].RunID = strings.TrimSpace(rows[i].RunID)
pdb.processRowFromDB(&rows[i])
}
return rows, nil
}

// GetFromVisibility reads one row from visibility table
func (mdb *db) GetFromVisibility(
func (pdb *db) GetFromVisibility(
ctx context.Context,
filter sqlplugin.VisibilityGetFilter,
) (*sqlplugin.VisibilityRow, error) {
var row sqlplugin.VisibilityRow
err := mdb.conn.GetContext(ctx,
err := pdb.conn.GetContext(ctx,
&row,
templateGetWorkflowExecution,
filter.NamespaceID,
Expand All @@ -318,5 +310,18 @@ func (mdb *db) GetFromVisibility(
if err != nil {
return nil, err
}
pdb.processRowFromDB(&row)
return &row, nil
}

func (pdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
row.StartTime = pdb.converter.FromPostgreSQLDateTime(row.StartTime)
row.ExecutionTime = pdb.converter.FromPostgreSQLDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := pdb.converter.FromPostgreSQLDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
// need to trim the run ID, or otherwise the returned value will
// come with lots of trailing spaces, probably due to the CHAR(64) type
row.RunID = strings.TrimSpace(row.RunID)
}
17 changes: 11 additions & 6 deletions common/persistence/sql/sqlplugin/sqlite/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,7 @@ func (mdb *db) SelectFromVisibility(
return nil, err
}
for i := range rows {
rows[i].StartTime = mdb.converter.FromSQLiteDateTime(rows[i].StartTime)
rows[i].ExecutionTime = mdb.converter.FromSQLiteDateTime(rows[i].ExecutionTime)
if rows[i].CloseTime != nil {
closeTime := mdb.converter.FromSQLiteDateTime(*rows[i].CloseTime)
rows[i].CloseTime = &closeTime
}
mdb.processRowFromDB(&rows[i])
}
return rows, nil
}
Expand All @@ -295,5 +290,15 @@ func (mdb *db) GetFromVisibility(
if err != nil {
return nil, err
}
mdb.processRowFromDB(&row)
return &row, nil
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime)
row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
}
53 changes: 52 additions & 1 deletion common/persistence/tests/visibility_persistence_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ func (s *VisibilityPersistenceSuite) TestDeleteWorkflow() {

// TestUpsertWorkflowExecution test
func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
temporalChangeVersionPayload, _ := payload.Encode([]string{"dummy"})
tests := []struct {
request *manager.UpsertWorkflowExecutionRequest
expected error
Expand All @@ -630,7 +631,7 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
Memo: nil,
SearchAttributes: &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
searchattribute.TemporalChangeVersion: payload.EncodeBytes([]byte("dummy")),
searchattribute.TemporalChangeVersion: temporalChangeVersionPayload,
},
},
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
Expand Down Expand Up @@ -665,6 +666,56 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
}
}

// TestGetWorkflowExecution test
func (s *VisibilityPersistenceSuite) TestGetWorkflowExecution() {
testNamespaceUUID := namespace.ID(uuid.New())
closeTime := time.Now().UTC()
startTime := closeTime.Add(-5 * time.Second)

var startRequests []*manager.RecordWorkflowExecutionStartedRequest
for i := 0; i < 5; i++ {
startRequests = append(
startRequests,
s.createOpenWorkflowRecord(
testNamespaceUUID,
"visibility-workflow-test",
"visibility-workflow",
startTime,
"test-queue",
),
)
}
for _, req := range startRequests {
resp, err := s.VisibilityMgr.GetWorkflowExecution(
s.ctx,
&manager.GetWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
RunID: req.Execution.RunId,
StartTime: &startTime,
},
)
s.NoError(err)
s.assertOpenExecutionEquals(req, resp.Execution)
}

var closeRequests []*manager.RecordWorkflowExecutionClosedRequest
for _, startReq := range startRequests {
closeRequests = append(closeRequests, s.createClosedWorkflowRecord(startReq, closeTime))
}
for _, req := range closeRequests {
resp, err := s.VisibilityMgr.GetWorkflowExecution(
s.ctx,
&manager.GetWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
RunID: req.Execution.RunId,
CloseTime: &closeTime,
},
)
s.NoError(err)
s.assertClosedExecutionEquals(req, resp.Execution)
}
}

// TestAdvancedVisibilityPagination test
func (s *VisibilityPersistenceSuite) TestAdvancedVisibilityPagination() {
testNamespaceUUID := namespace.ID(uuid.New())
Expand Down