Skip to content

Commit

Permalink
roachtest/cdc: fix < 19.2 compat for cdc/bank and cdc/schemareg
Browse files Browse the repository at this point in the history
Fixes cockroachdb#42690.
Fixes cockroachdb#41177.

This was broken by cockroachdb#41793.
  • Loading branch information
nvanbenschoten committed Nov 25, 2019
1 parent 93a8062 commit 7044348
Showing 1 changed file with 50 additions and 21 deletions.
71 changes: 50 additions & 21 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,16 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) {
); err != nil {
t.Fatal(err)
}

// NB: the WITH diff option was not supported until v20.1.
withDiff := t.IsBuildVersion("v20.1.0")
var opts = []string{`updated`, `resolved`}
if withDiff {
opts = append(opts, `diff`)
}
var jobID string
if err := db.QueryRow(
`CREATE CHANGEFEED FOR bank.bank INTO $1 WITH updated, resolved, diff`, kafka.sinkURL(ctx),
`CREATE CHANGEFEED FOR bank.bank INTO $1 WITH `+strings.Join(opts, `, `), kafka.sinkURL(ctx),
).Scan(&jobID); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -314,19 +321,22 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) {
}

const requestedResolved = 100
baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`)
if err != nil {
return err
}
fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0)
if err != nil {
return err
}
v := cdctest.MakeCountValidator(cdctest.Validators{
validators := cdctest.Validators{
cdctest.NewOrderValidator(`bank`),
baV,
fprintV,
})
}
if withDiff {
baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`)
if err != nil {
return err
}
validators = append(validators, baV)
}
v := cdctest.MakeCountValidator(validators)

for {
m := tc.Next(ctx)
Expand Down Expand Up @@ -387,10 +397,16 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) {
if _, err := db.Exec(`CREATE TABLE foo (a INT PRIMARY KEY)`); err != nil {
t.Fatal(err)
}

// NB: the WITH diff option was not supported until v20.1.
withDiff := t.IsBuildVersion("v20.1.0")
var opts = []string{`updated`, `resolved`, `format=experimental_avro`, `confluent_schema_registry=$2`}
if withDiff {
opts = append(opts, `diff`)
}
var jobID string
if err := db.QueryRow(
`CREATE CHANGEFEED FOR foo INTO $1`+
`WITH updated, resolved, diff, format=experimental_avro, confluent_schema_registry=$2`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH `+strings.Join(opts, `, `),
kafka.sinkURL(ctx), kafka.schemaRegistryURL(ctx),
).Scan(&jobID); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -448,17 +464,30 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) {
}
sort.Strings(updated)

expected := []string{
`{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":1},"b":null,"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":2},"b":{"string":"2"},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`,
var expected []string
if withDiff {
expected = []string{
`{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":1},"b":null,"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":2},"b":{"string":"2"},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`,
}
} else {
expected = []string{
`{"updated":{"string":""},"after":{"foo":{"a":{"long":1},"c":null}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":1}}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"c":null}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"c":{"long":3}}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":4},"c":{"long":4}}}}`,
}
}
if strings.Join(expected, "\n") != strings.Join(updated, "\n") {
t.Fatalf("expected\n%s\n\ngot\n%s\n\n",
Expand Down

0 comments on commit 7044348

Please sign in to comment.