diff --git a/common/persistence/sql/sqlplugin/sqlite/plugin.go b/common/persistence/sql/sqlplugin/sqlite/plugin.go index 7f62ceefedf..e385b27e214 100644 --- a/common/persistence/sql/sqlplugin/sqlite/plugin.go +++ b/common/persistence/sql/sqlplugin/sqlite/plugin.go @@ -204,5 +204,7 @@ func buildDSNAttr(cfg *config.SQL) (url.Values, error) { // assume pragma parameters.Add("_pragma", fmt.Sprintf("%s=%s", key, value)) } + // set time format + parameters.Add("_time_format", "sqlite") return parameters, nil } diff --git a/common/persistence/sql/sqlplugin/sqlite/typeconv.go b/common/persistence/sql/sqlplugin/sqlite/typeconv.go index 721cc381eee..6f22a45e014 100644 --- a/common/persistence/sql/sqlplugin/sqlite/typeconv.go +++ b/common/persistence/sql/sqlplugin/sqlite/typeconv.go @@ -47,7 +47,7 @@ func (c *converter) ToSQLiteDateTime(t time.Time) time.Time { if t.IsZero() { return minSQLiteDateTime } - return t.UTC() + return t.UTC().Truncate(time.Microsecond) } // FromSQLiteDateTime converts SQLite datetime and returns go time diff --git a/common/persistence/sql/sqlplugin/sqlite/visibility.go b/common/persistence/sql/sqlplugin/sqlite/visibility.go index c422a1a841a..c2134b8d274 100644 --- a/common/persistence/sql/sqlplugin/sqlite/visibility.go +++ b/common/persistence/sql/sqlplugin/sqlite/visibility.go @@ -29,82 +29,53 @@ package sqlite import ( "context" "database/sql" - "errors" "fmt" + "strings" "go.temporal.io/server/common/persistence/sql/sqlplugin" ) -const ( - templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` + - `namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue) ` + - `VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` + - `ON CONFLICT (namespace_id, run_id) DO NOTHING` +var ( + keywordListSeparator = "♡" - templateCreateWorkflowExecutionClosed = `REPLACE INTO executions_visibility (` + - `namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, status, history_length, memo, encoding, task_queue) ` + - `VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` - - // RunID condition is needed for correct pagination - templateConditions = ` AND namespace_id = ? - AND start_time >= ? - AND start_time <= ? - AND ((run_id > ? and start_time = ?) OR (start_time < ?)) - ORDER BY start_time DESC, run_id - LIMIT ?` - - templateConditionsClosedWorkflows = ` AND namespace_id = ? - AND close_time >= ? - AND close_time <= ? - AND ((run_id > ? and close_time = ?) OR (close_time < ?)) - ORDER BY close_time DESC, run_id - LIMIT ?` - - templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue` - templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 ` - - templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, history_length - FROM executions_visibility WHERE status != 1 ` - - templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions - - templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflows - - templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = ?` + templateConditions - - templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditionsClosedWorkflows - - templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = ?` + templateConditions - - templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditionsClosedWorkflows - - templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditionsClosedWorkflows + templateInsertWorkflowExecution = fmt.Sprintf( + `INSERT INTO executions_visibility (%s) + VALUES (%s) + ON CONFLICT (namespace_id, run_id) DO NOTHING`, + strings.Join(sqlplugin.DbFields, ", "), + sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...), + ) - templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length, task_queue - FROM executions_visibility - WHERE namespace_id = ? AND status != 1 - AND run_id = ?` + templateUpsertWorkflowExecution = fmt.Sprintf( + `INSERT INTO executions_visibility (%s) + VALUES (%s) + %s`, + strings.Join(sqlplugin.DbFields, ", "), + sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...), + buildOnDuplicateKeyUpdate(sqlplugin.DbFields...), + ) - templateGetWorkflowExecution = ` - SELECT - workflow_id, - run_id, - start_time, - execution_time, - memo, - encoding, - close_time, - workflow_type_name, - status, - history_length, - task_queue - FROM executions_visibility - WHERE namespace_id = ? AND run_id = ?` + templateDeleteWorkflowExecution = ` + DELETE FROM executions_visibility + WHERE namespace_id = :namespace_id AND run_id = :run_id` - templateDeleteWorkflowExecution = "DELETE FROM executions_visibility WHERE namespace_id = ? AND run_id = ?" + templateGetWorkflowExecution = fmt.Sprintf( + `SELECT %s FROM executions_visibility + WHERE namespace_id = :namespace_id AND run_id = :run_id`, + strings.Join(sqlplugin.DbFields, ", "), + ) ) -var errCloseParams = errors.New("missing one of {closeTime, historyLength} params") +func buildOnDuplicateKeyUpdate(fields ...string) string { + items := make([]string, len(fields)) + for i, field := range fields { + items[i] = fmt.Sprintf("%s = excluded.%s", field, field) + } + return fmt.Sprintf( + "ON CONFLICT (namespace_id, run_id) DO UPDATE SET %s", + strings.Join(items, ", "), + ) +} // InsertIntoVisibility inserts a row into visibility table. If an row already exist, // its left as such and no update will be made @@ -112,20 +83,8 @@ func (mdb *db) InsertIntoVisibility( ctx context.Context, row *sqlplugin.VisibilityRow, ) (sql.Result, error) { - row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime) - return mdb.conn.ExecContext(ctx, - templateCreateWorkflowExecutionStarted, - row.NamespaceID, - row.WorkflowID, - row.RunID, - row.StartTime, - row.ExecutionTime, - row.WorkflowTypeName, - row.Status, - row.Memo, - row.Encoding, - row.TaskQueue, - ) + finalRow := mdb.prepareRowForDB(row) + return mdb.conn.NamedExecContext(ctx, templateInsertWorkflowExecution, finalRow) } // ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table @@ -133,28 +92,8 @@ func (mdb *db) ReplaceIntoVisibility( ctx context.Context, row *sqlplugin.VisibilityRow, ) (sql.Result, error) { - switch { - case row.CloseTime != nil && row.HistoryLength != nil: - row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime) - closeTime := mdb.converter.ToSQLiteDateTime(*row.CloseTime) - return mdb.conn.ExecContext(ctx, - templateCreateWorkflowExecutionClosed, - row.NamespaceID, - row.WorkflowID, - row.RunID, - row.StartTime, - row.ExecutionTime, - row.WorkflowTypeName, - closeTime, - row.Status, - *row.HistoryLength, - row.Memo, - row.Encoding, - row.TaskQueue, - ) - default: - return nil, errCloseParams - } + finalRow := mdb.prepareRowForDB(row) + return mdb.conn.NamedExecContext(ctx, templateUpsertWorkflowExecution, finalRow) } // DeleteFromVisibility deletes a row from visibility table if it exist @@ -162,11 +101,7 @@ func (mdb *db) DeleteFromVisibility( ctx context.Context, filter sqlplugin.VisibilityDeleteFilter, ) (sql.Result, error) { - return mdb.conn.ExecContext(ctx, - templateDeleteWorkflowExecution, - filter.NamespaceID, - filter.RunID, - ) + return mdb.conn.NamedExecContext(ctx, templateDeleteWorkflowExecution, filter) } // SelectFromVisibility reads one or more rows from visibility table @@ -174,103 +109,24 @@ func (mdb *db) SelectFromVisibility( ctx context.Context, filter sqlplugin.VisibilitySelectFilter, ) ([]sqlplugin.VisibilityRow, error) { - var err error - var rows []sqlplugin.VisibilityRow - if filter.MinTime != nil { - *filter.MinTime = mdb.converter.ToSQLiteDateTime(*filter.MinTime) - } - if filter.MaxTime != nil { - *filter.MaxTime = mdb.converter.ToSQLiteDateTime(*filter.MaxTime) - } - // If filter.Status == 0 (UNSPECIFIED) then only closed workflows will be returned (all excluding 1 (RUNNING)). - switch { - case filter.MinTime == nil && filter.RunID != nil && filter.Status != 1: - var row sqlplugin.VisibilityRow - err = mdb.conn.GetContext(ctx, - &row, - templateGetClosedWorkflowExecution, - filter.NamespaceID, - *filter.RunID, - ) - if err == nil { - rows = append(rows, row) + if len(filter.Query) == 0 { + // backward compatibility for existing tests + err := sqlplugin.GenerateSelectQuery(&filter, mdb.converter.ToSQLiteDateTime) + if err != nil { + return nil, err } - case filter.MinTime != nil && filter.MaxTime != nil && - filter.WorkflowID != nil && filter.RunID != nil && filter.PageSize != nil: - qry := templateGetOpenWorkflowExecutionsByID - if filter.Status != 1 { - qry = templateGetClosedWorkflowExecutionsByID - } - err = mdb.conn.SelectContext(ctx, - &rows, - qry, - *filter.WorkflowID, - filter.NamespaceID, - *filter.MinTime, - *filter.MaxTime, - *filter.RunID, - *filter.MaxTime, - *filter.MaxTime, - *filter.PageSize, - ) - case filter.MinTime != nil && filter.MaxTime != nil && - filter.WorkflowTypeName != nil && filter.RunID != nil && filter.PageSize != nil: - qry := templateGetOpenWorkflowExecutionsByType - if filter.Status != 1 { - qry = templateGetClosedWorkflowExecutionsByType - } - err = mdb.conn.SelectContext(ctx, - &rows, - qry, - *filter.WorkflowTypeName, - filter.NamespaceID, - *filter.MinTime, - *filter.MaxTime, - *filter.RunID, - *filter.MaxTime, - *filter.MaxTime, - *filter.PageSize, - ) - case filter.MinTime != nil && filter.MaxTime != nil && - filter.RunID != nil && filter.PageSize != nil && - filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING - err = mdb.conn.SelectContext(ctx, - &rows, - templateGetClosedWorkflowExecutionsByStatus, - filter.Status, - filter.NamespaceID, - *filter.MinTime, - *filter.MaxTime, - *filter.RunID, - *filter.MaxTime, - *filter.MaxTime, - *filter.PageSize, - ) - case filter.MinTime != nil && filter.MaxTime != nil && - filter.RunID != nil && filter.PageSize != nil: - qry := templateGetOpenWorkflowExecutions - if filter.Status != 1 { - qry = templateGetClosedWorkflowExecutions - } - err = mdb.conn.SelectContext(ctx, - &rows, - qry, - filter.NamespaceID, - *filter.MinTime, - *filter.MaxTime, - *filter.RunID, - *filter.MaxTime, - *filter.MaxTime, - *filter.PageSize, - ) - default: - return nil, fmt.Errorf("invalid query filter") } + + var rows []sqlplugin.VisibilityRow + err := mdb.conn.SelectContext(ctx, &rows, filter.Query, filter.QueryArgs...) if err != nil { return nil, err } for i := range rows { - mdb.processRowFromDB(&rows[i]) + err = mdb.processRowFromDB(&rows[i]) + if err != nil { + return nil, err + } } return rows, nil } @@ -281,24 +137,69 @@ func (mdb *db) GetFromVisibility( filter sqlplugin.VisibilityGetFilter, ) (*sqlplugin.VisibilityRow, error) { var row sqlplugin.VisibilityRow - err := mdb.conn.GetContext(ctx, - &row, - templateGetWorkflowExecution, - filter.NamespaceID, - filter.RunID, - ) + stmt, err := mdb.conn.PrepareNamedContext(ctx, templateGetWorkflowExecution) + if err != nil { + return nil, err + } + err = stmt.GetContext(ctx, &row, filter) + if err != nil { + return nil, err + } + err = mdb.processRowFromDB(&row) if err != nil { return nil, err } - mdb.processRowFromDB(&row) return &row, nil } -func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) { +func (mdb *db) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow { + if row == nil { + return nil + } + finalRow := *row + finalRow.StartTime = mdb.converter.ToSQLiteDateTime(finalRow.StartTime) + finalRow.ExecutionTime = mdb.converter.ToSQLiteDateTime(finalRow.ExecutionTime) + if finalRow.CloseTime != nil { + *finalRow.CloseTime = mdb.converter.ToSQLiteDateTime(*finalRow.CloseTime) + } + if finalRow.SearchAttributes != nil { + finalSearchAttributes := sqlplugin.VisibilitySearchAttributes{} + for name, value := range *finalRow.SearchAttributes { + switch v := value.(type) { + case []string: + finalSearchAttributes[name] = strings.Join(v, keywordListSeparator) + default: + finalSearchAttributes[name] = v + } + } + finalRow.SearchAttributes = &finalSearchAttributes + } + return &finalRow +} + +func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) error { + if row == nil { + return nil + } row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime) row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime) if row.CloseTime != nil { closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime) row.CloseTime = &closeTime } + if row.SearchAttributes != nil { + for saName, saValue := range *row.SearchAttributes { + switch typedSaValue := saValue.(type) { + case string: + if strings.Index(typedSaValue, keywordListSeparator) >= 0 { + // If the string contains the keywordListSeparator, then we need to split it + // into a list of keywords. + (*row.SearchAttributes)[saName] = strings.Split(typedSaValue, keywordListSeparator) + } + default: + // no-op + } + } + } + return nil } diff --git a/common/persistence/tests/sqlite_test.go b/common/persistence/tests/sqlite_test.go index e4563fa816c..9ce5ae6a3fe 100644 --- a/common/persistence/tests/sqlite_test.go +++ b/common/persistence/tests/sqlite_test.go @@ -380,6 +380,12 @@ func TestSQLiteFileTaskQueueTaskSuite(t *testing.T) { // TODO: Merge persistence-tests into the tests directory. +func TestSQLiteVisibilityPersistenceSuite(t *testing.T) { + s := new(VisibilityPersistenceSuite) + s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetSQLiteMemoryTestClusterOption()) + suite.Run(t, s) +} + func TestSQLiteHistoryV2PersistenceSuite(t *testing.T) { s := new(persistencetests.HistoryV2PersistenceSuite) s.TestBase = persistencetests.NewTestBaseWithSQL(persistencetests.GetSQLiteMemoryTestClusterOption()) diff --git a/common/persistence/visibility/factory.go b/common/persistence/visibility/factory.go index 4bde4e4c431..b6a54719585 100644 --- a/common/persistence/visibility/factory.go +++ b/common/persistence/visibility/factory.go @@ -31,6 +31,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql" + "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/persistence/visibility/store" "go.temporal.io/server/common/persistence/visibility/store/elasticsearch" @@ -283,7 +284,7 @@ func newStandardVisibilityStore( isStandard = true case visibilityStoreCfg.SQL != nil: switch visibilityStoreCfg.SQL.PluginName { - case mysql.PluginNameV8, postgresql.PluginNameV12: + case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: isStandard = false visStore, err = sql.NewSQLVisibilityStore( *visibilityStoreCfg.SQL, diff --git a/common/persistence/visibility/store/sql/query_converter_factory.go b/common/persistence/visibility/store/sql/query_converter_factory.go index 7cb347472ce..cb866001eef 100644 --- a/common/persistence/visibility/store/sql/query_converter_factory.go +++ b/common/persistence/visibility/store/sql/query_converter_factory.go @@ -27,6 +27,7 @@ package sql import ( "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql" + "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/searchattribute" ) @@ -42,6 +43,8 @@ func NewQueryConverter( return newMySQLQueryConverter(request, saTypeMap, saMapper) case postgresql.PluginNameV12: return newPostgreSQLQueryConverter(request, saTypeMap, saMapper) + case sqlite.PluginName: + return newSqliteQueryConverter(request, saTypeMap, saMapper) default: return nil } diff --git a/common/persistence/visibility/store/sql/query_converter_sqlite.go b/common/persistence/visibility/store/sql/query_converter_sqlite.go new file mode 100644 index 00000000000..6d1ee382f68 --- /dev/null +++ b/common/persistence/visibility/store/sql/query_converter_sqlite.go @@ -0,0 +1,300 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package sql + +import ( + "fmt" + "strings" + + "github.com/xwb1989/sqlparser" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/sql/sqlplugin" + "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/persistence/visibility/store/query" + "go.temporal.io/server/common/searchattribute" +) + +type ( + sqliteQueryConverter struct{} +) + +var _ pluginQueryConverter = (*sqliteQueryConverter)(nil) + +const ( + keywordListTypeFtsTableName = "executions_visibility_fts_keyword_list" + textTypeFtsTableName = "executions_visibility_fts_text" +) + +func newSqliteQueryConverter( + request *manager.ListWorkflowExecutionsRequestV2, + saTypeMap searchattribute.NameTypeMap, + saMapper searchattribute.Mapper, +) *QueryConverter { + return newQueryConverterInternal( + &sqliteQueryConverter{}, + request, + saTypeMap, + saMapper, + ) +} + +func (c *sqliteQueryConverter) getDatetimeFormat() string { + return "2006-01-02 15:04:05.999999-07:00" +} + +//nolint:revive // cyclomatic complexity 17 (> 15) +func (c *sqliteQueryConverter) convertKeywordListComparisonExpr( + expr *sqlparser.ComparisonExpr, +) (sqlparser.Expr, error) { + if !isSupportedKeywordListOperator(expr.Operator) { + return nil, query.NewConverterError("invalid query") + } + + colName, isColName := expr.Left.(*colName) + if !isColName { + return nil, query.NewConverterError( + "%s: must be a column name but was %T", + query.InvalidExpressionErrMessage, + expr.Left, + ) + } + colNameStr := sqlparser.String(colName) + + var ftsQuery string + switch expr.Operator { + case sqlparser.EqualStr, sqlparser.NotEqualStr: + value, err := query.ParseSqlValue(sqlparser.String(expr.Right)) + if err != nil { + return nil, err + } + switch v := value.(type) { + case string: + ftsQuery = fmt.Sprintf(`%s:"%s"`, colNameStr, v) + default: + return nil, query.NewConverterError( + "%s: unexpected value type %T", + query.InvalidExpressionErrMessage, + expr, + ) + } + + case sqlparser.InStr, sqlparser.NotInStr: + valTupleExpr, isValTuple := expr.Right.(sqlparser.ValTuple) + if !isValTuple { + return nil, query.NewConverterError( + "%s: unexpected value type %T", + query.InvalidExpressionErrMessage, + expr, + ) + } + var values []string + for _, valExpr := range valTupleExpr { + value, err := query.ParseSqlValue(sqlparser.String(valExpr)) + if err != nil { + return nil, err + } + switch v := value.(type) { + case string: + values = append(values, fmt.Sprintf(`"%s"`, v)) + default: + return nil, query.NewConverterError( + "%s: unexpected value type %T", + query.InvalidExpressionErrMessage, + expr, + ) + } + } + ftsQuery = fmt.Sprintf("%s:(%s)", colNameStr, strings.Join(values, " OR ")) + + default: + // this should never happen since isSupportedKeywordListOperator should already fail + return nil, query.NewConverterError("invalid query") + } + + var oper string + switch expr.Operator { + case sqlparser.EqualStr, sqlparser.InStr: + oper = sqlparser.InStr + case sqlparser.NotEqualStr, sqlparser.NotInStr: + oper = sqlparser.NotInStr + default: + // this should never happen since isSupportedKeywordListOperator should already fail + return nil, query.NewConverterError("invalid query") + } + + newExpr := sqlparser.ComparisonExpr{ + Operator: oper, + Left: newColName("rowid"), + Right: &sqlparser.Subquery{ + Select: c.buildFtsSelectStmt(keywordListTypeFtsTableName, ftsQuery), + }, + } + return &newExpr, nil +} + +func (c *sqliteQueryConverter) convertTextComparisonExpr( + expr *sqlparser.ComparisonExpr, +) (sqlparser.Expr, error) { + if !isSupportedTextOperator(expr.Operator) { + return nil, query.NewConverterError("invalid query") + } + + colName, isColName := expr.Left.(*colName) + if !isColName { + return nil, query.NewConverterError( + "%s: must be a column name but was %T", + query.InvalidExpressionErrMessage, + expr.Left, + ) + } + colNameStr := sqlparser.String(colName) + + value, err := query.ParseSqlValue(sqlparser.String(expr.Right)) + if err != nil { + return nil, err + } + + var ftsQuery string + switch v := value.(type) { + case string: + ftsQuery = fmt.Sprintf(`%s:(%s)`, colNameStr, v) + default: + return nil, query.NewConverterError( + "%s: unexpected value type %T", + query.InvalidExpressionErrMessage, + expr, + ) + } + + var oper string + switch expr.Operator { + case sqlparser.EqualStr: + oper = sqlparser.InStr + case sqlparser.NotEqualStr: + oper = sqlparser.NotInStr + default: + // this should never happen since isSupportedTextOperator should already fail + return nil, query.NewConverterError("invalid query") + } + + newExpr := sqlparser.ComparisonExpr{ + Operator: oper, + Left: newColName("rowid"), + Right: &sqlparser.Subquery{ + Select: c.buildFtsSelectStmt(textTypeFtsTableName, ftsQuery), + }, + } + return &newExpr, nil +} + +func (c *sqliteQueryConverter) buildSelectStmt( + namespaceID namespace.ID, + queryString string, + pageSize int, + token *pageToken, +) (string, []any) { + whereClauses := make([]string, 0, 3) + queryArgs := make([]any, 0, 8) + + whereClauses = append( + whereClauses, + fmt.Sprintf("%s = ?", searchattribute.GetSqlDbColName(searchattribute.NamespaceID)), + ) + queryArgs = append(queryArgs, namespaceID.String()) + + if len(queryString) > 0 { + whereClauses = append(whereClauses, queryString) + } + + if token != nil { + whereClauses = append( + whereClauses, + fmt.Sprintf( + "((%s = ? AND %s = ? AND %s > ?) OR (%s = ? AND %s < ?) OR %s < ?)", + sqlparser.String(getCoalesceCloseTimeExpr(c.getDatetimeFormat())), + searchattribute.GetSqlDbColName(searchattribute.StartTime), + searchattribute.GetSqlDbColName(searchattribute.RunID), + sqlparser.String(getCoalesceCloseTimeExpr(c.getDatetimeFormat())), + searchattribute.GetSqlDbColName(searchattribute.StartTime), + sqlparser.String(getCoalesceCloseTimeExpr(c.getDatetimeFormat())), + ), + ) + queryArgs = append( + queryArgs, + token.CloseTime, + token.StartTime, + token.RunID, + token.CloseTime, + token.StartTime, + token.CloseTime, + ) + } + + queryArgs = append(queryArgs, pageSize) + + return fmt.Sprintf( + `SELECT %s + FROM executions_visibility + WHERE %s + ORDER BY %s DESC, %s DESC, %s + LIMIT ?`, + strings.Join(sqlplugin.DbFields, ", "), + strings.Join(whereClauses, " AND "), + sqlparser.String(getCoalesceCloseTimeExpr(c.getDatetimeFormat())), + searchattribute.GetSqlDbColName(searchattribute.StartTime), + searchattribute.GetSqlDbColName(searchattribute.RunID), + ), queryArgs +} + +// buildFtsSelectStmt builds the following statement for querying FTS: +// +// SELECT rowid FROM tableName WHERE tableName = '%s' +func (c *sqliteQueryConverter) buildFtsSelectStmt( + tableName string, + queryString string, +) sqlparser.SelectStatement { + return &sqlparser.Select{ + SelectExprs: sqlparser.SelectExprs{ + &sqlparser.AliasedExpr{ + Expr: newColName("rowid"), + }, + }, + From: sqlparser.TableExprs{ + &sqlparser.AliasedTableExpr{ + Expr: &sqlparser.TableName{ + Name: sqlparser.NewTableIdent(tableName), + }, + }, + }, + Where: sqlparser.NewWhere( + sqlparser.WhereStr, + &sqlparser.ComparisonExpr{ + Operator: sqlparser.EqualStr, + Left: newColName(tableName), + Right: newUnsafeSQLString(queryString), + }, + ), + } +}