Skip to content

Commit

Permalink
Advanced visibility for SQLite
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Feb 3, 2023
1 parent 52d1ba0 commit 9c9062d
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 207 deletions.
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlplugin/sqlite/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,5 +204,7 @@ func buildDSNAttr(cfg *config.SQL) (url.Values, error) {
// assume pragma
parameters.Add("_pragma", fmt.Sprintf("%s=%s", key, value))
}
// set time format
parameters.Add("_time_format", "sqlite")
return parameters, nil
}
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/sqlite/typeconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *converter) ToSQLiteDateTime(t time.Time) time.Time {
if t.IsZero() {
return minSQLiteDateTime
}
return t.UTC()
return t.UTC().Truncate(time.Microsecond)
}

// FromSQLiteDateTime converts SQLite datetime and returns go time
Expand Down
311 changes: 106 additions & 205 deletions common/persistence/sql/sqlplugin/sqlite/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,248 +29,104 @@ package sqlite
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"

"go.temporal.io/server/common/persistence/sql/sqlplugin"
)

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` +
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` +
`ON CONFLICT (namespace_id, run_id) DO NOTHING`
var (
keywordListSeparator = "♡"

templateCreateWorkflowExecutionClosed = `REPLACE INTO executions_visibility (` +
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, status, history_length, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `

// RunID condition is needed for correct pagination
templateConditions = ` AND namespace_id = ?
AND start_time >= ?
AND start_time <= ?
AND ((run_id > ? and start_time = ?) OR (start_time < ?))
ORDER BY start_time DESC, run_id
LIMIT ?`

templateConditionsClosedWorkflows = ` AND namespace_id = ?
AND close_time >= ?
AND close_time <= ?
AND ((run_id > ? and close_time = ?) OR (close_time < ?))
ORDER BY close_time DESC, run_id
LIMIT ?`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `

templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, history_length
FROM executions_visibility WHERE status != 1 `

templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions

templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = ?` + templateConditions

templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = ?` + templateConditions

templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditionsClosedWorkflows

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditionsClosedWorkflows
templateInsertWorkflowExecution = fmt.Sprintf(
`INSERT INTO executions_visibility (%s)
VALUES (%s)
ON CONFLICT (namespace_id, run_id) DO NOTHING`,
strings.Join(sqlplugin.DbFields, ", "),
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
)

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length, task_queue
FROM executions_visibility
WHERE namespace_id = ? AND status != 1
AND run_id = ?`
templateUpsertWorkflowExecution = fmt.Sprintf(
`INSERT INTO executions_visibility (%s)
VALUES (%s)
%s`,
strings.Join(sqlplugin.DbFields, ", "),
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
buildOnDuplicateKeyUpdate(sqlplugin.DbFields...),
)

templateGetWorkflowExecution = `
SELECT
workflow_id,
run_id,
start_time,
execution_time,
memo,
encoding,
close_time,
workflow_type_name,
status,
history_length,
task_queue
FROM executions_visibility
WHERE namespace_id = ? AND run_id = ?`
templateDeleteWorkflowExecution = `
DELETE FROM executions_visibility
WHERE namespace_id = :namespace_id AND run_id = :run_id`

templateDeleteWorkflowExecution = "DELETE FROM executions_visibility WHERE namespace_id = ? AND run_id = ?"
templateGetWorkflowExecution = fmt.Sprintf(
`SELECT %s FROM executions_visibility
WHERE namespace_id = :namespace_id AND run_id = :run_id`,
strings.Join(sqlplugin.DbFields, ", "),
)
)

var errCloseParams = errors.New("missing one of {closeTime, historyLength} params")
func buildOnDuplicateKeyUpdate(fields ...string) string {
items := make([]string, len(fields))
for i, field := range fields {
items[i] = fmt.Sprintf("%s = excluded.%s", field, field)
}
return fmt.Sprintf(
"ON CONFLICT (namespace_id, run_id) DO UPDATE SET %s",
strings.Join(items, ", "),
)
}

// InsertIntoVisibility inserts a row into visibility table. If an row already exist,
// its left as such and no update will be made
func (mdb *db) InsertIntoVisibility(
ctx context.Context,
row *sqlplugin.VisibilityRow,
) (sql.Result, error) {
row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime)
return mdb.conn.ExecContext(ctx,
templateCreateWorkflowExecutionStarted,
row.NamespaceID,
row.WorkflowID,
row.RunID,
row.StartTime,
row.ExecutionTime,
row.WorkflowTypeName,
row.Status,
row.Memo,
row.Encoding,
row.TaskQueue,
)
finalRow := mdb.prepareRowForDB(row)
return mdb.conn.NamedExecContext(ctx, templateInsertWorkflowExecution, finalRow)
}

// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
func (mdb *db) ReplaceIntoVisibility(
ctx context.Context,
row *sqlplugin.VisibilityRow,
) (sql.Result, error) {
switch {
case row.CloseTime != nil && row.HistoryLength != nil:
row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime)
closeTime := mdb.converter.ToSQLiteDateTime(*row.CloseTime)
return mdb.conn.ExecContext(ctx,
templateCreateWorkflowExecutionClosed,
row.NamespaceID,
row.WorkflowID,
row.RunID,
row.StartTime,
row.ExecutionTime,
row.WorkflowTypeName,
closeTime,
row.Status,
*row.HistoryLength,
row.Memo,
row.Encoding,
row.TaskQueue,
)
default:
return nil, errCloseParams
}
finalRow := mdb.prepareRowForDB(row)
return mdb.conn.NamedExecContext(ctx, templateUpsertWorkflowExecution, finalRow)
}

// DeleteFromVisibility deletes a row from visibility table if it exist
func (mdb *db) DeleteFromVisibility(
ctx context.Context,
filter sqlplugin.VisibilityDeleteFilter,
) (sql.Result, error) {
return mdb.conn.ExecContext(ctx,
templateDeleteWorkflowExecution,
filter.NamespaceID,
filter.RunID,
)
return mdb.conn.NamedExecContext(ctx, templateDeleteWorkflowExecution, filter)
}

// SelectFromVisibility reads one or more rows from visibility table
func (mdb *db) SelectFromVisibility(
ctx context.Context,
filter sqlplugin.VisibilitySelectFilter,
) ([]sqlplugin.VisibilityRow, error) {
var err error
var rows []sqlplugin.VisibilityRow
if filter.MinTime != nil {
*filter.MinTime = mdb.converter.ToSQLiteDateTime(*filter.MinTime)
}
if filter.MaxTime != nil {
*filter.MaxTime = mdb.converter.ToSQLiteDateTime(*filter.MaxTime)
}
// If filter.Status == 0 (UNSPECIFIED) then only closed workflows will be returned (all excluding 1 (RUNNING)).
switch {
case filter.MinTime == nil && filter.RunID != nil && filter.Status != 1:
var row sqlplugin.VisibilityRow
err = mdb.conn.GetContext(ctx,
&row,
templateGetClosedWorkflowExecution,
filter.NamespaceID,
*filter.RunID,
)
if err == nil {
rows = append(rows, row)
if len(filter.Query) == 0 {
// backward compatibility for existing tests
err := sqlplugin.GenerateSelectQuery(&filter, mdb.converter.ToSQLiteDateTime)
if err != nil {
return nil, err
}
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.WorkflowID != nil && filter.RunID != nil && filter.PageSize != nil:
qry := templateGetOpenWorkflowExecutionsByID
if filter.Status != 1 {
qry = templateGetClosedWorkflowExecutionsByID
}
err = mdb.conn.SelectContext(ctx,
&rows,
qry,
*filter.WorkflowID,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.WorkflowTypeName != nil && filter.RunID != nil && filter.PageSize != nil:
qry := templateGetOpenWorkflowExecutionsByType
if filter.Status != 1 {
qry = templateGetClosedWorkflowExecutionsByType
}
err = mdb.conn.SelectContext(ctx,
&rows,
qry,
*filter.WorkflowTypeName,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.RunID != nil && filter.PageSize != nil &&
filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
err = mdb.conn.SelectContext(ctx,
&rows,
templateGetClosedWorkflowExecutionsByStatus,
filter.Status,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.RunID != nil && filter.PageSize != nil:
qry := templateGetOpenWorkflowExecutions
if filter.Status != 1 {
qry = templateGetClosedWorkflowExecutions
}
err = mdb.conn.SelectContext(ctx,
&rows,
qry,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
default:
return nil, fmt.Errorf("invalid query filter")
}

var rows []sqlplugin.VisibilityRow
err := mdb.conn.SelectContext(ctx, &rows, filter.Query, filter.QueryArgs...)
if err != nil {
return nil, err
}
for i := range rows {
mdb.processRowFromDB(&rows[i])
err = mdb.processRowFromDB(&rows[i])
if err != nil {
return nil, err
}
}
return rows, nil
}
Expand All @@ -281,24 +137,69 @@ func (mdb *db) GetFromVisibility(
filter sqlplugin.VisibilityGetFilter,
) (*sqlplugin.VisibilityRow, error) {
var row sqlplugin.VisibilityRow
err := mdb.conn.GetContext(ctx,
&row,
templateGetWorkflowExecution,
filter.NamespaceID,
filter.RunID,
)
stmt, err := mdb.conn.PrepareNamedContext(ctx, templateGetWorkflowExecution)
if err != nil {
return nil, err
}
err = stmt.GetContext(ctx, &row, filter)
if err != nil {
return nil, err
}
err = mdb.processRowFromDB(&row)
if err != nil {
return nil, err
}
mdb.processRowFromDB(&row)
return &row, nil
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
func (mdb *db) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
if row == nil {
return nil
}
finalRow := *row
finalRow.StartTime = mdb.converter.ToSQLiteDateTime(finalRow.StartTime)
finalRow.ExecutionTime = mdb.converter.ToSQLiteDateTime(finalRow.ExecutionTime)
if finalRow.CloseTime != nil {
*finalRow.CloseTime = mdb.converter.ToSQLiteDateTime(*finalRow.CloseTime)
}
if finalRow.SearchAttributes != nil {
finalSearchAttributes := sqlplugin.VisibilitySearchAttributes{}
for name, value := range *finalRow.SearchAttributes {
switch v := value.(type) {
case []string:
finalSearchAttributes[name] = strings.Join(v, keywordListSeparator)
default:
finalSearchAttributes[name] = v
}
}
finalRow.SearchAttributes = &finalSearchAttributes
}
return &finalRow
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) error {
if row == nil {
return nil
}
row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime)
row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
if row.SearchAttributes != nil {
for saName, saValue := range *row.SearchAttributes {
switch typedSaValue := saValue.(type) {
case string:
if strings.Index(typedSaValue, keywordListSeparator) >= 0 {
// If the string contains the keywordListSeparator, then we need to split it
// into a list of keywords.
(*row.SearchAttributes)[saName] = strings.Split(typedSaValue, keywordListSeparator)
}
default:
// no-op
}
}
}
return nil
}
Loading

0 comments on commit 9c9062d

Please sign in to comment.