changefeedccl: implement WITH diff option #28666

Closed
danhhz opened this issue Aug 15, 2018 · 15 comments · Fixed by #41793
Labels
A-cdc Change Data Capture C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception)

Comments

@danhhz
Contributor

danhhz commented Aug 15, 2018

This alters the emitted messages to include both the previous and the new version of each updated row, instead of just the new one. It will probably require a follow-up scan to get the old data and so will be slower.
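
For reference, a rough sketch of what this could look like at the SQL level; the table name and sink URI are placeholders, and the option ultimately shipped as `WITH diff` (see #41793):

-- Hypothetical usage: ask the changefeed to include the previous row version.
-- Each emitted message then carries a `before` field alongside `after`.
CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://broker:9092'
  WITH updated, diff;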

@danhhz danhhz added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) A-cdc Change Data Capture labels Aug 15, 2018
@danhhz danhhz added this to the 2.2 milestone Aug 15, 2018
@petermattis petermattis removed this from the 2.2 milestone Oct 5, 2018
@danhhz danhhz changed the title chagnefeedccl: implement envelope=diff option chagnefeedccl: implement WITH option Apr 16, 2019
@danhhz danhhz changed the title chagnefeedccl: implement WITH option chagnefeedccl: implement WITH diff option Apr 16, 2019
@benesch benesch changed the title chagnefeedccl: implement WITH diff option changefeedccl: implement WITH diff option Jun 11, 2019
@rolandcrosby

to be clear, this would add a before field to complement the existing after field, showing the state of the row before the KV change got written. I'm wondering if this would also enable us to suppress output of unchanged rows (e.g. the extra identical writes that currently show up during schema changes).

@ajwerner
Contributor

More context because I had already typed it:

Is your feature request related to a problem? Please describe.

Changefeeds currently provide users with a mechanism to mirror table state by starting with a snapshot of the data and then emitting events corresponding to each update to the table. This is useful for continuous export into a data warehouse or for event processing on append-only tables (or append/delete tables).

Often systems would like to react to state changes, such as row updates which change the row from state A->B. Currently it is very difficult to use changefeeds for this purpose.

Describe the solution you'd like

Ideally there would be a changefeed option to request that the "before" state of a row be added to each emitted event.

Describe alternatives you've considered

In theory one could write code that performs an AS OF SYSTEM TIME query at updated.Prev(), but doing that is more painful than it sounds: one has to know the schema of the table, convert the JSON values into the appropriate SQL values for the SELECT's predicate, and then find a way to convert the SQL result back to JSON so that the previous value read this way looks like the "after" value.
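
A minimal sketch of that workaround, reusing the office_dogs table from the examples later in this thread; the timestamp literal stands in for updated.Prev(), and the key predicate has to be reconstructed by hand from the changefeed's JSON key:

-- Hypothetical workaround: re-read the row as of a timestamp just before the
-- event's "updated" timestamp. Table, columns, timestamp, and key value are
-- all placeholders.
SELECT id, name
FROM office_dogs
AS OF SYSTEM TIME '1571844805329302000.0000000000'
WHERE id = 1;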

Perhaps a SQL builtin to turn a tuple into a JSON object would reduce the friction here enough to make this plausible.

@danhhz
Contributor Author

danhhz commented Oct 21, 2019

I'm wondering if this would also enable us to suppress output of unchanged rows (e.g. the extra identical writes that currently show up during schema changes).

Oh by skipping ones that are identical when converted to SQL, how interesting. You might even be able to distinguish between that and someone updating (via SQL) some column with a value that happens to be the old value because in that case the kvs would be identical.

I haven't worked out all the details, but this passes the eye test and is probably worth exploring.

@danhhz
Contributor Author

danhhz commented Oct 21, 2019

Btw, @nvanbenschoten had a flex friday project to implement this before/after. Nathan, how far away from mergeable was that? It might be worth getting this scheduled if it's not too much extra work.

@nvanbenschoten
Member

The missing parts of that prototype were testing and actually exposing a WITH diff option that's threaded all the way down to rangefeed. The latter is a little tricky because we'll need to synchronize with the raft processing goroutine in addition to the rangefeed goroutine. That's not impossible though, and we'll also need to solve that problem to efficiently support per-key rangefeeds. We could also always include the diff in rangefeed publications and avoid some of this complication, but that would likely have a performance impact that we'd like to avoid.

I think it could be pushed over the finish line in 1-2 days. Want to work on it tomorrow together since we'll be sitting next to each other? This was the remaining blocker for @benesch so I'm sure he'd be happy to have it hooked up.

@vilterp
Contributor

vilterp commented Oct 21, 2019

Often systems would like to react to state changes, such as row updates which change the row from state A->B. Currently it is very difficult to use changefeeds for this purpose.

FWIW, this is my use case for this.

@danhhz
Contributor Author

danhhz commented Oct 21, 2019

I think it could be pushed over the finish line in 1-2 days. Want to work on it tomorrow together since we'll be sitting next to each other?

Works for me!

nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Oct 22, 2019
This commit creates a rangefeed.Filter object, which informs the producer of
logical operations of the information that a rangefeed Processor is interested
in, given its current set of registrations. It can be used to avoid performing
extra work to provide the Processor with information which will be ignored.

This is an important optimization for single-key or small key span rangefeeds,
as it avoids some extraneous work for the key spans not being watched.

For now, this extra work is just fetching the post-operation value for
MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in
the future to include the pre-operation value as well. This will be important
to avoid any perf regression when addressing cockroachdb#28666.

Release note (<category, see below>): <what> <show> <why>
@nvanbenschoten
Member

@danhhz I got the plumbing hooked up, so this is mostly just missing testing and polish at this point. See #41793.

Also, while we're here, @benesch do you mind writing down what you mentioned in person about improving the Avro encoding for Materialize's use case? I believe your suggestions were split between re-ordering the fields in the Avro encoding to allow for less buffering (ceddb83638dc7f9fe1973a1c5afd564c83ab7ed1) and creating an Avro-style union type between the resolved messages and the before/after/updated messages (not implemented). Do you know which of these are backward-compatible changes?

@benesch
Contributor

benesch commented Oct 22, 2019

This was the remaining blocker for @benesch so I'm sure he'd be happy to have it hooked up.

Oh man, I'd be over the moon!

Also, while we're here, @benesch do you mind writing down what you mentioned in person about improving the Avro encoding for Materialize's use case? I believe your suggestions were split between re-ordering the fields in the Avro encoding to allow for less buffering (ceddb83) and creating an Avro-style union type between the resolved messages and the before/after/updated messages (not implemented). Do you know which of these are backward-compatible changes?

Yeah, the first one was the most important. It's totally backwards compatible assuming that the change will cause a new schema to be generated with the new field ordering... which, now that I think about it, isn't obviously going to happen. @danhhz, how does CDC decide when to publish a new schema? Is it maintaining a hash map from schema -> schema registry ID, or does it do something more complicated based on the schema of the underlying table?

The second one is most definitely not backwards compatible, and more of a philosophical point than anything else. Cockroach currently pushes records with entirely different shapes into the same Kafka topic, namely, the actual data records and the watermark (metadata) records. AFAICT this is fairly nonstandard use of the schema registry, which goes to great lengths to allow users to specify the backwards/forward compatibility requirements for all schemas in a given subject (see: https://docs.confluent.io/current/schema-registry/avro.html).

I haven't tried this, but I wonder if Cockroach would have trouble if the schema registry it was communicating with required global forward compatibility? There's a test that claims this is handled

// end-to-end (including the schema registry default of requiring backward
// compatibility within a topic).
but I haven't tracked down the code around it.

So yeah, it seems to me that it might be preferable to have a single schema used for the topic that has a top-level union type with two variants, one for the data records and one for the metadata records. Given that the metadata variant won't evolve, the compatibility requirements will naturally apply to the data variant and therefore the union as a whole.

Perhaps the variant would be too annoying, though, for the users who don't want to constantly unpack watermarks? Then again, you don't get the watermarks unless you request them. One trick we've been using at Materialize is to stuff watermarks in records like this:

{
  "before": null,
  "after": null,
  "arbitrary_metadata": {
    "watermark": 12345,
  }
}

The {"before": null, "after": null} is a bit weird, but has the nice property of being pretty obviously a no-op.

In any case, I'm sure @danhhz has thought about this—curious to hear your thoughts, Dan!

nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Oct 22, 2019
Fixes cockroachdb#28666.
First commit from cockroachdb#41788.

This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the
option is provided, changefeed publications will include a `before`
field, which includes the value of the row before the CDC update was
applied.

We are able to implement this efficiently by pushing the option into
Rangefeed and performing the scan of the previous value immediately
before applying the update in the Raft processing goroutine. cockroachdb#41788
allows us to avoid performing this lookup when `WITH diff` isn't
specified for Rangefeeds, so the small (unmeasured) perf hit is
strictly opt-in.

DNM: this still needs testing and a bit of polish. It's also not clear
how we want to handle catch-up scans when the option is provided.
@danhhz
Contributor Author

danhhz commented Oct 22, 2019

@danhhz, how does CDC decide when to publish a new schema? Is it maintaining a hash map from schema -> schema registry ID, or does it do something more complicated based on the schema of the underlying table?

Our internal caching is based on the table id and version, but the schema registry itself does it based on some fingerprint of the final schema. I think this might end up being okay. If a node switches the avro "meta-schema" (the change you're talking about here), it will necessarily be because the node restarted with a new binary version, which means its in-memory cache will be cold. It fills it by talking to the schema registry, which should do the right thing here.

This will cause the topic to be a strange mixture of the two meta-schemas as the nodes are rolled over to the new version, but this is probably fine for an experimental feature.

// CDC's in-memory schema cache is keyed on the table descriptor's ID and version.
cacheKey := makeTableIDAndVersion(row.tableDesc.ID, row.tableDesc.Version)

(Side Q since I've paged all this out: Does reordering fields in an avro schema matter? Can you not read records written with the same fields in a different order?)

The second one is most definitely not backwards compatible, and more of a philosophical point than anything else. Cockroach currently pushes records with entirely different shapes into the same Kafka topic, namely, the actual data records and the watermark (metadata) records. AFAICT this is fairly nonstandard use of the schema registry, which goes to great lengths to allow users to specify the backwards/forward compatibility requirements for all schemas in a given subject (see: https://docs.confluent.io/current/schema-registry/avro.html).

I haven't tried this, but I wonder if Cockroach would have trouble if the schema registry it was communicating with required global forward compatibility? There's a test that claims this is handled

// end-to-end (including the schema registry default of requiring backward
// compatibility within a topic).
but I haven't tracked down the code around it.

So yeah, it seems to me that it might be preferable to have a single schema used for the topic that has a top-level union type with two variants, one for the data records and one for the metadata records. Given that the metadata variant won't evolve, the compatibility requirements will naturally apply to the data variant and therefore the union as a whole.

Perhaps the variant would be too annoying, though, for the users who don't want to constantly unpack watermarks? Then again, you don't get the watermarks unless you request them. One trick we've been using at Materialize is to stuff watermarks in records like this:

{
  "before": null,
  "after": null,
  "arbitrary_metadata": {
    "watermark": 12345,
  }
}

The {"before": null, "after": null} is a bit weird, but has the nice property of being pretty obviously a no-op.

I thought pretty hard about being a good citizen of the schema registry, so I'm interested in digging into this. IIRC, we do actually offer full backward and forward compatibility through unions with null. There are two variants, one for the resolved timestamp messages and one for the data rows.

Basically, as far as I can tell what we're doing is fundamentally what you're describing. We do flatten the metadata down into the top-level instead of its own "arbitrary_metadata" record, which I'm hesitant to change since it currently exactly mirrors the json format. Is that important for some reason I'm not seeing?

@benesch How about this, can you give me a concrete example of something actually output by CockroachDB (one data entry and one metadata entry) and the same example formatted in the way you're looking for?

@benesch
Contributor

benesch commented Oct 23, 2019

This will cause the topic to be a strange mixture of the two meta-schemas as the nodes are rolled over to the new version, but this is probably fine for an experimental feature.

Sweet, that sounds perfect.

(Side Q since I've paged all this out: Does reordering fields in an avro schema matter? Can you not read records written with the same fields in a different order?)

So it doesn't matter for correctness, only for performance. Since Avro record fields are identified in the schema by name, not by position, Avro decoders will give you the same object back regardless of the ordering of the fields in the record schema. For example, both of these Avro schemas

{
  "type": "record",
  "name": "row",
  "fields" : [
    {"name": "a", "type": "int"},
    {"name": "b", "type": "int"}
  ]
}
{
  "type": "record",
  "name": "row",
  "fields" : [
    {"name": "b", "type": "int"},
    {"name": "a", "type": "int"}
  ]
}

can be decoded into the same struct:

struct row {
    int a;
    int b;
}

The on-the-wire encoding is order sensitive, of course, and we play some tricks if the record is laid out in exactly the right order, namely a nullable before field, followed by a nullable after field, followed by anything. For example, decoding the before/after fields via a full-fledged Avro decoder results in an allocation for each field; our specialized decoder can avoid that allocation.

The more compelling point—and the reason that I feel somewhat reasonable requesting this change in Cockroach—is that Avro doesn't allow efficient skipping of fields. If the unimportant fields come first, you have no choice but to decode them to get to the data that you care about. If the important fields come first, you can leave the unimportant fields undecoded at the end of the record.

@benesch How about this, can you give me a concrete example of something actually output by CockroachDB (one data entry and one metadata entry) and the same example formatted in the way you're looking for?

For sure! Here's some output from the office dogs example.

{"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}
{"resolved": {"string": "1571844807817082000.0000000000"}}

The first record has this schema:

{
  "type": "record",
  "name": "office_dogs_envelope",
  "fields": [
    {
      "name": "updated",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        {
          "type": "record",
          "name": "office_dogs",
          "fields": [
            {
              "name": "id",
              "type": [
                "null",
                "long"
              ],
              "default": null,
              "__crdb__": "id INT8 NOT NULL"
            },
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "__crdb__": "name STRING NULL"
            }
          ]
        }
      ],
      "default": null
    }
  ]
}

while the second record has this schema:

{
  "type": "record",
  "name": "office_dogs_envelope",
  "fields": [
    {
      "name": "resolved",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

Now that I look at it, I guess the fact that resolved, before, after, and updated are all nullable means that these two schemas are forwards and backwards compatible with one another. I guess I just would have expected one schema that unions the two schema versions together like this:

[
  {
    "type": "record",
    "name": "office_dogs_envelope",
    "fields": [
      {
        "name": "updated",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "before",
        "type": [
          "null",
          {
            "type": "record",
            "name": "office_dogs",
            "fields": [
              {
                "name": "id",
                "type": [
                  "null",
                  "long"
                ],
                "default": null,
                "__crdb__": "id INT8 NOT NULL"
              },
              {
                "name": "name",
                "type": [
                  "null",
                  "string"
                ],
                "default": null,
                "__crdb__": "name STRING NULL"
              }
            ]
          }
        ]
      },
      { "name": "after", "type": "office_dogs" }
    ]
  },
  {
    "type": "record",
    "name": "cockroachdb-timestamp-metadata",
    "fields": [
      {
        "name": "resolved",
        "type": "string"
      }
    ]
  }
]

@danhhz
Contributor Author

danhhz commented Oct 23, 2019

If avro is fine decoding a struct that was encoded using a schema with the same fields in a different order and it gets the same result, then I'm absolutely fine making the field reordering change.

If we were doing this all from scratch, I think I'd agree with you that a union of the two resolved/row records is more natural. I like that a user would basically have a switch/match that hands one or the other to whatever code is responsible for processing it. However, having it mirror the JSON format also seems important to me. If it's better for avro, I don't see why it wouldn't also be better for json, so I would do it for both. This would require adding a new format= option and changing the default for newly created changefeeds from envelope to this new one. TBH, I'm extremely hesitant to do that (again, we changed the default format type in 19.2) without more design partners than just y'all. Luckily this is always something we can do later and it also sounds like the one you care about less.

@benesch
Contributor

benesch commented Oct 23, 2019

If avro is fine decoding a struct that was encoded using a schema with the same fields in a different order and it gets the same result, then I'm absolutely fine making the field reordering change.

Ok, great! And yeah, every Avro library I've seen is like goavro in that a record is encoded to/decoded from a map[string]interface{}, and so the ordering of the fields is totally invisible to the user.

If we were doing this all from scratch, I think I'd agree with you that a union of the two resolved/row records is more natural. I like that a user would basically have a switch/match that hands one or the other to whatever code is responsible for processing it...

...If we were doing this all from scratch, I think I'd agree with you that a union of the two resolved/row records is more natural. I like that a user would basically have a switch/match that hands one or the other to whatever code is responsible for processing it.

Cool, sounds like we're on the same page. The current format is totally workable; it was just a bit surprising/confusing to see two different schema IDs, since the Avro decoding code I wrote expected multiple schema IDs to mean that a schema change was happening and started to warn the user. Now we have special logic to determine whether the new schema ID we're seeing is just a special CockroachDB resolved timestamp.

I'm still a bit confused by what you mean when you say it should mirror the JSON format. Do you mean that with the union-style schema, the JSON records would look like this:

{"office_dogs_envelope": {"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}
{"cockroachdb-timestamp-metadata": {"resolved": {"string": "1571844807817082000.0000000000"}}}

(I guess that makes sense!)

@danhhz
Contributor Author

danhhz commented Oct 23, 2019

I'm still a bit confused by what you mean when you say it should mirror the JSON format. Do you mean that with the union-style schema, the JSON records would look like this:

{"office_dogs_envelope": {"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}
{"cockroachdb-timestamp-metadata": {"resolved": {"string": "1571844807817082000.0000000000"}}}

(I guess that makes sense!)

Yeah, that's exactly what I'm saying. Though I'd probably call the fields something that doesn't vary per table/topic.

nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Nov 16, 2019
This commit creates a rangefeed.Filter object, which informs the producer of
logical operations of the information that a rangefeed Processor is interested
in, given its current set of registrations. It can be used to avoid performing
extra work to provide the Processor with information which will be ignored.

This is an important optimization for single-key or small key span rangefeeds,
as it avoids some extraneous work for the key spans not being watched.

For now, this extra work is just fetching the post-operation value for
MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in
the future to include the pre-operation value as well. This will be important
to avoid any perf regression when addressing cockroachdb#28666.

Release note (<category, see below>): <what> <show> <why>
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Nov 16, 2019
Fixes cockroachdb#28666.
First commit from cockroachdb#41788.

This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the
option is provided, changefeed publications will include a `before`
field, which includes the value of the row before the CDC update was
applied.

We are able to implement this efficiently by pushing the option into
Rangefeed and performing the scan of the previous value immediately
before applying the update in the Raft processing goroutine. cockroachdb#41788
allows us to avoid performing this lookup when `WITH diff` isn't
specified for Rangefeeds, so the small (unmeasured) perf hit is
strictly opt-in.

Release note (sql change): CHANGEFEED now supports a `WITH diff` option,
which instructs it to include a `before` field in each publication.
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Nov 19, 2019
This commit creates a rangefeed.Filter object, which informs the producer of
logical operations of the information that a rangefeed Processor is interested
in, given its current set of registrations. It can be used to avoid performing
extra work to provide the Processor with information which will be ignored.

This is an important optimization for single-key or small key span rangefeeds,
as it avoids some extraneous work for the key spans not being watched.

For now, this extra work is just fetching the post-operation value for
MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in
the future to include the pre-operation value as well. This will be important
to avoid any perf regression when addressing cockroachdb#28666.

Release note (<category, see below>): <what> <show> <why>
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Nov 20, 2019
This commit creates a rangefeed.Filter object, which informs the producer of
logical operations of the information that a rangefeed Processor is interested
in, given its current set of registrations. It can be used to avoid performing
extra work to provide the Processor with information which will be ignored.

This is an important optimization for single-key or small key span rangefeeds,
as it avoids some extraneous work for the key spans not being watched.

For now, this extra work is just fetching the post-operation value for
MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in
the future to include the pre-operation value as well. This will be important
to avoid any perf regression when addressing cockroachdb#28666.
craig bot pushed a commit that referenced this issue Nov 21, 2019
41788: storage/rangefeed: push rangefeed.Filter into Raft processing goroutine r=danhhz a=nvanbenschoten

This commit creates a `rangefeed.Filter` object, which informs the producer of logical operations of the information that a `rangefeed.Processor` is interested in, given its current set of registrations. It can be used to avoid performing extra work to provide the Processor with information which will be ignored.

This is an important optimization for single-key or small key span rangefeeds, as it avoids some extraneous work for the key spans not being watched.

For now, this extra work is just fetching the post-operation value for `MVCCWriteValueOp` and `MVCCCommitIntentOp` operations, but this can be extended in the future to include the pre-operation value as well. This will be important to avoid any perf regression when addressing #28666.

42580: sql: add precision support for TIMESTAMP/TIMESTAMPTZ (v2) r=otan a=otan

Refs #32098
Alternative to #42552

----

Add precision support for TIMESTAMP/TIMESTAMPTZ, supporting precisions
from 0 to 6 inclusive.

When storing this in the type proto, we introduce a `TimePrecisionIsSet`
variable, which is used to differentiate between 0 (unset) and 0 (explicitly set)
for time-related protos. This is abstracted away in the `Precision()`
function which returns the correct precision (unset => 6) for time. This
allows forwards and backwards compatibility.

Note Time will come later, as it involves a bit more plumbing to the
base library.

Release note (sql change): Introduces precision support for TIMESTAMP
and TIMESTAMPTZ, supporting precisions from 0 to 6 inclusive. Previous
versions of TIMESTAMP and TIMESTAMPTZ will default to 6 units of
precision. Note that if anyone downgrades whilst having precision set,
they will have full precision (6) again, but if they re-upgrade they
will find their precisions truncated again.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
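
A minimal SQL illustration of the precision support described in #42580; the table and column names are placeholders:

-- TIMESTAMPTZ with an explicit precision of 3 fractional-second digits.
CREATE TABLE ts_demo (t TIMESTAMPTZ(3));
INSERT INTO ts_demo VALUES ('2019-11-21 01:02:03.456789+00');
SELECT t FROM ts_demo;  -- stored with at most 3 fractional digits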
craig bot pushed a commit that referenced this issue Nov 21, 2019
41793: ccl/changefeedccl: implement `WITH diff` option r=nvanbenschoten a=nvanbenschoten

Fixes #28666.

This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the option is provided, changefeed publications will include a `before` field, which includes the value of the row before the CDC update was applied.

We are able to implement this efficiently by pushing the option into Rangefeed and performing the scan of the previous value immediately before applying the update in the Raft processing goroutine. #41788 allows us to avoid performing this lookup when `WITH diff` isn't specified for Rangefeeds, so the small (unmeasured) perf hit is strictly opt-in.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
@craig craig bot closed this as completed in a24cda0 Nov 21, 2019
@lopezator
Contributor

Should this be labeled as 20.1?
