Skip to content

Commit

Permalink
Fix GetWorkflowExecution in PostgreSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Jan 24, 2023
1 parent 5175ab6 commit ee060ae
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 24 deletions.
17 changes: 11 additions & 6 deletions common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,7 @@ func (mdb *db) SelectFromVisibility(
return nil, err
}
for i := range rows {
rows[i].StartTime = mdb.converter.FromMySQLDateTime(rows[i].StartTime)
rows[i].ExecutionTime = mdb.converter.FromMySQLDateTime(rows[i].ExecutionTime)
if rows[i].CloseTime != nil {
closeTime := mdb.converter.FromMySQLDateTime(*rows[i].CloseTime)
rows[i].CloseTime = &closeTime
}
mdb.processRowFromDB(&rows[i])
}
return rows, nil
}
Expand All @@ -298,5 +293,15 @@ func (mdb *db) GetFromVisibility(
if err != nil {
return nil, err
}
mdb.processRowFromDB(&row)
return &row, nil
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
row.StartTime = mdb.converter.FromMySQLDateTime(row.StartTime)
row.ExecutionTime = mdb.converter.FromMySQLDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := mdb.converter.FromMySQLDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
}
27 changes: 16 additions & 11 deletions common/persistence/sql/sqlplugin/postgresql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,26 +290,18 @@ func (pdb *db) SelectFromVisibility(
return nil, err
}
for i := range rows {
rows[i].StartTime = pdb.converter.FromPostgreSQLDateTime(rows[i].StartTime)
rows[i].ExecutionTime = pdb.converter.FromPostgreSQLDateTime(rows[i].ExecutionTime)
if rows[i].CloseTime != nil {
closeTime := pdb.converter.FromPostgreSQLDateTime(*rows[i].CloseTime)
rows[i].CloseTime = &closeTime
}
// need to trim the run ID, or otherwise the returned value will
// come with lots of trailing spaces, probably due to the CHAR(64) type
rows[i].RunID = strings.TrimSpace(rows[i].RunID)
pdb.processRowFromDB(&rows[i])
}
return rows, nil
}

// GetFromVisibility reads one row from visibility table
func (mdb *db) GetFromVisibility(
func (pdb *db) GetFromVisibility(
ctx context.Context,
filter sqlplugin.VisibilityGetFilter,
) (*sqlplugin.VisibilityRow, error) {
var row sqlplugin.VisibilityRow
err := mdb.conn.GetContext(ctx,
err := pdb.conn.GetContext(ctx,
&row,
templateGetWorkflowExecution,
filter.NamespaceID,
Expand All @@ -318,5 +310,18 @@ func (mdb *db) GetFromVisibility(
if err != nil {
return nil, err
}
pdb.processRowFromDB(&row)
return &row, nil
}

func (pdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
row.StartTime = pdb.converter.FromPostgreSQLDateTime(row.StartTime)
row.ExecutionTime = pdb.converter.FromPostgreSQLDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := pdb.converter.FromPostgreSQLDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
// need to trim the run ID, or otherwise the returned value will
// come with lots of trailing spaces, probably due to the CHAR(64) type
row.RunID = strings.TrimSpace(row.RunID)
}
17 changes: 11 additions & 6 deletions common/persistence/sql/sqlplugin/sqlite/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,7 @@ func (mdb *db) SelectFromVisibility(
return nil, err
}
for i := range rows {
rows[i].StartTime = mdb.converter.FromSQLiteDateTime(rows[i].StartTime)
rows[i].ExecutionTime = mdb.converter.FromSQLiteDateTime(rows[i].ExecutionTime)
if rows[i].CloseTime != nil {
closeTime := mdb.converter.FromSQLiteDateTime(*rows[i].CloseTime)
rows[i].CloseTime = &closeTime
}
mdb.processRowFromDB(&rows[i])
}
return rows, nil
}
Expand All @@ -295,5 +290,15 @@ func (mdb *db) GetFromVisibility(
if err != nil {
return nil, err
}
mdb.processRowFromDB(&row)
return &row, nil
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime)
row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
}
53 changes: 52 additions & 1 deletion common/persistence/tests/visibility_persistence_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ func (s *VisibilityPersistenceSuite) TestDeleteWorkflow() {

// TestUpsertWorkflowExecution test
func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
temporalChangeVersionPayload, _ := payload.Encode([]string{"dummy"})
tests := []struct {
request *manager.UpsertWorkflowExecutionRequest
expected error
Expand All @@ -630,7 +631,7 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
Memo: nil,
SearchAttributes: &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
searchattribute.TemporalChangeVersion: payload.EncodeBytes([]byte("dummy")),
searchattribute.TemporalChangeVersion: temporalChangeVersionPayload,
},
},
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
Expand Down Expand Up @@ -665,6 +666,56 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
}
}

// TestGetWorkflowExecution test
func (s *VisibilityPersistenceSuite) TestGetWorkflowExecution() {
testNamespaceUUID := namespace.ID(uuid.New())
closeTime := time.Now().UTC()
startTime := closeTime.Add(-5 * time.Second)

var startRequests []*manager.RecordWorkflowExecutionStartedRequest
for i := 0; i < 5; i++ {
startRequests = append(
startRequests,
s.createOpenWorkflowRecord(
testNamespaceUUID,
"visibility-workflow-test",
"visibility-workflow",
startTime,
"test-queue",
),
)
}
for _, req := range startRequests {
resp, err := s.VisibilityMgr.GetWorkflowExecution(
s.ctx,
&manager.GetWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
RunID: req.Execution.RunId,
StartTime: &startTime,
},
)
s.NoError(err)
s.assertOpenExecutionEquals(req, resp.Execution)
}

var closeRequests []*manager.RecordWorkflowExecutionClosedRequest
for _, startReq := range startRequests {
closeRequests = append(closeRequests, s.createClosedWorkflowRecord(startReq, closeTime))
}
for _, req := range closeRequests {
resp, err := s.VisibilityMgr.GetWorkflowExecution(
s.ctx,
&manager.GetWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
RunID: req.Execution.RunId,
CloseTime: &closeTime,
},
)
s.NoError(err)
s.assertClosedExecutionEquals(req, resp.Execution)
}
}

// TestAdvancedVisibilityPagination test
func (s *VisibilityPersistenceSuite) TestAdvancedVisibilityPagination() {
testNamespaceUUID := namespace.ID(uuid.New())
Expand Down

0 comments on commit ee060ae

Please sign in to comment.