Skip to content

Commit

Permalink
Source, activate on create MV (#617)
Browse files Browse the repository at this point in the history
* New create source command option; tests

* Fix test

* Rename flag; remove flag from table_exec

* Remove extra argument

* Remove args from NewTableExecutor
  • Loading branch information
alex-cash authored Nov 14, 2022
1 parent cdd9edd commit b0d98ad
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 51 deletions.
42 changes: 25 additions & 17 deletions command/create_source_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package command

import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/alecthomas/repr"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
Expand All @@ -13,10 +18,6 @@ import (
"github.com/squareup/pranadb/meta"
"github.com/squareup/pranadb/push/source"
"google.golang.org/protobuf/reflect/protoreflect"
"strconv"
"strings"
"sync"
"time"
)

type CreateSourceCommand struct {
Expand Down Expand Up @@ -206,8 +207,10 @@ func (c *CreateSourceCommand) onPhase1() error {
defer c.lock.Unlock()

// Activate the message consumers for the source now, unless the source is configured to start with its first MV
if err := c.source.Start(); err != nil {
return errors.WithStack(err)
if !c.sourceInfo.OriginInfo.StartWithFirstMV {
if err := c.source.Start(); err != nil {
return errors.WithStack(err)
}
}

// Register the source in the in-memory metadata
Expand Down Expand Up @@ -288,6 +291,7 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
brokerName, topicName string
initialiseFrom string
transient bool
startWithFirstMV bool
sRetentionTime string
)
for _, opt := range ast.OriginInformation {
Expand Down Expand Up @@ -332,6 +336,9 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
if opt.Transient != nil && *opt.Transient {
transient = true
}
if opt.StartWithFirstMV != nil && *opt.StartWithFirstMV {
startWithFirstMV = true
}
}
if headerEncoding == common.KafkaEncodingUnknown {
return nil, errors.NewPranaErrorf(errors.InvalidStatement, "headerEncoding is required")
Expand Down Expand Up @@ -382,17 +389,18 @@ func (c *CreateSourceCommand) getSourceInfo(ast *parser.CreateSource) (*common.S
}

originInfo := &common.SourceOriginInfo{
BrokerName: brokerName,
TopicName: topicName,
HeaderEncoding: headerEncoding,
KeyEncoding: keyEncoding,
ValueEncoding: valueEncoding,
IngestFilter: ingestFilter,
ColSelectors: colSelectors,
Properties: propsMap,
InitialState: initialiseFrom,
ConsumerGroupID: c.consumerGroupID,
Transient: transient,
BrokerName: brokerName,
TopicName: topicName,
HeaderEncoding: headerEncoding,
KeyEncoding: keyEncoding,
ValueEncoding: valueEncoding,
IngestFilter: ingestFilter,
ColSelectors: colSelectors,
Properties: propsMap,
InitialState: initialiseFrom,
ConsumerGroupID: c.consumerGroupID,
Transient: transient,
StartWithFirstMV: startWithFirstMV,
}
var colsVisible []bool
var lastUpdateIndexID uint64
Expand Down
25 changes: 13 additions & 12 deletions command/parser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,18 @@ type CreateSink struct {
}

// SourceOriginInformation is the parsed form of a single option in the
// with (...) clause of a create source statement. The struct tags are parser
// grammar fragments joined as alternatives, so each parsed element is expected
// to have only the field for the option it matched populated; callers iterate
// over a list of these elements and pick out whichever field is set.
type SourceOriginInformation struct {
BrokerName string `"BrokerName" "=" @String`
TopicName string `|"TopicName" "=" @String`
HeaderEncoding string `|"HeaderEncoding" "=" @String`
KeyEncoding string `|"KeyEncoding" "=" @String`
ValueEncoding string `|"ValueEncoding" "=" @String`
IngestFilter string `|"IngestFilter" "=" @String`
InitialState string `|"InitialState" "=" @String`
Transient *Boolean `|"Transient" "=" @Ident`
RetentionTime string `|"RetentionTime" "=" @String`
ColSelectors []*selector.ColumnSelectorAST `|"ColumnSelectors" "=" "(" (@@ ("," @@)*)? ")"`
Properties []*TopicInfoProperty `|"Properties" "=" "(" (@@ ("," @@)*)? ")"`
BrokerName string `"BrokerName" "=" @String`
TopicName string `|"TopicName" "=" @String`
HeaderEncoding string `|"HeaderEncoding" "=" @String`
KeyEncoding string `|"KeyEncoding" "=" @String`
ValueEncoding string `|"ValueEncoding" "=" @String`
IngestFilter string `|"IngestFilter" "=" @String`
InitialState string `|"InitialState" "=" @String`
Transient *Boolean `|"Transient" "=" @Ident`
// StartWithFirstMV is captured from an identifier (true/false) rather than
// a quoted string, like Transient. It is a pointer; presumably nil means the
// option was not this element's match (callers nil-check before reading it).
StartWithFirstMV *Boolean `|"StartWithFirstMV" "=" @Ident`
RetentionTime string `|"RetentionTime" "=" @String`
ColSelectors []*selector.ColumnSelectorAST `|"ColumnSelectors" "=" "(" (@@ ("," @@)*)? ")"`
Properties []*TopicInfoProperty `|"Properties" "=" "(" (@@ ("," @@)*)? ")"`
}

type SinkTargetInformation struct {
Expand All @@ -150,7 +151,7 @@ func (b *Boolean) Capture(values []string) error {
} else if lv == "false" {
*b = false
} else {
return participle.Errorf(lexer.Position{}, "invalid value for 'Transient' field in source definition: %s", values[0])
return participle.Errorf(lexer.Position{}, "invalid value for boolean field in source definition: %s", values[0])
}
*b = values[0] == "true"
return nil
Expand Down
23 changes: 12 additions & 11 deletions common/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,18 @@ func (i *MetaTableInfo) String() string {
}

// SourceOriginInfo describes where a source's data comes from and how it is
// ingested: the broker and topic, the encodings of header, key and value, the
// column selectors, and the ingestion options given when the source was
// created.
type SourceOriginInfo struct {
BrokerName string
TopicName string
KeyEncoding KafkaEncoding
ValueEncoding KafkaEncoding
HeaderEncoding KafkaEncoding
ColSelectors []selector.ColumnSelector
Properties map[string]string
IngestFilter string
InitialState string
ConsumerGroupID string
Transient bool
BrokerName string
TopicName string
KeyEncoding KafkaEncoding
ValueEncoding KafkaEncoding
HeaderEncoding KafkaEncoding
ColSelectors []selector.ColumnSelector
Properties map[string]string
IngestFilter string
InitialState string
ConsumerGroupID string
Transient bool
// StartWithFirstMV defers starting the source's message consumers: when
// true, source creation does not start the source, and it is started when
// a materialized view that reads from it is filled.
StartWithFirstMV bool
}

type SinkTargetInfo struct {
Expand Down
9 changes: 5 additions & 4 deletions push/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package push

import (
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"github.com/google/uuid"
"github.com/squareup/pranadb/failinject"
"github.com/squareup/pranadb/interruptor"
Expand All @@ -11,10 +16,6 @@ import (
"github.com/squareup/pranadb/push/util"
"github.com/squareup/pranadb/remoting"
"github.com/squareup/pranadb/tidb/planner"
"math/rand"
"strconv"
"sync"
"time"

"github.com/squareup/pranadb/errors"

Expand Down
5 changes: 3 additions & 2 deletions push/exec/table_exec.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package exec

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/squareup/pranadb/cluster"
"github.com/squareup/pranadb/common"
"github.com/squareup/pranadb/errors"
"github.com/squareup/pranadb/interruptor"
"github.com/squareup/pranadb/table"
"sync"
"time"
)

const (
Expand Down
23 changes: 22 additions & 1 deletion push/mv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package push

import (
"reflect"

log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/cluster"
"github.com/squareup/pranadb/common"
Expand All @@ -10,7 +12,6 @@ import (
"github.com/squareup/pranadb/push/exec"
"github.com/squareup/pranadb/push/sched"
"github.com/squareup/pranadb/sharder"
"reflect"
)

type MaterializedView struct {
Expand Down Expand Up @@ -207,6 +208,26 @@ func (m *MaterializedView) Fill(shardIDs []uint64, interruptor *interruptor.Inte
return errors.WithStack(err)
}

// Start any sources that wait to ingest on first MV
for _, tableExec := range tes {
table, ok := m.schema.GetTable(tableExec.TableInfo.Name)
if !ok {
return errors.Errorf("unknown source or materialized view %s", tableExec.TableInfo.Name)
}
sourceInfo, ok := table.(*common.SourceInfo)
if ok {
if sourceInfo.OriginInfo.StartWithFirstMV {
source, err := m.pe.GetSource(tableExec.TableInfo.ID)
if err != nil {
return errors.WithStack(err)
}
if err := source.Start(); err != nil {
return errors.WithStack(err)
}
}
}
}

log.Debugf("materialized view fill for shards %v", shardIDs)

schedulers := make(map[uint64]*sched.ShardScheduler)
Expand Down
5 changes: 3 additions & 2 deletions push/reaper/reaper_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package reaper

import (
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/cluster"
"github.com/squareup/pranadb/cluster/fake"
"github.com/squareup/pranadb/common"
"github.com/squareup/pranadb/push/exec"
"github.com/squareup/pranadb/table"
"github.com/stretchr/testify/require"
"testing"
"time"
)

const shardID uint64 = 12345
Expand Down
5 changes: 3 additions & 2 deletions sqltest/sql_test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"flag"
"fmt"
"github.com/squareup/pranadb/pull/exec"
"io/ioutil"
"math/rand"
"net/http"
Expand All @@ -18,6 +17,8 @@ import (
"testing"
"time"

"github.com/squareup/pranadb/pull/exec"

"github.com/squareup/pranadb/command"
"github.com/squareup/pranadb/interruptor"
pranalog "github.com/squareup/pranadb/log"
Expand Down Expand Up @@ -46,7 +47,7 @@ import (
)

const (
TestPrefix = "" // Set this to the name of a test if you want to only run that test, e.g. during development
TestPrefix = "start_source_on_first_mv" // Set this to the name of a test if you want to only run that test, e.g. during development
ExcludedTestPrefixes = ""
TestClusterID = 12345678
ProtoDescriptorDir = "../protos"
Expand Down
11 changes: 11 additions & 0 deletions sqltest/testdata/start_source_on_first_mv_test_data.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
dataset:dataset_1 test_source_1
1,10,1000,1234.4321,12345678.99,str1,2020-01-01 01:00:00.123456
2,20,2000,2234.4321,22345678.99,str2,2020-01-02 01:00:00.123456
3,30,3000,3234.4321,32345678.99,str3,2020-01-03 01:00:00.123456
4,40,4000,4234.4321,42345678.99,str4,2020-01-04 01:00:00.123456
5,50,5000,5234.4321,52345678.99,str5,2020-01-05 01:00:00.123456
6,60,6000,6234.4321,62345678.99,str6,2020-01-06 01:00:00.123456
7,70,7000,7234.4321,72345678.99,str7,2020-01-07 01:00:00.123456
8,80,8000,8234.4321,82345678.99,str8,2020-01-08 01:00:00.123456
9,90,9000,9234.4321,92345678.99,str9,2020-01-09 01:00:00.123456
10,100,10000,10234.4321,93345678.99,str10,2020-01-10 01:00:00.123456
87 changes: 87 additions & 0 deletions sqltest/testdata/start_source_on_first_mv_test_out.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
--create topic testtopic;
use test;
0 rows returned
create source test_source_1(
col0 bigint,
col1 tinyint,
col2 int,
col3 double,
col4 decimal(10, 2),
col5 varchar,
col6 timestamp,
primary key (col0)
) with (
startwithfirstmv = true,
brokername = "testbroker",
topicname = "testtopic",
headerencoding = "json",
keyencoding = "json",
valueencoding = "json",
columnselectors = (
meta("key").k0,
v1,
v2,
v3,
v4,
v5,
v6
)
);
0 rows returned

--load data dataset_1 no wait;

--pause 1000;

select * from test_source_1 order by col0;
+---------------------------------------------------------------------------------------------------------------------+
| col0 | col1 | col2 | col3 | col4 | col5 | col6 |
+---------------------------------------------------------------------------------------------------------------------+
0 rows returned

create materialized view test_mv1 as select * from test_source_1;
0 rows returned

--wait for rows test_source_1 10;

select * from test_source_1 order by col0;
+---------------------------------------------------------------------------------------------------------------------+
| col0 | col1 | col2 | col3 | col4 | col5 | col6 |
+---------------------------------------------------------------------------------------------------------------------+
| 1 | 10 | 1000 | 1234.432100 | 12345678.99 | str1 | 2020-01-01 01:00:00.000000 |
| 2 | 20 | 2000 | 2234.432100 | 22345678.99 | str2 | 2020-01-02 01:00:00.000000 |
| 3 | 30 | 3000 | 3234.432100 | 32345678.99 | str3 | 2020-01-03 01:00:00.000000 |
| 4 | 40 | 4000 | 4234.432100 | 42345678.99 | str4 | 2020-01-04 01:00:00.000000 |
| 5 | 50 | 5000 | 5234.432100 | 52345678.99 | str5 | 2020-01-05 01:00:00.000000 |
| 6 | 60 | 6000 | 6234.432100 | 62345678.99 | str6 | 2020-01-06 01:00:00.000000 |
| 7 | 70 | 7000 | 7234.432100 | 72345678.99 | str7 | 2020-01-07 01:00:00.000000 |
| 8 | 80 | 8000 | 8234.432100 | 82345678.99 | str8 | 2020-01-08 01:00:00.000000 |
| 9 | 90 | 9000 | 9234.432100 | 92345678.99 | str9 | 2020-01-09 01:00:00.000000 |
| 10 | 100 | 10000 | 10234.432100 | 93345678.99 | str10 | 2020-01-10 01:00:00.000000 |
+---------------------------------------------------------------------------------------------------------------------+
10 rows returned

select * from test_mv1 order by col0;
+---------------------------------------------------------------------------------------------------------------------+
| col0 | col1 | col2 | col3 | col4 | col5 | col6 |
+---------------------------------------------------------------------------------------------------------------------+
| 1 | 10 | 1000 | 1234.432100 | 12345678.99 | str1 | 2020-01-01 01:00:00.000000 |
| 2 | 20 | 2000 | 2234.432100 | 22345678.99 | str2 | 2020-01-02 01:00:00.000000 |
| 3 | 30 | 3000 | 3234.432100 | 32345678.99 | str3 | 2020-01-03 01:00:00.000000 |
| 4 | 40 | 4000 | 4234.432100 | 42345678.99 | str4 | 2020-01-04 01:00:00.000000 |
| 5 | 50 | 5000 | 5234.432100 | 52345678.99 | str5 | 2020-01-05 01:00:00.000000 |
| 6 | 60 | 6000 | 6234.432100 | 62345678.99 | str6 | 2020-01-06 01:00:00.000000 |
| 7 | 70 | 7000 | 7234.432100 | 72345678.99 | str7 | 2020-01-07 01:00:00.000000 |
| 8 | 80 | 8000 | 8234.432100 | 82345678.99 | str8 | 2020-01-08 01:00:00.000000 |
| 9 | 90 | 9000 | 9234.432100 | 92345678.99 | str9 | 2020-01-09 01:00:00.000000 |
| 10 | 100 | 10000 | 10234.432100 | 93345678.99 | str10 | 2020-01-10 01:00:00.000000 |
+---------------------------------------------------------------------------------------------------------------------+
10 rows returned

drop materialized view test_mv1;
0 rows returned

drop source test_source_1;
0 rows returned

--delete topic testtopic;
Loading

0 comments on commit b0d98ad

Please sign in to comment.