changefeedccl: implement `WITH diff` option
#28666
to be clear, this would add a `WITH diff` option
More context, because I had already typed it:

**Is your feature request related to a problem? Please describe.**
Changefeeds currently provide users with a mechanism to mirror table state by starting with a snapshot of the data and then emitting events corresponding to each update to the table. This is useful for continuous export into a data warehouse, or for event processing on append-only tables (or append/delete tables). Often systems would like to react to state changes, such as row updates which change the row from state A->B. Currently it is very difficult to use changefeeds for this purpose.

**Describe the solution you'd like**
Ideally there would be a changefeed option to request the previous value of each row alongside the new one.

**Describe alternatives you've considered**
In theory one could write code that performs an AS OF SYSTEM TIME query at updated.Prev(), but doing that is more painful than it sounds: one has to be aware of the schema of the table, convert the JSON to the appropriate SQL values for the predicate for use in the SELECT, and then find a way to convert that SQL back to JSON such that the read previous value looks like the "after" value. Perhaps a SQL builtin to turn a tuple into a JSON object would lessen the reach here enough to make it plausible.
Oh, by skipping ones that are identical when converted to SQL, how interesting. You might even be able to distinguish between that and someone updating (via SQL) some column with a value that happens to be the old value, because in that case the KVs would be identical. I haven't worked out all the details, but this passes the eye test and is probably worth exploring.
Btw, @nvanbenschoten had a flex Friday project to implement this before/after. Nathan, how far away from mergeable was that? It might be worth getting this scheduled if it's not too much extra work.
The missing parts of that prototype were testing and actually exposing a `WITH diff` option. I think it could be pushed over the finish line in 1-2 days. Want to work on it tomorrow together, since we'll be sitting next to each other? This was the remaining blocker for @benesch, so I'm sure he'd be happy to have it hooked up.
FWIW, this is my use case for this. |
Works for me! |
This commit creates a rangefeed.Filter object, which informs the producer of logical operations of the information that a rangefeed Processor is interested in, given its current set of registrations. It can be used to avoid performing extra work to provide the Processor with information which will be ignored. This is an important optimization for single-key or small key span rangefeeds, as it avoids some extraneous work for the key spans not being watched. For now, this extra work is just fetching the post-operation value for MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in the future to include the pre-operation value as well. This will be important to avoid any perf regression when addressing cockroachdb#28666. Release note (<category, see below>): <what> <show> <why>
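The Filter idea in the commit above can be sketched roughly as follows. This is a simplified illustration, not the actual `pkg/storage/rangefeed` code, which uses an interval tree over registrations; all names here are hypothetical.

```go
package main

import "fmt"

// span is a half-open key interval [start, end).
type span struct{ start, end string }

// registration stands in for a rangefeed registration; needsValue marks
// whether it wants post-operation values fetched for ops in its span.
type registration struct {
	span       span
	needsValue bool
}

// filter summarizes the current registrations so the producer of logical
// ops can skip work whose results no registration would consume.
type filter struct{ regs []registration }

func (s span) overlaps(o span) bool { return s.start < o.end && o.start < s.end }

// NeedVal reports whether any registration overlapping s needs the
// operation's value fetched at all.
func (f *filter) NeedVal(s span) bool {
	for _, r := range f.regs {
		if r.needsValue && r.span.overlaps(s) {
			return true
		}
	}
	return false
}

func main() {
	f := &filter{regs: []registration{
		{span: span{"a", "c"}, needsValue: true},
	}}
	fmt.Println(f.NeedVal(span{"b", "b\x00"})) // watched key: fetch the value
	fmt.Println(f.NeedVal(span{"x", "y"}))     // unwatched span: skip the fetch
}
```

This is why the optimization matters most for single-key or small-span rangefeeds: the filter lets the Raft processing goroutine avoid value lookups for the vast majority of keys it is not watching.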
@danhhz I got the plumbing hooked up, so this is mostly just missing testing and polish at this point. See #41793. Also, while we're here, @benesch do you mind writing down what you mentioned in person about improving the Avro encoding for Materialize's use case? I believe your suggestions were split between re-ordering the fields in the Avro encoding to allow for less buffering (ceddb83638dc7f9fe1973a1c5afd564c83ab7ed1) and creating an Avro-style union type between the data and resolved-timestamp records.
Oh man, I'd be over the moon!
Yeah, the first one was the most important. It's totally backwards compatible, assuming that the change will cause a new schema to be generated with the new field ordering... which, now that I think about it, isn't obviously going to happen. @danhhz, how does CDC decide when to publish a new schema? Is it maintaining a hash map from schema -> schema registry ID, or does it do something more complicated based on the schema of the underlying table?

The second one is most definitely not backwards compatible, and more of a philosophical point than anything else. Cockroach currently pushes records with entirely different shapes into the same Kafka topic, namely the actual data records and the watermark (metadata) records. AFAICT this is a fairly nonstandard use of the schema registry, which goes to great lengths to allow users to specify the backwards/forwards compatibility requirements for all schemas in a given subject (see: https://docs.confluent.io/current/schema-registry/avro.html). I haven't tried this, but I wonder if Cockroach would have trouble if the schema registry it was communicating with required global forward compatibility? There's a test that claims this is handled (cockroach/pkg/cmd/roachtest/cdc.go, lines 360 to 361 at c0e8390).
So yeah, it seems to me that it might be preferable to have a single schema used for the topic that has a top-level union type with two variants, one for the data records and one for the metadata records. Given that the metadata variant won't evolve, the compatibility requirements will naturally apply to the data variant and therefore the union as a whole. Perhaps the variant would be too annoying, though, for the users who don't want to constantly unpack watermarks? Then again, you don't get the watermarks unless you request them. One trick we've been using at Materialize is to stuff watermarks in records like this:

```json
{
  "before": null,
  "after": null,
  "arbitrary_metadata": {
    "watermark": 12345
  }
}
```

In any case, I'm sure @danhhz has thought about this. Curious to hear your thoughts, Dan!
Fixes cockroachdb#28666. First commit from cockroachdb#41788. This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the option is provided, changefeed publications will include a `before` field, which includes the value of the row before the CDC update was applied. We are able to implement this efficiently by pushing the option into Rangefeed and performing the scan of the previous value immediately before applying the update in the Raft processing goroutine. cockroachdb#41788 allows us to avoid performing this lookup when `WITH diff` isn't specified for Rangefeeds, so the small (unmeasured) perf hit is strictly opt-in. DNM: this still needs testing and a bit of polish. It's also not clear how we want to handle catch-up scans when the option is provided.
Our internal caching is based on the table id and version (cockroach/pkg/ccl/changefeedccl/encoder.go, line 319 at 77764a5), but the schema registry itself does it based on some fingerprint of the final schema. I think this might end up being okay. If a node switches the Avro "meta-schema" (the change you're talking about here), it will necessarily be because the node restarted with a new binary version, which means its in-memory cache will be cold. It fills it by talking to the schema registry, which should do the right thing here. This will cause the topic to be a strange mixture of the two meta-schemas as the nodes are rolled over to the new version, but this is probably fine for an experimental feature.
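The two caching layers described here can be sketched as follows. All names are hypothetical; this is not the actual encoder.go code, just the interaction between a node-local cache keyed by (table id, version) and a registry that fingerprints schema text.

```go
package main

import "fmt"

// tableKey is the node-local cache key.
type tableKey struct {
	tableID, version int
}

// registerSchema first consults the node-local cache, then falls back to a
// registry keyed by the schema text itself, so two nodes producing
// byte-identical schemas are handed the same ID.
func registerSchema(local map[tableKey]int, registry map[string]int, k tableKey, schema string, nextID *int) int {
	if id, ok := local[k]; ok {
		return id
	}
	id, ok := registry[schema]
	if !ok {
		id = *nextID
		*nextID++
		registry[schema] = id
	}
	local[k] = id
	return id
}

func main() {
	registry := map[string]int{}
	next := 1
	a := registerSchema(map[tableKey]int{}, registry, tableKey{52, 1}, `{"name":"row"}`, &next)
	// A restarted node has a cold local cache but emits the same schema
	// text, so the registry returns the same ID rather than minting a new one.
	b := registerSchema(map[tableKey]int{}, registry, tableKey{52, 1}, `{"name":"row"}`, &next)
	fmt.Println(a == b)
}
```

This is the property being relied on above: a rolling upgrade only produces new schema IDs when the schema text actually changes, not merely because caches went cold.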
(Side Q since I've paged all this out: Does reordering fields in an avro schema matter? Can you not read records written with the same fields in a different order?)
I thought pretty hard about being a good citizen of the schema registry, so I'm interested in digging into this. IIRC, we do actually offer full backward and forward compatibility through unions with null. There are two variants, one for the resolved timestamp messages and one for the data rows. Basically, as far as I can tell, what we're doing is fundamentally what you're describing. We do flatten the metadata down into the top level instead of its own "arbitrary_metadata" record, which I'm hesitant to change since it currently exactly mirrors the JSON format. Is that important for some reason I'm not seeing? @benesch How about this: can you give me a concrete example of something actually output by CockroachDB (one data entry and one metadata entry) and the same example formatted in the way you're looking for?
Sweet, that sounds perfect.
So it doesn't matter for correctness, only for performance. Since Avro records are identified in the schema by their name, not their position, Avro decoders will give you the same object back regardless of the ordering of the fields in the record schema. For example, both of these Avro schemas

```json
{
  "type": "record",
  "name": "row",
  "fields": [
    {"name": "a", "type": "int"},
    {"name": "b", "type": "int"}
  ]
}
```

```json
{
  "type": "record",
  "name": "row",
  "fields": [
    {"name": "b", "type": "int"},
    {"name": "a", "type": "int"}
  ]
}
```

can be decoded into the same struct:

```c
struct row {
  int a;
  int b;
};
```

The on-the-wire encoding is order sensitive, of course, and we play some tricks if the record is laid out in exactly the right order, namely a nullable before field, followed by a nullable after field, followed by anything. For example, decoding the before/after fields via a full-fledged Avro decoder results in an allocation for each field; our specialized decoder can avoid that allocation. The more compelling point, and the reason that I feel somewhat reasonable requesting this change in Cockroach, is that Avro doesn't allow efficient skipping of fields. If the unimportant fields come first, you have no choice but to decode them to get to the data that you care about. If the important fields come first, you can leave the unimportant fields undecoded at the end of the record.
For sure! Here's some output from the office dogs example.
The first record has this schema:
while the second record has this schema:

```json
{
  "type": "record",
  "name": "office_dogs_envelope",
  "fields": [
    {
      "name": "resolved",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
```

Now that I look at it, I guess the fact that ...

```json
[
  {
    "type": "record",
    "name": "office_dogs_envelope",
    "fields": [
      {
        "name": "updated",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "before",
        "type": [
          "null",
          {
            "type": "record",
            "name": "office_dogs",
            "fields": [
              {
                "name": "id",
                "type": [
                  "null",
                  "long"
                ],
                "default": null,
                "__crdb__": "id INT8 NOT NULL"
              },
              {
                "name": "name",
                "type": [
                  "null",
                  "string"
                ],
                "default": null,
                "__crdb__": "name STRING NULL"
              }
            ]
          }
        ]
      },
      { "name": "after", "type": "office_dogs" }
    ]
  },
  {
    "type": "record",
    "name": "cockroachdb-timestamp-metadata",
    "fields": [
      {
        "name": "resolved",
        "type": "string"
      }
    ]
  }
]
```
If Avro is fine decoding a struct that was encoded using a schema with the same fields in a different order, and it gets the same result, then I'm absolutely fine making the field reordering change.

If we were doing this all from scratch, I think I'd agree with you that a union of the two resolved/row records is more natural. I like that a user would basically have a switch/match that hands one or the other to whatever code is responsible for processing it. However, having it mirror the JSON format also seems important to me. If it's better for Avro, I don't see why it wouldn't also be better for JSON, so I would do it for both. This would require adding a new
Ok, great! And yeah, every Avro library I've seen is like goavro in that a record is encoded to/decoded from a
Cool, sounds like we're on the same page. The current format is totally workable; it was just a bit surprising/confusing to see two different schema IDs, since the Avro decoding code I wrote expected multiple schema IDs to mean that a schema change was happening and started to warn the user. Now we have special logic to determine whether the new schema ID we're seeing is just a special CockroachDB resolved timestamp.

I'm still a bit confused by what you mean when you say it should mirror the JSON format. Do you mean that with the union-style schema, the JSON records would look like this?

```json
{"office_dogs_envelope": {"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}}
{"cockroachdb-timestamp-metadata": {"resolved": {"string": "1571844807817082000.0000000000"}}}
```

(I guess that makes sense!)
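The switch/match dispatch that the union-style encoding enables can be sketched like this. The key names follow the JSON example above and are illustrative, not a real wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// classify dispatches on the single top-level key of a union-style record:
// the metadata variant has a fixed name, everything else is a per-table
// data envelope.
func classify(line string) string {
	var m map[string]json.RawMessage
	if err := json.Unmarshal([]byte(line), &m); err != nil {
		return "invalid"
	}
	for k := range m {
		if k == "cockroachdb-timestamp-metadata" {
			return "watermark"
		}
		return "data"
	}
	return "empty"
}

func main() {
	fmt.Println(classify(`{"office_dogs_envelope": {"after": null}}`))
	fmt.Println(classify(`{"cockroachdb-timestamp-metadata": {"resolved": {"string": "1"}}}`))
}
```

A consumer written this way never has to guess whether a new schema ID means a schema change or just a resolved-timestamp record; the variant name says which it is.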
Yeah, that's exactly what I'm saying. Though I'd probably call the fields something that doesn't vary per table/topic. |
Fixes cockroachdb#28666. First commit from cockroachdb#41788. This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the option is provided, changefeed publications will include a `before` field, which includes the value of the row before the CDC update was applied. We are able to implement this efficiently by pushing the option into Rangefeed and performing the scan of the previous value immediately before applying the update in the Raft processing goroutine. cockroachdb#41788 allows us to avoid performing this lookup when `WITH diff` isn't specified for Rangefeeds, so the small (unmeasured) perf hit is strictly opt-in. Release note (sql change): CHANGEFEED now supports a `WITH diff` option, which instructs it to include a `before` field in each publication.
41788: storage/rangefeed: push rangefeed.Filter into Raft processing goroutine r=danhhz a=nvanbenschoten

42580: sql: add precision support for TIMESTAMP/TIMESTAMPTZ (v2) r=otan a=otan Refs #32098. Alternative to #42552. Add precision support for TIMESTAMP/TIMESTAMPTZ, supporting precisions from 0 to 6 inclusive. When storing this in the type proto, we introduce a `TimePrecisionIsSet` variable, which is used to differentiate 0 (unset) from 0 (explicitly set) for time-related protos. This is abstracted away in the `Precision()` function, which returns the correct precision (unset => 6) for time. This allows forwards and backwards compatibility. Note Time will come later, as it involves a bit more plumbing to the base library. Release note (sql change): Introduces precision support for TIMESTAMP and TIMESTAMPTZ, supporting precisions from 0 to 6 inclusive. Previous versions of TIMESTAMP and TIMESTAMPTZ will default to 6 units of precision. Note that if anyone downgrades whilst having precision set, they will have full precision (6) again, but if they re-upgrade they will find their precisions truncated again.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com> Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
41793: ccl/changefeedccl: implement `WITH diff` option r=nvanbenschoten a=nvanbenschoten Fixes #28666. Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Should this be labeled as 20.1?
This alters the emitted messages to have the previous and new versions of each updated row instead of just the new one. It will probably require a follow-up scan to get the old data, and so will be slower.