diff --git a/command/create_source_command.go b/command/create_source_command.go index 8e15bc8a..23f3d69f 100644 --- a/command/create_source_command.go +++ b/command/create_source_command.go @@ -2,6 +2,11 @@ package command import ( "fmt" + "strconv" + "strings" + "sync" + "time" + "github.com/alecthomas/repr" "github.com/google/uuid" log "github.com/sirupsen/logrus" @@ -13,10 +18,6 @@ import ( "github.com/squareup/pranadb/meta" "github.com/squareup/pranadb/push/source" "google.golang.org/protobuf/reflect/protoreflect" - "strconv" - "strings" - "sync" - "time" ) type CreateSourceCommand struct { @@ -206,8 +207,10 @@ func (c *CreateSourceCommand) onPhase1() error { defer c.lock.Unlock() // Activate the message consumers for the source - if err := c.source.Start(); err != nil { - return errors.WithStack(err) + if !c.sourceInfo.OriginInfo.StartWithFirstMV { + if err := c.source.Start(); err != nil { + return errors.WithStack(err) + } } // Register the source in the in memory meta data @@ -288,6 +291,7 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S brokerName, topicName string initialiseFrom string transient bool + startWithFirstMV bool sRetentionTime string ) for _, opt := range ast.OriginInformation { @@ -332,6 +336,9 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S if opt.Transient != nil && *opt.Transient { transient = true } + if opt.StartWithFirstMV != nil && *opt.StartWithFirstMV { + startWithFirstMV = true + } } if headerEncoding == common.KafkaEncodingUnknown { return nil, errors.NewPranaErrorf(errors.InvalidStatement, "headerEncoding is required") @@ -382,17 +389,18 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S } originInfo := &common.SourceOriginInfo{ - BrokerName: brokerName, - TopicName: topicName, - HeaderEncoding: headerEncoding, - KeyEncoding: keyEncoding, - ValueEncoding: valueEncoding, - IngestFilter: ingestFilter, - ColSelectors: colSelectors, - Properties: propsMap, - InitialState: initialiseFrom, - ConsumerGroupID: c.consumerGroupID, - Transient: transient, + BrokerName: brokerName, + TopicName: topicName, + HeaderEncoding: headerEncoding, + KeyEncoding: keyEncoding, + ValueEncoding: valueEncoding, + IngestFilter: ingestFilter, + ColSelectors: colSelectors, + Properties: propsMap, + InitialState: initialiseFrom, + ConsumerGroupID: c.consumerGroupID, + Transient: transient, + StartWithFirstMV: startWithFirstMV, } var colsVisible []bool var lastUpdateIndexID uint64 diff --git a/command/parser/ast.go b/command/parser/ast.go index 72d82020..8bbf732d 100644 --- a/command/parser/ast.go +++ b/command/parser/ast.go @@ -115,17 +115,18 @@ type CreateSink struct { } type SourceOriginInformation struct { - BrokerName string `"BrokerName" "=" @String` - TopicName string `|"TopicName" "=" @String` - HeaderEncoding string `|"HeaderEncoding" "=" @String` - KeyEncoding string `|"KeyEncoding" "=" @String` - ValueEncoding string `|"ValueEncoding" "=" @String` - IngestFilter string `|"IngestFilter" "=" @String` - InitialState string `|"InitialState" "=" @String` - Transient *Boolean `|"Transient" "=" @Ident` - RetentionTime string `|"RetentionTime" "=" @String` - ColSelectors []*selector.ColumnSelectorAST `|"ColumnSelectors" "=" "(" (@@ ("," @@)*)? ")"` - Properties []*TopicInfoProperty `|"Properties" "=" "(" (@@ ("," @@)*)? ")"` + BrokerName string `"BrokerName" "=" @String` + TopicName string `|"TopicName" "=" @String` + HeaderEncoding string `|"HeaderEncoding" "=" @String` + KeyEncoding string `|"KeyEncoding" "=" @String` + ValueEncoding string `|"ValueEncoding" "=" @String` + IngestFilter string `|"IngestFilter" "=" @String` + InitialState string `|"InitialState" "=" @String` + Transient *Boolean `|"Transient" "=" @Ident` + StartWithFirstMV *Boolean `|"StartWithFirstMV" "=" @Ident` + RetentionTime string `|"RetentionTime" "=" @String` + ColSelectors []*selector.ColumnSelectorAST `|"ColumnSelectors" "=" "(" (@@ ("," @@)*)? ")"` + Properties []*TopicInfoProperty `|"Properties" "=" "(" (@@ ("," @@)*)? ")"` } type SinkTargetInformation struct { @@ -150,7 +151,7 @@ func (b *Boolean) Capture(values []string) error { } else if lv == "false" { *b = false } else { - return participle.Errorf(lexer.Position{}, "invalid value for 'Transient' field in source definition: %s", values[0]) + return participle.Errorf(lexer.Position{}, "invalid value for boolean field in source definition: %s", values[0]) } *b = values[0] == "true" return nil diff --git a/common/meta.go b/common/meta.go index 4a2c763e..f71d06ca 100644 --- a/common/meta.go +++ b/common/meta.go @@ -407,17 +407,18 @@ func (i *MetaTableInfo) String() string { } type SourceOriginInfo struct { - BrokerName string - TopicName string - KeyEncoding KafkaEncoding - ValueEncoding KafkaEncoding - HeaderEncoding KafkaEncoding - ColSelectors []selector.ColumnSelector - Properties map[string]string - IngestFilter string - InitialState string - ConsumerGroupID string - Transient bool + BrokerName string + TopicName string + KeyEncoding KafkaEncoding + ValueEncoding KafkaEncoding + HeaderEncoding KafkaEncoding + ColSelectors []selector.ColumnSelector + Properties map[string]string + IngestFilter string + InitialState string + ConsumerGroupID string + Transient bool + StartWithFirstMV bool } type SinkTargetInfo struct { diff --git a/push/engine.go b/push/engine.go index 0c6fdc46..25730fcb 100644 --- a/push/engine.go +++ b/push/engine.go @@ -2,6 +2,11 @@ package push import ( "fmt" + "math/rand" + "strconv" + "sync" + "time" + "github.com/google/uuid" "github.com/squareup/pranadb/failinject" "github.com/squareup/pranadb/interruptor" @@ -11,10 +16,6 @@ import ( "github.com/squareup/pranadb/push/util" "github.com/squareup/pranadb/remoting" "github.com/squareup/pranadb/tidb/planner" - "math/rand" - "strconv" - "sync" - "time" "github.com/squareup/pranadb/errors" diff --git a/push/exec/table_exec.go b/push/exec/table_exec.go index bf794f78..65d5df6f 100644 --- a/push/exec/table_exec.go +++ b/push/exec/table_exec.go @@ -1,6 +1,9 @@ package exec import ( + "sync" + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/squareup/pranadb/cluster" @@ -8,8 +11,6 @@ import ( "github.com/squareup/pranadb/errors" "github.com/squareup/pranadb/interruptor" "github.com/squareup/pranadb/table" - "sync" - "time" ) const ( diff --git a/push/mv.go b/push/mv.go index 9d7adf65..2d873cd2 100644 --- a/push/mv.go +++ b/push/mv.go @@ -1,6 +1,8 @@ package push import ( + "reflect" + log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/cluster" "github.com/squareup/pranadb/common" @@ -10,7 +12,6 @@ import ( "github.com/squareup/pranadb/push/exec" "github.com/squareup/pranadb/push/sched" "github.com/squareup/pranadb/sharder" - "reflect" ) type MaterializedView struct { @@ -207,6 +208,26 @@ func (m *MaterializedView) Fill(shardIDs []uint64, interruptor *interruptor.Inte return errors.WithStack(err) } + // Start any sources that wait to ingest on first MV + for _, tableExec := range tes { + table, ok := m.schema.GetTable(tableExec.TableInfo.Name) + if !ok { + return errors.Errorf("unknown source or materialized view %s", tableExec.TableInfo.Name) + } + sourceInfo, ok := table.(*common.SourceInfo) + if ok { + if sourceInfo.OriginInfo.StartWithFirstMV { + source, err := m.pe.GetSource(tableExec.TableInfo.ID) + if err != nil { + return errors.WithStack(err) + } + if err := source.Start(); err != nil { + return errors.WithStack(err) + } + } + } + } + log.Debugf("materialized view fill for shards %v", shardIDs) schedulers := make(map[uint64]*sched.ShardScheduler) diff --git a/push/reaper/reaper_test.go b/push/reaper/reaper_test.go index bbc85f11..ff11caa2 100644 --- a/push/reaper/reaper_test.go +++ b/push/reaper/reaper_test.go @@ -1,6 +1,9 @@ package reaper import ( + "testing" + "time" + log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/cluster" "github.com/squareup/pranadb/cluster/fake" @@ -8,8 +11,6 @@ import ( "github.com/squareup/pranadb/push/exec" "github.com/squareup/pranadb/table" "github.com/stretchr/testify/require" - "testing" - "time" ) const shardID uint64 = 12345 diff --git a/sqltest/sql_test_runner.go b/sqltest/sql_test_runner.go index 38f4b98e..c13ac914 100644 --- a/sqltest/sql_test_runner.go +++ b/sqltest/sql_test_runner.go @@ -5,7 +5,6 @@ import ( "context" "flag" "fmt" - "github.com/squareup/pranadb/pull/exec" "io/ioutil" "math/rand" "net/http" @@ -18,6 +17,8 @@ import ( "testing" "time" + "github.com/squareup/pranadb/pull/exec" + "github.com/squareup/pranadb/command" "github.com/squareup/pranadb/interruptor" pranalog "github.com/squareup/pranadb/log" @@ -46,7 +47,7 @@ import ( ) const ( - TestPrefix = "" // Set this to the name of a test if you want to only run that test, e.g. during development + TestPrefix = "start_source_on_first_mv" // Set this to the name of a test if you want to only run that test, e.g. during development ExcludedTestPrefixes = "" TestClusterID = 12345678 ProtoDescriptorDir = "../protos" diff --git a/sqltest/testdata/start_source_on_first_mv_test_data.txt b/sqltest/testdata/start_source_on_first_mv_test_data.txt new file mode 100644 index 00000000..bffc0c3d --- /dev/null +++ b/sqltest/testdata/start_source_on_first_mv_test_data.txt @@ -0,0 +1,11 @@ +dataset:dataset_1 test_source_1 +1,10,1000,1234.4321,12345678.99,str1,2020-01-01 01:00:00.123456 +2,20,2000,2234.4321,22345678.99,str2,2020-01-02 01:00:00.123456 +3,30,3000,3234.4321,32345678.99,str3,2020-01-03 01:00:00.123456 +4,40,4000,4234.4321,42345678.99,str4,2020-01-04 01:00:00.123456 +5,50,5000,5234.4321,52345678.99,str5,2020-01-05 01:00:00.123456 +6,60,6000,6234.4321,62345678.99,str6,2020-01-06 01:00:00.123456 +7,70,7000,7234.4321,72345678.99,str7,2020-01-07 01:00:00.123456 +8,80,8000,8234.4321,82345678.99,str8,2020-01-08 01:00:00.123456 +9,90,9000,9234.4321,92345678.99,str9,2020-01-09 01:00:00.123456 +10,100,10000,10234.4321,93345678.99,str10,2020-01-10 01:00:00.123456 \ No newline at end of file diff --git a/sqltest/testdata/start_source_on_first_mv_test_out.txt b/sqltest/testdata/start_source_on_first_mv_test_out.txt new file mode 100644 index 00000000..5b90e6c3 --- /dev/null +++ b/sqltest/testdata/start_source_on_first_mv_test_out.txt @@ -0,0 +1,87 @@ +--create topic testtopic; +use test; +0 rows returned +create source test_source_1( + col0 bigint, + col1 tinyint, + col2 int, + col3 double, + col4 decimal(10, 2), + col5 varchar, + col6 timestamp, + primary key (col0) +) with ( + startwithfirstmv = true, + brokername = "testbroker", + topicname = "testtopic", + headerencoding = "json", + keyencoding = "json", + valueencoding = "json", + columnselectors = ( + meta("key").k0, + v1, + v2, + v3, + v4, + v5, + v6 + ) +); +0 rows returned + +--load data dataset_1 no wait; + +--pause 1000; + +select * from test_source_1 order by col0; ++---------------------------------------------------------------------------------------------------------------------+ +| col0 | col1 | col2 | col3 | col4 | col5 | col6 | ++---------------------------------------------------------------------------------------------------------------------+ +0 rows returned + +create materialized view test_mv1 as select * from test_source_1; +0 rows returned + +--wait for rows test_source_1 10; + +select * from test_source_1 order by col0; ++---------------------------------------------------------------------------------------------------------------------+ +| col0 | col1 | col2 | col3 | col4 | col5 | col6 | ++---------------------------------------------------------------------------------------------------------------------+ +| 1 | 10 | 1000 | 1234.432100 | 12345678.99 | str1 | 2020-01-01 01:00:00.000000 | +| 2 | 20 | 2000 | 2234.432100 | 22345678.99 | str2 | 2020-01-02 01:00:00.000000 | +| 3 | 30 | 3000 | 3234.432100 | 32345678.99 | str3 | 2020-01-03 01:00:00.000000 | +| 4 | 40 | 4000 | 4234.432100 | 42345678.99 | str4 | 2020-01-04 01:00:00.000000 | +| 5 | 50 | 5000 | 5234.432100 | 52345678.99 | str5 | 2020-01-05 01:00:00.000000 | +| 6 | 60 | 6000 | 6234.432100 | 62345678.99 | str6 | 2020-01-06 01:00:00.000000 | +| 7 | 70 | 7000 | 7234.432100 | 72345678.99 | str7 | 2020-01-07 01:00:00.000000 | +| 8 | 80 | 8000 | 8234.432100 | 82345678.99 | str8 | 2020-01-08 01:00:00.000000 | +| 9 | 90 | 9000 | 9234.432100 | 92345678.99 | str9 | 2020-01-09 01:00:00.000000 | +| 10 | 100 | 10000 | 10234.432100 | 93345678.99 | str10 | 2020-01-10 01:00:00.000000 | ++---------------------------------------------------------------------------------------------------------------------+ +10 rows returned + +select * from test_mv1 order by col0; ++---------------------------------------------------------------------------------------------------------------------+ +| col0 | col1 | col2 | col3 | col4 | col5 | col6 | ++---------------------------------------------------------------------------------------------------------------------+ +| 1 | 10 | 1000 | 1234.432100 | 12345678.99 | str1 | 2020-01-01 01:00:00.000000 | +| 2 | 20 | 2000 | 2234.432100 | 22345678.99 | str2 | 2020-01-02 01:00:00.000000 | +| 3 | 30 | 3000 | 3234.432100 | 32345678.99 | str3 | 2020-01-03 01:00:00.000000 | +| 4 | 40 | 4000 | 4234.432100 | 42345678.99 | str4 | 2020-01-04 01:00:00.000000 | +| 5 | 50 | 5000 | 5234.432100 | 52345678.99 | str5 | 2020-01-05 01:00:00.000000 | +| 6 | 60 | 6000 | 6234.432100 | 62345678.99 | str6 | 2020-01-06 01:00:00.000000 | +| 7 | 70 | 7000 | 7234.432100 | 72345678.99 | str7 | 2020-01-07 01:00:00.000000 | +| 8 | 80 | 8000 | 8234.432100 | 82345678.99 | str8 | 2020-01-08 01:00:00.000000 | +| 9 | 90 | 9000 | 9234.432100 | 92345678.99 | str9 | 2020-01-09 01:00:00.000000 | +| 10 | 100 | 10000 | 10234.432100 | 93345678.99 | str10 | 2020-01-10 01:00:00.000000 | ++---------------------------------------------------------------------------------------------------------------------+ +10 rows returned + +drop materialized view test_mv1; +0 rows returned + +drop source test_source_1; +0 rows returned + +--delete topic testtopic; diff --git a/sqltest/testdata/start_source_on_first_mv_test_script.txt b/sqltest/testdata/start_source_on_first_mv_test_script.txt new file mode 100644 index 00000000..1538705b --- /dev/null +++ b/sqltest/testdata/start_source_on_first_mv_test_script.txt @@ -0,0 +1,48 @@ +--create topic testtopic; +use test; +create source test_source_1( + col0 bigint, + col1 tinyint, + col2 int, + col3 double, + col4 decimal(10, 2), + col5 varchar, + col6 timestamp, + primary key (col0) +) with ( + startwithfirstmv = true, + brokername = "testbroker", + topicname = "testtopic", + headerencoding = "json", + keyencoding = "json", + valueencoding = "json", + columnselectors = ( + meta("key").k0, + v1, + v2, + v3, + v4, + v5, + v6 + ) +); + +--load data dataset_1 no wait; + +--pause 1000; + +select * from test_source_1 order by col0; + +create materialized view test_mv1 as select * from test_source_1; + +--wait for rows test_source_1 10; + +select * from test_source_1 order by col0; + +select * from test_mv1 order by col0; + +drop materialized view test_mv1; + +drop source test_source_1; + +--delete topic testtopic; \ No newline at end of file