ccl/changefeedccl: implement WITH diff option #41793

Merged
merged 6 commits into from
Nov 21, 2019

nvanbenschoten
Member

@nvanbenschoten nvanbenschoten commented Oct 22, 2019

Fixes #28666.

This commit adds a WITH diff option to CREATE CHANGEFEED. When the option is provided, changefeed publications will include a before field, which contains the value of the row before the CDC update was applied.

We are able to implement this efficiently by pushing the option into Rangefeed and performing the scan of the previous value immediately before applying the update in the Raft processing goroutine. #41788 allows us to avoid performing this lookup when WITH diff isn't specified for Rangefeeds, so the small (unmeasured) perf hit is strictly opt-in.
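
As a rough illustration of the message shape described above, here is a minimal Go sketch of a wrapped message that carries a before field only when diff is requested. The `encodeWrapped` function and its parameters are hypothetical and are not the changefeedccl encoder; only the `before` and `after` field names come from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeWrapped is a hypothetical helper, not the changefeedccl encoder.
// It builds a wrapped message and adds "before" only when diff is set.
func encodeWrapped(after, before map[string]interface{}, diff bool) (string, error) {
	msg := map[string]interface{}{"after": after}
	if diff {
		// A nil map encodes as JSON null, which this sketch uses to mean
		// "the row had no previous value".
		msg["before"] = before
	}
	b, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	after := map[string]interface{}{"a": 1, "b": "new"}
	before := map[string]interface{}{"a": 1, "b": "old"}
	msg, err := encodeWrapped(after, before, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg) // {"after":{"a":1,"b":"new"},"before":{"a":1,"b":"old"}}
}
```

Without diff, the same call produces a message with only the after field, so existing consumers would see no change in this sketch.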

@cockroach-teamcity
Member

This change is Reviewable

Contributor

@danhhz danhhz left a comment

Okay first pass here

  • I'm sciencedog on the MinTimestamp change but I trust you know what you're doing there
  • Let's leave the avro reordering out of this PR so it can be considered separately

Testing plan

  • Very basic with diff and without diff test
  • Add it to nemesis test somehow
  • Whatever registration/deregistration edge cases you are most worried about

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

	}
	if opts.afterField {
		schema.after = after

do we need a similar line for before?


pkg/ccl/changefeedccl/avro.go, line 578 at r4 (raw file):

				return nil, err
			}
			native[`before`] = goavro.Union(avroUnionKey(&r.after.avroRecord), beforeNative)

seems weird that this is still named r.after. if they're going to be shared, we should probably rename it


pkg/ccl/changefeedccl/changefeed.go, line 106 at r4 (raw file):

			}
			if r.row.datums == nil {
				log.Fatalf(ctx, "TODO DURING REVIEW can this happen?")

good question, maybe not. AssertionErrorf?


pkg/ccl/changefeedccl/changefeed.go, line 112 at r4 (raw file):

			r.row.updated = schemaTimestamp

			var nextRow emitEntry

this block is just asserting that we don't get a second row? let's throw in a comment saying that


pkg/ccl/changefeedccl/encoder.go, line 49 at r4 (raw file):

	// prevDatums is the old value of a changed table row.
	prevDatums sqlbase.EncDatumRow
	// TODO DURING REVIEW: Do we need this?

i think so, right? what if a row is deleted then re-inserted. we'd see the tombstone when we looked up the previous value. sounds like a good test case


pkg/ccl/changefeedccl/encoder.go, line 107 at r4 (raw file):

	_, e.beforeField = opts[optDiff]
	if e.beforeField && !e.wrapped {
		return nil, errors.Errorf(`%s is only usable with %s=%s`,

note to self: make sure there's a test for this
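
A minimal sketch of the check under review, to make the case that needs a test concrete. The constants and `validateOpts` are hypothetical names for illustration, and treating an unset envelope as wrapped is an assumption of this sketch rather than something stated in the thread.

```go
package main

import "fmt"

// Hypothetical option names; the real constants live in changefeedccl.
const (
	optDiff         = "diff"
	optEnvelope     = "envelope"
	envelopeWrapped = "wrapped"
)

// validateOpts rejects diff unless the envelope is wrapped.
func validateOpts(opts map[string]string) error {
	_, diff := opts[optDiff]
	env, ok := opts[optEnvelope]
	// Assumption: an unset envelope behaves like wrapped.
	wrapped := !ok || env == envelopeWrapped
	if diff && !wrapped {
		return fmt.Errorf("%s is only usable with %s=%s",
			optDiff, optEnvelope, envelopeWrapped)
	}
	return nil
}

func main() {
	err := validateOpts(map[string]string{optDiff: "", optEnvelope: "row"})
	fmt.Println(err) // diff is only usable with envelope=wrapped
}
```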


pkg/ccl/changefeedccl/encoder.go, line 435 at r4 (raw file):

	}
	binary.BigEndian.PutUint32(header[1:5], uint32(registered.registryID))
	return registered.schema.BinaryFromRow(header, meta, nil /* row */, nil)

nit: comment for second nil


pkg/ccl/changefeedccl/poller.go, line 524 at r4 (raw file):

		sort.Sort(byValueTimestamp(kvs))
		for _, kv := range kvs {
			if err := p.buf.AddKV(ctx, kv, nil /* prevVal */, schemaTimestamp); err != nil {

oh hmmmm this is a good question. what should be the value for the backfill we run after schema changes? leaving it unset is a little strange


pkg/roachpb/api.proto, line 1847 at r4 (raw file):

  bytes key       = 1 [(gogoproto.casttype) = "Key"];
  Value value     = 2 [(gogoproto.nullable) = false];
  // TODO DURING REVIEW: how should this be represented? Should this be:

okay, so the two axes here are a) is the consumer responsible for knowing whether the with_diff option was set on the request and b) does the consumer need to know the difference between the previous key is a deletion vs it never existed. 1 is no and yes, 2 is no and no, 3 is yes and no

a) i could go either way
b) where do we currently expose the difference between a key that was deleted and one that never existed? at the moment, i think it's basically only things that involve spans of time (incremental backup, rangefeed catchup scan). these events are all single point in time, so i think i lean toward not needing to differentiate between these

so....given that non-nullable fields are generally preferred, i guess 3, which is what you already have. i don't feel like i have a strong feel for this though

@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/cdcDiff branch 3 times, most recently from 14da513 to 91323f8 Compare October 24, 2019 17:14
Member Author

@nvanbenschoten nvanbenschoten left a comment

Testing plan

Very basic with diff and without diff test
Add it to nemesis test somehow
Whatever registration/deregistration edge cases you are most worried about

Done.
Done.
Done.

The only remaining things missing here are addressing some review feedback and getting this working with schema changes. I'm going to sit on those items until #41842 lands so I'm not stepping on @aayushshah15's toes.

@benesch this should be ready for use with TPC-CH.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @benesch, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

do we need a similar line for before?

It's a little unclear to me what we should do here in the cases where the before and after fields have a) the same schema b) different schemas. In case a), we could either define two schemas with different names or we could define the schema in one place and reference it in both. In case b) we will need to define two schemas with different names. Do you have a preference here? Or maybe @benesch has insight into the right approach?


pkg/ccl/changefeedccl/avro.go, line 578 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

seems weird that this is still named r.after. if they're going to be shared, we should probably rename it

Agreed, I'll rename.


pkg/ccl/changefeedccl/encoder.go, line 49 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

i think so, right? what if a row is deleted then re-inserted. we'd see the tombstone when we looked up the previous value. sounds like a good test case

This ties into the question we're discussing below.


pkg/ccl/changefeedccl/encoder.go, line 107 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

note to self: make sure there's a test for this

Done.


pkg/roachpb/api.proto, line 1847 at r4 (raw file):

a) is the consumer responsible for knowing whether the with_diff option was set on the request

Yeah, I don't have a strong feeling about this either. I'd lean towards no.

b) does the consumer need to know the difference between the previous key is a deletion vs it never existed

I'd also say no on this one. I don't think prevValue should ever be a deletion tombstone.

So that points me towards 3: non-nullable and only populated if prev val exists and with_diff=true specified.
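
To make the consumer side of approach 3 concrete, here is a small sketch. The `value` and `rangeFeedValue` types and the `hasPrev` method are hypothetical stand-ins, not the generated roachpb types. The point is that a non-nullable field leaves the consumer with a single check, and that check cannot tell "diff was not requested" apart from "there was no previous value".

```go
package main

import "fmt"

// value and rangeFeedValue are hypothetical stand-ins for the protobuf
// messages discussed in this thread.
type value struct {
	RawBytes []byte
}

type rangeFeedValue struct {
	Key       []byte
	Value     value
	PrevValue value // non-nullable: the zero value means "nothing to report"
}

// hasPrev reports whether a previous value was populated. It is false when
// the key had no previous value and also when diff was not requested.
func (v rangeFeedValue) hasPrev() bool {
	return len(v.PrevValue.RawBytes) > 0
}

func main() {
	insert := rangeFeedValue{Key: []byte("k"), Value: value{RawBytes: []byte("v1")}}
	update := rangeFeedValue{
		Key:       []byte("k"),
		Value:     value{RawBytes: []byte("v2")},
		PrevValue: value{RawBytes: []byte("v1")},
	}
	fmt.Println(insert.hasPrev(), update.hasPrev()) // false true
}
```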

Contributor

@danhhz danhhz left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @benesch, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

It's a little unclear to me what we should do here in the cases where the before and after fields have a) the same schema b) different schemas. In case a), we could either define two schemas with different names or we could define the schema in one place and reference it in both. In case b) we will need to define two schemas with different names. Do you have a preference here? Or maybe @benesch has insight into the right approach?

I also am having trouble developing a strong opinion here. @benesch if you have one it would be welcomed

Contributor

@benesch benesch left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I also am having trouble developing a strong opinion here. @benesch if you have one it would be welcomed

Forgive me if I'm being dense, but... how could before and after have different schemas?

Contributor

@danhhz danhhz left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @benesch, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

Previously, benesch (Nikhil Benesch) wrote…

Forgive me if I'm being dense, but... how could before and after have different schemas?

If the timestamp of the row change exactly equals the timestamp of a schema change, then the before will have the previous schema. Perhaps surprisingly, this is not a rare edge case. It happens for every schema change because we do the changefeed-level backfill at exactly the time the column shows up/is removed from the SQL perspective.

(Hmm, I think materialize might want to skip the changefeed-level backfill entirely, which is an option I've always assumed we'd have at some point but we've never implemented. Anyway, you seem to have more experience with avro schema registry than me at this point, so I'd still like your take)

Contributor

@benesch benesch left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

If the timestamp of the row change exactly equals the timestamp of a schema change, then the before will have the previous schema. Perhaps surprisingly, this is not a rare edge case. It happens for every schema change because we do the changefeed-level backfill at exactly the time the column shows up/is removed from the SQL perspective.

(Hmm, I think materialize might want to skip the changefeed-level backfill entirely, which is an option I've always assumed we'd have at some point but we've never implemented. Anyway, you seem to have more experience with avro schema registry than me at this point, so I'd still like your take)

🤯

I guess that makes sense! Let me see what Debezium does, if anything. The Debezium docs claim that the before schema is always equal to the after schema, and it would stink not to have that invariant. But I'm also not seeing another sensible way to represent a change event that spans a schema change, so perhaps the Debezium docs are just wrong in the face of schema changes.

Contributor

@ajwerner ajwerner left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @benesch, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 521 at r4 (raw file):

an option I've always assumed we'd have at some point but we've never implemented

#31213 for posterity

@nvanbenschoten nvanbenschoten changed the title [DNM] ccl/changefeedccl: implement WITH diff option ccl/changefeedccl: implement WITH diff option Nov 6, 2019
@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/cdcDiff branch 2 times, most recently from e547934 to 8830479 Compare November 6, 2019 16:46
@nvanbenschoten
Member Author

Rebased on top of #41842. As expected TestChangefeedNemeses now fails due to schema change events. If I disable those events, it passes again. This lack of proper support for schema changes is also picked up by the cdc/schemareg roachtest.

Getting schema changes working here is the last remaining item before this is ready to go. For that, we'll need to answer how a schema change should influence the schema of the before field in Avro records.

@lopezator
Contributor

Is there any progress on this? We are interested in this feature. Thanks!

@nvanbenschoten
Member Author

Hi @lopezator! I'm thrilled to hear that you're interested in the feature. This will, in all likelihood, be making it into the next release of Cockroach. We just released v19.2 last week, so we're probably about 6 months out from that, but the plan is to begin publishing v20.1 alphas shortly. If you're willing to test on alpha releases (or even on master), we can probably get it into your hands fairly soon. We'd love to hear your feedback on it once you're able to test it out.

@danhhz This has been updated to support schema changes correctly, as we discussed. When a schema change triggers a backfill, it produces a record with the same KV pair interpreted with the old and new schema as the "before" and "after" fields, respectively. This is exactly the contract that SQL enforces when performing AOST queries at and immediately before the schema change timestamp, as is now tested by the beforeAfterValidator. I think this is ready for a final review.

@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/cdcDiff branch 2 times, most recently from af9523d to fa5de4b Compare November 16, 2019 01:06
@lopezator
Contributor

lopezator commented Nov 16, 2019

@nvanbenschoten that is good news!

We are cockroach cloud users, but we possibly wouldn't mind updating the cluster to an alpha or even a master checkout to bring this feature in.

By the way, thanks a lot for your great work.

CC/ @glerchundi

Contributor

@danhhz danhhz left a comment

Okay, I think this is getting really close. Everything here is a nit except two larger things

  1. The logic to switch between whether it's the initial backfill (no previous values) or a schema change backfill (yes previous values) seems off. If so, this is a hole in the testing. I think the nemesis test is not catching it because BeforeAfterValidator is not strictly asserting that the first time it sees a key, that before should always be unset.
  2. I'm confused about our treatment of the previous value's timestamp and which schema we use to print it. If this is correct, it's not clear to me why and we should put a mini-overview somewhere. Even if it's correct now, it seems to me that it would be much more straightforward if the previous value's timestamp was threaded all the way from rangefeed, but perhaps I'm wrong here? Curious what your take is on this
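
The stricter assertion suggested in point 1 could look roughly like the sketch below. `firstSeenValidator` and `noteRow` are hypothetical names, not the existing BeforeAfterValidator, and the sketch assumes the feed always begins with an initial backfill so that the first message for any key has no before value.

```go
package main

import "fmt"

// firstSeenValidator is a hypothetical validator, not the one in the PR.
// It records a failure when the first message for a key carries a before.
type firstSeenValidator struct {
	seen     map[string]bool
	failures []string
}

func newFirstSeenValidator() *firstSeenValidator {
	return &firstSeenValidator{seen: make(map[string]bool)}
}

// noteRow is called once per emitted row message.
func (v *firstSeenValidator) noteRow(key string, hasBefore bool) {
	if !v.seen[key] && hasBefore {
		v.failures = append(v.failures,
			fmt.Sprintf("key %s: first message has a before value", key))
	}
	v.seen[key] = true
}

func main() {
	v := newFirstSeenValidator()
	v.noteRow("a", false) // initial backfill: no before expected
	v.noteRow("a", true)  // later update: before is fine
	v.noteRow("b", true)  // first sighting with a before: flagged
	fmt.Println(v.failures)
}
```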

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @benesch, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/avro.go, line 387 at r8 (raw file):

	name := SQLNameToAvroName(tableDesc.Name)
	if nameSuffix != "" {
		name = fmt.Sprintf("%s_%s", name, nameSuffix)

nit: isn't it faster (and clearer) to + the strings here?


pkg/ccl/changefeedccl/avro.go, line 606 at r8 (raw file):

		}
	}
	// WIP verify that meta is now empty

would you mind doing this WIP while you're in here? should just be something like for k := range meta { return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k) }
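
Spelled out, the suggested check might look like the sketch below: consume the keys the encoder understands, then fail on anything left over. `encodeMeta` and its list of handled keys are hypothetical, and `fmt.Errorf` stands in for the assertion-error helper so that the example has no external dependencies.

```go
package main

import "fmt"

// encodeMeta is a hypothetical helper. It copies the metadata keys it
// knows about and returns an error if any other key remains.
// Note that it deletes handled keys from the caller's map.
func encodeMeta(meta map[string]interface{}) (map[string]interface{}, error) {
	native := make(map[string]interface{})
	// Assumed set of handled keys, for illustration only.
	for _, k := range []string{"updated", "resolved"} {
		if v, ok := meta[k]; ok {
			native[k] = v
			delete(meta, k)
		}
	}
	// Anything still in meta was not handled above.
	for k := range meta {
		return nil, fmt.Errorf("unhandled meta key: %s", k)
	}
	return native, nil
}

func main() {
	_, err := encodeMeta(map[string]interface{}{"updated": "1.0", "bogus": true})
	fmt.Println(err) // unhandled meta key: bogus
}
```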


pkg/ccl/changefeedccl/avro_test.go, line 119 at r8 (raw file):

		tableDesc.Columns = append(tableDesc.Columns, *colDesc)
	}
	return tableToAvroSchema(tableDesc, "")

thoughts on a named constant for this? avroSchemaNoSuffix?


pkg/ccl/changefeedccl/buffer.go, line 30 at r8 (raw file):

type bufferEntry struct {
	kv       roachpb.KeyValue
	prevVal  *roachpb.Value

comment explaining when this is set/not set pls


pkg/ccl/changefeedccl/buffer.go, line 37 at r8 (raw file):

	// Timestamp of the schema that should be used to read the previous
	// version of this KV.
	// If unset (zero-valued), the previous value will be interpretted with

instead of zero value having a special meaning, i'd find it more straightforward if we fill it in whenever prevVal is set

edit: actually i have larger questions about previous value timestamps and schemas


pkg/ccl/changefeedccl/buffer.go, line 111 at r8 (raw file):

	*types.Bytes, // span.Key
	*types.Bytes, // span.EndKey
	*types.Int,   // ts.WallTime

I'm confused about why we don't need to pass the prevVal's timestamp through here


pkg/ccl/changefeedccl/changefeed.go, line 112 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

this block is just asserting that we don't get a second row? let's throw in a comment saying that

ping : - )


pkg/ccl/changefeedccl/changefeed.go, line 125 at r8 (raw file):

			}
			if nextRow.row.datums != nil {
				return nil, errors.AssertionFailedf("unexpected non-empty datums")

is there an easy way to print the datums in this message?


pkg/ccl/changefeedccl/changefeed.go, line 133 at r8 (raw file):

			prevRF := rf
			if prevSchemaTimestamp != schemaTimestamp {
				// If the previous value is being interpretted under a different

nit: interpreted


pkg/ccl/changefeedccl/changefeed_test.go, line 1935 at r8 (raw file):

func TestChangefeedRestartDuringBackfill(t *testing.T) {
	defer leaktest.AfterTest(t)()
	// t.Skip("TODO(WIP): doesn't work yet")

remove


pkg/ccl/changefeedccl/changefeed_test.go, line 2027 at r8 (raw file):

			// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
			// `foo: [0]->{"after": {"a": 0, "b": "backfill"}}`,
			// TODO DURING REVIEW: is this correct? Should the backfill on a RESUME JOB

looks correct to me. it was paused during a schema change backfill and this test is verifying that it restarts the schema change backfill when unpaused


pkg/ccl/changefeedccl/encoder.go, line 435 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: comment for second nil

ping : - )


pkg/ccl/changefeedccl/encoder.go, line 52 at r8 (raw file):

	// prevDatums is the old value of a changed table row.
	prevDatums sqlbase.EncDatumRow
	// TODO DURING REVIEW: Do we need this?

yes, this would happen if we inserted a previously deleted value. we should add a test for this if there isn't one already


pkg/ccl/changefeedccl/encoder.go, line 374 at r8 (raw file):

		}

		afterDataSchema, err := tableToAvroSchema(row.tableDesc, ``)

my suggestion offline was for this suffix to be after. however, i like that this is the same name it had previously, so i'm okay with this. i wish there was better precedent here. i wonder if @rolandcrosby knows any


pkg/ccl/changefeedccl/encoder_test.go, line 131 at r8 (raw file):

			resolved: `{"resolved":{"string":"1.0000000002"}}`,
		},
		`format=experimental_avro,envelope=key_only,diff`: {

this should error with the same thing as format=json,envelope=row,diff right?


pkg/ccl/changefeedccl/poller.go, line 191 at r8 (raw file):

	// Perform a full scan if necessary - either an initial scan or a backfill
	// Full scans are still performed using an Export operation.
	initialScan := i == 0

is this logic correct when resuming? i'd think it should be based on the frontier


pkg/storage/rangefeed/registry.go, line 121 at r8 (raw file):

	case *roachpb.RangeFeedValue:
		if t.Key == nil {
			panic(fmt.Sprintf("unexpected empty RangeFeedValue.Key: %v", t))

I wonder if we do anything special if this panic gets an assertion error instead of a string

Member Author

@nvanbenschoten nvanbenschoten left a comment

TFTR!

I think the nemesis test is not catching it because BeforeAfterValidator is not strictly asserting that the first time it sees a key, that before should always be unset.

See my comment below. The one that starts with "I'm not really sure".

it's not clear to me why and we should put a mini-overview somewhere

Completely agree. Does some overview exist that I can add to? I hope I missed it because it was pretty tough to track down the flow of things through the tableHistory's validate function through the poller's scanBoundaries, which eventually triggered the backfill (do I have that right?).

it seems to me that it would be much more straightforward if the previous value's timestamp was threaded all the way from rangefeed, but perhaps I'm wrong here

Well, the previous value's timestamp never comes from Rangefeed, and it never needs to. The previous value is logically set right up to the new value's timestamp. I guess we could always treat the previous value's timestamp as curVal.Timestamp.Prev() and always look up the schema at that timestamp. Then we wouldn't need any special logic when doing a backfill. That would lead to wasted work in the overwhelmingly common case where we know the schemas to be identical though. What do you think?

EDIT: actually, it seems like we're in the exact same boat with this schemaTimestamp. It's only set on backfills. In fact, it doesn't even need to be passed through memBuffer (want me to add a commit?). Can we unify this with prevSchemaTimestamp such that if schemaTimestamp is provided we treat prevVal.Timestamp as schemaTimestamp.Prev and if not we treat prevVal.Timestamp as curVal.Timestamp.Prev?

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @rolandcrosby)


pkg/ccl/changefeedccl/avro.go, line 387 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: isn't it faster (and clearer) to + the strings here?

Done.


pkg/ccl/changefeedccl/avro.go, line 606 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

would you mind doing this WIP while you're in here? should just be something like for k := range meta { return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k) }

Done.


pkg/ccl/changefeedccl/avro_test.go, line 119 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

thoughts on a named constant for this? avroSchemaNoSuffix?

Done.


pkg/ccl/changefeedccl/buffer.go, line 30 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

comment explaining when this is set/not set pls

Done.


pkg/ccl/changefeedccl/buffer.go, line 37 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

instead of zero value having a special meaning, i'd find it more straightforward if we fill it in whenever prevVal is set

edit: actually i have larger questions about previous value timestamps and schemas

See below. In almost all cases, the two timestamps are the same. It's only in the schema change case where the timestamps diverge.


pkg/ccl/changefeedccl/buffer.go, line 111 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I'm confused about why we don't need to pass the prevVal's timestamp through here

Because the only time that we need to provide it is when we're performing a schema change backfill, in which case we skip the memBuffer. On the normal path, the prevVal timestamp isn't provided.


pkg/ccl/changefeedccl/changefeed.go, line 112 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

ping : - )

Done.


pkg/ccl/changefeedccl/changefeed.go, line 125 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

is there an easy way to print the datums in this message?

Not really. I had some debug information around here for a different reason and we had to use the schema to decode the datums. I think it's more effort than it's worth for this assertion. It really should never fire because we gave the row fetcher a single KV.


pkg/ccl/changefeedccl/changefeed.go, line 133 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: interpreted

Done.


pkg/ccl/changefeedccl/changefeed_test.go, line 1935 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

remove

Woops. Done.


pkg/ccl/changefeedccl/changefeed_test.go, line 2027 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

looks correct to me. it was paused during a schema change backfill and this test is verifying that it restarts the schema change backfill when unpaused

👍 thanks for the explanation. Removed.


pkg/ccl/changefeedccl/encoder.go, line 435 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

ping : - )

Done.


pkg/ccl/changefeedccl/encoder.go, line 52 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

yes, this would happen if we inserted a previously deleted value. we should add a test for this if there isn't one already

Done.


pkg/ccl/changefeedccl/encoder.go, line 374 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

my suggestion offline was for this suffix to be after. however, i like that this is the same name it had previously, so i'm okay with this. i wish there was better precedent here. i wonder if @rolandcrosby knows any

Yeah, I had after and then removed it because it broke tests and generally pointed at backwards incompatibility. I ended up liking this approach more.


pkg/ccl/changefeedccl/encoder_test.go, line 131 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

this should error with the same thing as format=json,envelope=row,diff right?

Yes, good catch. Done.

Do we need the same error for format=experimental_avro,envelope=key_only,updated?


pkg/ccl/changefeedccl/poller.go, line 524 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

oh hmmmm this is a good question. what should be the value for the backfill we run after schema changes? leaving it unset is a little strange

Done.


pkg/ccl/changefeedccl/poller.go, line 191 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

is this logic correct when resuming? i'd think it should be based on the frontier

I'm not really sure. I'm going to need some help understanding the different cases at play here. Is it correct that we perform backfills in these three cases:

  1. upon job creation
  2. upon job resumption
  3. on schema changes

If so then which of these do we expect to include before fields? Definitely the third, but also the second? What does before mean in that case? I think these semantics need a bit more refinement.


pkg/roachpb/api.proto, line 1847 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

a) is the consumer responsible for knowing whether the with_diff option was set on the request

Yeah, I don't have a strong feeling about this either. I'd lean towards no.

b) does the consumer need to know the difference between the previous key is a deletion vs it never existed

I'd also say no on this one. I don't think prevValue should ever be a deletion tombstone.

So that points me towards 3: non-nullable and only populated if prev val exists and with_diff=true specified.

Switched to approach 3. It turned out clean.


pkg/storage/rangefeed/registry.go, line 121 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I wonder if we do anything special if this panic gets an assertion error instead of a string

I don't know of anything. Are you aware of special handling?

@ajwerner
Contributor

@aayushshah15 do you want to take a look at this? Especially as it pertains to #42053.

@aayushshah15
Contributor

I've been passively following the PR, don't think my change should clash with this. Though I think I could wait until this is in, rebase and stress my change for a little bit.

Contributor

@danhhz danhhz left a comment

Thinking out loud here, there are a few timestamps involved. Listing them out explicitly (some of these end up being the same):

  • tUpdated: the timestamp in the updated field of an emitted message
  • tAfterKV: the timestamp of the kv being emitted as a row
  • tAfterSchema: the timestamp used to select a tabledesc for interpreting the after
  • tBeforeKV: the timestamp of the previous value
  • tBeforeSchema: the timestamp used to select a tabledesc for interpreting the before
  • tBackfill: the timestamp a backfill is being run at

For the initial backfill when a changefeed starts up:

  • tAfterKV and tAfterSchema are set to tBackfill
  • tBeforeKV, tBeforeSchema are non-applicable, there are no before values in the initial backfill

During normal operation:

  • tAfterSchema and tUpdated are set to tAfterKV
  • tBeforeSchema is set to tAfterKV (see below for me re-convincing myself of this)

During a schema change backfill:

  • tAfterKV and tAfterSchema are set to tBackfill
  • tUpdated is set to tBackfill
  • tBeforeSchema is set to tBackfill.Prev()
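
The three cases above can be gathered into one function, sketched here. Everything in it is hypothetical: `int64` stands in for an HLC timestamp, `prev` for `Timestamp.Prev()`, and `pick` for wherever the "is set to" logic ends up living. The value of `updated` during the initial backfill is not stated in the list, so this sketch assumes it follows tBackfill.

```go
package main

import "fmt"

type mode int

const (
	initialBackfill mode = iota
	normalOperation
	schemaChangeBackfill
)

// timestamps holds the derived values from the list above.
type timestamps struct {
	updated      int64
	afterSchema  int64
	beforeSchema int64
	hasBefore    bool // false when no before value is emitted
}

// prev is a stand-in for Timestamp.Prev().
func prev(ts int64) int64 { return ts - 1 }

// pick derives the schema timestamps for one emitted row.
func pick(m mode, afterKV, backfill int64) timestamps {
	switch m {
	case initialBackfill:
		// Assumption: updated follows the backfill timestamp here.
		return timestamps{updated: backfill, afterSchema: backfill}
	case schemaChangeBackfill:
		return timestamps{
			updated:      backfill,
			afterSchema:  backfill,
			beforeSchema: prev(backfill),
			hasBefore:    true,
		}
	default: // normalOperation
		return timestamps{
			updated:      afterKV,
			afterSchema:  afterKV,
			beforeSchema: afterKV,
			hasBefore:    true,
		}
	}
}

func main() {
	fmt.Printf("%+v\n", pick(schemaChangeBackfill, 0, 20))
	fmt.Printf("%+v\n", pick(normalOperation, 40, 0))
}
```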

You are correct that nothing depends on tBeforeKV. My remaining question is where all the "is set to" logic lives. Right now it seems haphazard, especially because of the name "schemaTimestamp". I propose that bufferEntry have two timestamps: one in the kv and schemaTimestamp is renamed to backfillTimestamp. Then the "is set to" logic is entirely moved to the block in kvsToRows where it mostly already is: https://github.com/cockroachdb/cockroach/blob/5765fd2e2c1c84b6abadb336224311842388e944/pkg/ccl/changefeedccl/changefeed.go#L196-L203

It's still possible to remove the second timestamp (schemaTimestamp/backfillTimestamp) from memBuffer.

Thoughts?


Justification for "tBeforeSchema is set to tAfterKV" during normal operation

It's possible you and I already hashed the following out and I forgot, but I had to re-convince myself this morning of what timestamp we should be using when grabbing a schema for the before. Given the following situation:

CREATE TABLE foo (a INT PRIMARY KEY)
CREATE CHANGEFEED FOR TABLE foo ... WITH diff
INSERT INTO foo VALUES (1)
ALTER TABLE foo ADD COLUMN b STRING DEFAULT 'default' (gets a backfill)
ALTER TABLE foo ADD COLUMN c STRING (does not get a backfill)
UPDATE foo SET c='update' WHERE a = 1

Initially, I see two ways this could work.

  1. For each before, use the schema that was active at the time the before was written. In this world, each before for a row matches the non-backfill after that precedes it. So

    before=null after=(1)
    before=(1) after=(1, 'default') (the backfill)
    before=(1) after=(1, 'default', 'update') (matches the previous non-backfill row)
    

    This is mostly interesting because everything lines up when we add the option to skip backfills.

    before=null after=(1)
    before=(1) after=(1, 'default', 'update')
    
  2. For each before, use the schema that's active at the time the after was written. This gets us the same answer as if we'd done an AS OF SYSTEM TIME at the time immediately before the update timestamp.

    before=null after=(1)
    before=(1) after=(1, 'default') (the backfill)
    before=(1, 'default', null) after=(1, 'default', 'update')
    

When I made that last round of comments, I was thinking of (1) but what you have is (2) and I think that's right, so disregard what I was saying about this in the last round of comments.
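
For reference, option (2) can be sketched as a schema lookup by timestamp followed by an interpretation of the stored previous value. All names here (`column`, `schemaVersion`, `schemaAt`, `interpret`) and the integer timestamps are hypothetical; the real code works with table descriptors and a row fetcher. The sketch also assumes the SQL-level backfill of column b has already written 'default' into the stored row by the time of the UPDATE.

```go
package main

import (
	"fmt"
	"sort"
)

type column struct {
	name string
}

type schemaVersion struct {
	ts   int64
	cols []column
}

// schemaAt returns the columns of the latest version with version.ts <= ts.
// history must be sorted by ts in ascending order.
func schemaAt(history []schemaVersion, ts int64) []column {
	i := sort.Search(len(history), func(i int) bool { return history[i].ts > ts })
	if i == 0 {
		return nil
	}
	return history[i-1].cols
}

// interpret lays out a stored row under the given columns. Columns with no
// stored value come back as nil (SQL NULL in this sketch).
func interpret(stored map[string]interface{}, cols []column) []interface{} {
	row := make([]interface{}, len(cols))
	for i, c := range cols {
		row[i] = stored[c.name]
	}
	return row
}

func main() {
	history := []schemaVersion{
		{ts: 10, cols: []column{{"a"}}},
		{ts: 20, cols: []column{{"a"}, {"b"}}},
		{ts: 30, cols: []column{{"a"}, {"b"}, {"c"}}},
	}
	// UPDATE at ts=40: the previous value has a and b but no c.
	prevStored := map[string]interface{}{"a": 1, "b": "default"}
	before := interpret(prevStored, schemaAt(history, 40))
	fmt.Println(before)
}
```

Under option (1) the lookup would instead use the timestamp at which the previous value was written, which is why the before in that listing still has a single column.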

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz, @nvanbenschoten, and @rolandcrosby)


pkg/ccl/changefeedccl/changefeed.go, line 125 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Not really. I had some debug information around here for a different reason and we had to use the schema to decode the datums. I think it's more effort than it's worth for this assertion. It really should never fire because we gave the row fetcher a single KV.

gotcha


pkg/ccl/changefeedccl/encoder_test.go, line 131 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Yes, good catch. Done.

Do we need the same error for format=experimental_avro,envelope=key_only,updated?

Oh, good catch, yes. Though in contrast to diff, which only works with env=wrapped, updated will work with either env=wrapped or env=row. This is technically a backward-incompatible change, so it should be its own commit with a release note. If you don't feel like picking that off, I'm fine with us filing it as an issue for followup.


pkg/ccl/changefeedccl/poller.go, line 191 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I'm not really sure. I'm going to need some help understanding the different cases at play here. Is it correct that we perform backfills in these three cases:

  1. upon job creation
  2. upon job resumption
  3. on schema changes

If so then which of these do we expect to include before fields? Definitely the third, but also the second? What does before mean in that case? I think these semantics need a bit more refinement.

We don't backfill on (2) unless it was paused during a type (1) or (3) backfill, in which case it has to resume the backfill it had started but not finished. Maybe that's where your confusion is coming from?

This combined with your question above about where we'd put a comment with details on the timestamp flow makes me think we're long overdue for a changefeed mega-comment. I'll try to circle back and do that at some point.


pkg/storage/rangefeed/registry.go, line 121 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I don't know of anything. Are you aware of special handling?

I think we at least get the type information in sentry. The string makes it look like an assertion, so perhaps the string is fine. I guess I lean slightly toward panic'ing an AssertionErrorf here, but I don't feel strongly

Member Author

@nvanbenschoten nvanbenschoten left a comment

When I made that last round of comments, I was thinking of (1) but what you have is (2) and I think that's right, so disregard what I was saying about this in the last round of comments.

Yes, we have (2) in this PR, and AS OF SYSTEM TIME is exactly what the beforeAfterValidator checks.

It's still possible to remove the second timestamp (schemaTimestamp/backfillTimestamp) from memBuffer.
Thoughts?

I like that all. I added a new commit that reworks the handling of these timestamps and I think it came out pretty clean.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz)


pkg/ccl/changefeedccl/encoder_test.go, line 131 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Oh, good catch, yes. Though in contrast to diff, which only works with env=wrapped, updated will work with either env=wrapped or env=row. This is technically a backward-incompatible change, so it should be its own commit with a release note. If you don't feel like picking that off, I'm fine with us filing it as an issue for followup.

Done in a new commit.


pkg/ccl/changefeedccl/poller.go, line 191 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

We don't backfill on (2) unless it was paused during a type (1) or (3) backfill, in which case it has to resume the backfill it had started but not finished. Maybe that's where your confusion is coming from?

This combined with your question above about where we'd put a comment with details on the timestamp flow makes me think we're long overdue for a changefeed mega-comment. I'll try to circle back and do that at some point.

👍


pkg/storage/rangefeed/registry.go, line 121 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I think we at least get the type information in sentry. The string makes it look like an assertion, so perhaps the string is fine. I guess I lean slightly toward panic'ing an AssertionErrorf here, but I don't feel strongly

I'm going to leave this because there's not a strong reason to change it and this PR is large enough.

Contributor

@danhhz danhhz left a comment

:lgtm_strong: 👏

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz)


pkg/ccl/changefeedccl/encoder.go, line 52 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Done.

Sorry if I missed it, but is there a test for this? I think adding an INSERT after the DELETE in TestChangefeedDiff would be sufficient

This was causing tests to stall because a transaction's MinTimestamp
could end up being above its commit timestamp.

Release note: None

Fixes cockroachdb#28666.
First commit from cockroachdb#41788.

This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the
option is provided, changefeed publications will include a `before`
field, which includes the value of the row before the CDC update was
applied.

We are able to implement this efficiently by pushing the option into
Rangefeed and performing the scan of the previous value immediately
before applying the update in the Raft processing goroutine. cockroachdb#41788
allows us to avoid performing this lookup when `WITH diff` isn't
specified for Rangefeeds, so the small (unmeasured) perf hit is
strictly opt-in.

Release note (sql change): CHANGEFEED now supports a `WITH diff` option,
which instructs it to include a `before` field in each publication.

This commit re-orders the avro encoding of CDC messages to place fields
in the order: [`before`, `after`, `updated`, `resolved`].

Rumor has it that Materialize has a fast-path if these fields are
ordered like this.

Release note (sql change): The fields in the Avro format for CHANGEFEED
records have been re-ordered to allow for optimized parsing. This is a
backwards compatible change.

We now verify that all fields in the avroMetadata are encoded.

This change cleans up the handling of bufferEntry.backfillTimestamp
and removes the schemaTimestamp from memBuffer entirely.

…nvelope

Before this change, we would accept the `WITH updated` option when using Avro's
`WITH envelope=key_only` option but would never actually output any updated
fields. This commit properly rejects the config, which doesn't make any sense.

Release note (backwards-incompatible change): The combination of
CHANGEFEED's format=experimental_avro, envelope=key_only, and updated
is now rejected. This is because the use of key_only prevents any
rows with updated fields from being emitted, so the updated option
is meaningless.
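
The option rules discussed in this thread can be sketched as a small validation function. This is a hedged illustration only: the `options` struct, the `validate` function, and the error strings are hypothetical and are not taken from changefeedccl. It encodes two statements from the review: diff only works with envelope=wrapped, and updated combined with format=experimental_avro and envelope=key_only is rejected.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a hypothetical, simplified view of the CHANGEFEED options
// discussed in this thread; it is not the struct used in changefeedccl.
type options struct {
	format   string // "json" or "experimental_avro"
	envelope string // "wrapped", "row", or "key_only"
	updated  bool
	diff     bool
}

// validate rejects the option combinations the thread describes as
// meaningless. The error messages are illustrative.
func validate(o options) error {
	if o.diff && o.envelope != "wrapped" {
		return errors.New("diff is only usable with envelope=wrapped")
	}
	if o.format == "experimental_avro" && o.envelope == "key_only" && o.updated {
		return errors.New("updated is not usable with format=experimental_avro and envelope=key_only")
	}
	return nil
}

func main() {
	cases := []options{
		{format: "experimental_avro", envelope: "wrapped", updated: true, diff: true},
		{format: "experimental_avro", envelope: "row", updated: true},
		{format: "experimental_avro", envelope: "key_only", updated: true},
		{format: "experimental_avro", envelope: "row", diff: true},
	}
	for _, o := range cases {
		fmt.Printf("%+v -> %v\n", o, validate(o))
	}
}
```

The first two cases pass and the last two are rejected, which mirrors the distinction drawn in review: updated is accepted with either wrapped or row, while diff needs wrapped.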
Member Author

@nvanbenschoten nvanbenschoten left a comment

I stressed TestChangefeedNemeses for 30 minutes after rebasing on top of #42053. Everything looks good. TFTR!

bors r+

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz)


pkg/ccl/changefeedccl/encoder.go, line 52 at r8 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Sorry if I missed it, but is there a test for this? I think adding an INSERT after the DELETE in TestChangefeedDiff would be sufficient

Yes, without this a number of tests fail. Adding an INSERT after the DELETE in TestChangefeedDiff is a good idea though. Done.
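
The INSERT-after-DELETE case suggested for TestChangefeedDiff can be modelled in a few lines. This is a toy sketch, not the test itself: `event` and `applyOps` are hypothetical names, a single key is replayed in memory, and a nil pointer stands for a null value. It assumes the semantics described in the PR description, where before is the value of the row immediately before the update was applied.

```go
package main

import "fmt"

// event is a hypothetical, simplified changefeed publication for one key.
// A nil pointer stands for a null before or after value.
type event struct {
	before *string
	after  *string
}

// applyOps replays writes to a single key and records, for each write, the
// value that was present immediately before it. A nil op is a DELETE.
func applyOps(ops []*string) []event {
	var cur *string
	events := make([]event, 0, len(ops))
	for _, op := range ops {
		events = append(events, event{before: cur, after: op})
		cur = op
	}
	return events
}

// show renders a possibly-null value for printing.
func show(v *string) string {
	if v == nil {
		return "null"
	}
	return *v
}

func main() {
	v1, v2, v3 := "(1, 'a')", "(1, 'b')", "(1, 'c')"
	// INSERT, UPDATE, DELETE, then INSERT again.
	for _, e := range applyOps([]*string{&v1, &v2, nil, &v3}) {
		fmt.Printf("before=%s after=%s\n", show(e.before), show(e.after))
	}
}
```

In this model the DELETE carries the old row as its before and null as its after, and the INSERT that follows it has a null before again, which is the edge the extra statement in the test would cover.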

craig bot pushed a commit that referenced this pull request Nov 21, 2019
41793: ccl/changefeedccl: implement `WITH diff` option r=nvanbenschoten a=nvanbenschoten

Fixes #28666.

This commit adds a `WITH diff` option to CREATE CHANGEFEED. When the option is provided, changefeed publications will include a `before` field, which includes the value of the row before the CDC update was applied.

We are able to implement this efficiently by pushing the option into Rangefeed and performing the scan of the previous value immediately before applying the update in the Raft processing goroutine. #41788 allows us to avoid performing this lookup when `WITH diff` isn't specified for Rangefeeds, so the small (unmeasured) perf hit is strictly opt-in.
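
The opt-in nature of the previous-value lookup can be illustrated with a toy in-memory store. This is a sketch under loud assumptions: `store`, `registration`, `needsPrev`, and `apply` are hypothetical names, and nothing here reflects the actual Rangefeed or Raft code. It only shows the shape of the idea: read the previous value immediately before applying a write, and skip that read entirely when no registration asked for diffs.

```go
package main

import "fmt"

// registration is a hypothetical consumer of events from the store.
type registration struct {
	withDiff bool
	events   []string
}

// store is a toy key-value store that publishes an event per write.
type store struct {
	data    map[string]string
	regs    []*registration
	lookups int // number of previous-value reads performed
}

// needsPrev reports whether any registration asked for previous values.
func (s *store) needsPrev() bool {
	for _, r := range s.regs {
		if r.withDiff {
			return true
		}
	}
	return false
}

// apply writes key=val and publishes an event to every registration. The
// previous value is read before the write, and only when some registration
// asked for it.
func (s *store) apply(key, val string) {
	prev, havePrev := "", false
	if s.needsPrev() {
		prev, havePrev = s.data[key]
		s.lookups++
	}
	s.data[key] = val
	for _, r := range s.regs {
		ev := key + "=" + val
		if r.withDiff {
			if havePrev {
				ev += " (before " + prev + ")"
			} else {
				ev += " (before null)"
			}
		}
		r.events = append(r.events, ev)
	}
}

func main() {
	plain := &registration{}
	diff := &registration{withDiff: true}
	s := &store{data: map[string]string{}, regs: []*registration{plain, diff}}
	s.apply("k", "v1")
	s.apply("k", "v2")
	fmt.Println(plain.events, diff.events, s.lookups)
}
```

A store whose registrations all have `withDiff` unset never increments `lookups`, which is the toy equivalent of the perf hit being strictly opt-in.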

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Contributor

craig bot commented Nov 21, 2019

Build succeeded

@craig craig bot merged commit d44467b into cockroachdb:master Nov 21, 2019
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this pull request Nov 25, 2019
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this pull request Nov 25, 2019
craig bot pushed a commit that referenced this pull request Nov 25, 2019
42650: sql: stop observing the CommitTimestamp in TRUNCATE r=ajwerner a=ajwerner

In #40581 we stopped observing the commit timestamp to write it into table
descriptors. In this change I overlooked (rather forgot) about this additional
place in the code where we observed the commit timestamp. As far as I can tell
we don't read this field anywhere ever. Furthermore we know that the table
descriptor in question to which we are referring must be alive and equal to
the provided value at the timestamp at which it was read due to serializability.
In short, this minor change continues to populate the field with a sensible
value and will permit TRUNCATE to be pushed.

Fixes #41566.

Release note (bug fix): Long running transactions which attempt to TRUNCATE
can now be pushed and will commit in cases where they previously could fail
or retry forever.

42746: roachtest/cdc: fix cdc/bank and cdc/schemareg r=nvanbenschoten a=nvanbenschoten

Fixes #41177.
Fixes #42690.

These were both broken by #41793 because prior versions of crdb didn't support the `WITH diff` option.

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
@nvanbenschoten nvanbenschoten deleted the nvanbenschoten/cdcDiff branch December 27, 2019 22:59

Successfully merging this pull request may close these issues.

changefeedccl: implement WITH diff option
7 participants