Skip to content

Commit

Permalink
ccl/changefeedccl: re-order avro CDC encoding for faster parsing
Browse files Browse the repository at this point in the history
This commit re-orders the avro encoding of CDC messages to place fields
in the order: [`before`, `after`, `updated`, `resolved`].

Rumor has it that Materialize has a fast-path if these fields are
ordered like this.

Release note (sql change): The fields in the Avro format for CHANGEFEED
records have been re-ordered to allow for optimized parsing. This is a
backwards-compatible change.
  • Loading branch information
nvanbenschoten committed Nov 16, 2019
1 parent e5cc36d commit fa5de4b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 51 deletions.
85 changes: 41 additions & 44 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ type avroMetadata map[string]interface{}

// avroEnvelopeOpts controls which fields in avroEnvelopeRecord are set.
type avroEnvelopeOpts struct {
updatedField, resolvedField bool
beforeField, afterField bool
updatedField, resolvedField bool
}

// avroEnvelopeRecord is an `avroRecord` that wraps a changed SQL row and some
Expand Down Expand Up @@ -507,6 +507,24 @@ func envelopeToAvroSchema(
opts: opts,
}

if opts.beforeField {
schema.before = before
beforeField := &avroSchemaField{
Name: `before`,
SchemaType: []avroSchemaType{avroSchemaNull, before},
Default: nil,
}
schema.Fields = append(schema.Fields, beforeField)
}
if opts.afterField {
schema.after = after
afterField := &avroSchemaField{
Name: `after`,
SchemaType: []avroSchemaType{avroSchemaNull, after},
Default: nil,
}
schema.Fields = append(schema.Fields, afterField)
}
if opts.updatedField {
updatedField := &avroSchemaField{
SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString},
Expand All @@ -523,24 +541,6 @@ func envelopeToAvroSchema(
}
schema.Fields = append(schema.Fields, resolvedField)
}
if opts.afterField {
schema.after = after
afterField := &avroSchemaField{
Name: `after`,
SchemaType: []avroSchemaType{avroSchemaNull, after},
Default: nil,
}
schema.Fields = append(schema.Fields, afterField)
}
if opts.beforeField {
schema.before = before
beforeField := &avroSchemaField{
Name: `before`,
SchemaType: []avroSchemaType{avroSchemaNull, before},
Default: nil,
}
schema.Fields = append(schema.Fields, beforeField)
}

schemaJSON, err := json.Marshal(schema)
if err != nil {
Expand All @@ -558,9 +558,28 @@ func envelopeToAvroSchema(
func (r *avroEnvelopeRecord) BinaryFromRow(
buf []byte, meta avroMetadata, beforeRow, afterRow sqlbase.EncDatumRow,
) ([]byte, error) {
native := map[string]interface{}{
`before`: nil,
`after`: nil,
native := map[string]interface{}{}
if r.opts.beforeField {
if beforeRow == nil {
native[`before`] = nil
} else {
beforeNative, err := r.before.nativeFromRow(beforeRow)
if err != nil {
return nil, err
}
native[`before`] = goavro.Union(avroUnionKey(&r.before.avroRecord), beforeNative)
}
}
if r.opts.afterField {
if afterRow == nil {
native[`after`] = nil
} else {
afterNative, err := r.after.nativeFromRow(afterRow)
if err != nil {
return nil, err
}
native[`after`] = goavro.Union(avroUnionKey(&r.after.avroRecord), afterNative)
}
}
if r.opts.updatedField {
native[`updated`] = nil
Expand All @@ -585,28 +604,6 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
}
}
// WIP verify that meta is now empty
if r.opts.afterField {
if afterRow == nil {
native[`after`] = nil
} else {
afterNative, err := r.after.nativeFromRow(afterRow)
if err != nil {
return nil, err
}
native[`after`] = goavro.Union(avroUnionKey(&r.after.avroRecord), afterNative)
}
}
if r.opts.beforeField {
if beforeRow == nil {
native[`before`] = nil
} else {
beforeNative, err := r.before.nativeFromRow(beforeRow)
if err != nil {
return nil, err
}
native[`before`] = goavro.Union(avroUnionKey(&r.before.avroRecord), beforeNative)
}
}
return r.codec.BinaryFromNative(buf, native)
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,13 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) {
sort.Strings(updated)

expected := []string{
`{"updated":{"string":""},"after":{"foo":{"a":{"long":1},"c":null}},"before":{"foo_before":{"a":{"long":1},"c":null}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":1}}},"before":null}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"before":null}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"c":null}},"before":{"foo_before":{"a":{"long":2},"c":null}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"before":null}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}}}`,
`{"updated":{"string":""},"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"before":null}`,
`{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`,
`{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`,
`{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`,
}
if strings.Join(expected, "\n") != strings.Join(updated, "\n") {
t.Fatalf("expected\n%s\n\ngot\n%s\n\n",
Expand Down

0 comments on commit fa5de4b

Please sign in to comment.