Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-13581 Change Data Capture implementation #9054

Merged
merged 44 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
32ea9a9
IGNITE-13582 WAL force rollover timeout introduced (#8902)
nizhikov Mar 23, 2021
d028cef
Merge branch 'master' into ignite-cdc
nizhikov Mar 23, 2021
db8bb81
IGNITE-13596 Flag to distinguish DataRecord on primary and backup add…
nizhikov Mar 25, 2021
e579b1e
IGNITE-14360 Refactor FileLockHolder for reusage (#8905)
nizhikov Mar 26, 2021
398a6db
Merge branch 'master' into ignite-cdc
nizhikov Mar 26, 2021
f590a18
IGNITE-14353 Ability to specify postfix for IgniteLogger instead of n…
nizhikov Mar 31, 2021
38e270e
Merge branch 'master' into ignite-cdc
nizhikov Mar 31, 2021
ab7bf2b
IGNITE-14435 Refactor PdsConsistentIdProcessor for reusage (#8946)
nizhikov Mar 31, 2021
b8723fb
Merge branch 'ignite-cdc' of https://github.com/apache/ignite into ig…
nizhikov Mar 31, 2021
7018fef
Merge branch 'master' into ignite-cdc
nizhikov Apr 1, 2021
d318245
IGNITE-13581 compilation fix.
nizhikov Apr 1, 2021
00631f1
Merge branch 'master' into ignite-cdc
nizhikov Apr 13, 2021
c430c51
Merge branch 'master' into ignite-cdc
nizhikov Apr 28, 2021
7f10e44
Merge branch 'master' into ignite-cdc
nizhikov Apr 28, 2021
2a3cdad
Merge branch 'master' into ignite-cdc
nizhikov May 10, 2021
dec2e64
IGNITE-13596 Calculation of primary flag based on tx flags (#9092)
nizhikov May 14, 2021
92e6ff1
ignite-cdc Code review fixes.
nizhikov May 14, 2021
197f3e2
ignite-cdc Code review fixes.
nizhikov May 14, 2021
f364a88
ignite-cdc Code review fixes.
nizhikov May 15, 2021
a9d09f4
ignite-cdc Code review fixes.
nizhikov May 17, 2021
16bea82
Revert "IGNITE-14353 Ability to specify postfix for IgniteLogger inst…
nizhikov May 17, 2021
d181fbb
Merge branch 'master' into ignite-cdc
nizhikov May 20, 2021
1b2de82
ignite-cdc revert Logger changes
nizhikov May 20, 2021
4204deb
IGNITE-14353 Ability to specify application name for IgniteLogger (#9…
nizhikov May 20, 2021
4f1c0d9
Merge branch 'master' into ignite-cdc
nizhikov May 25, 2021
4b369f7
Merge branch 'master' into ignite-cdc
nizhikov May 26, 2021
d8fa305
Merge branch 'master' into ignite-cdc
nizhikov Jun 1, 2021
896f013
Merge branch 'master' into ignite-cdc
nizhikov Jun 6, 2021
7acff01
IGNITE-13581 Capture Data Change implementation (#8909)
nizhikov Jun 7, 2021
a2236a6
IGNITE-13581: Minor fix.
nizhikov Jun 7, 2021
975595c
IGNITE-13581: Minor fix.
nizhikov Jun 8, 2021
53d207c
IGNITE-13581: Minor fix.
nizhikov Jun 10, 2021
8f2b78b
IGNITE-13581: Minor fix.
nizhikov Jun 15, 2021
8f568e5
IGNITE-13581: Minor fix.
nizhikov Jun 18, 2021
a2b3a33
Merge branch 'master' into ignite-cdc
nizhikov Jun 22, 2021
374f31e
ChangeDataCapture -> Cdc (#9190)
nizhikov Jun 23, 2021
2ad5e39
ChangeDataCapture -> Cdc
nizhikov Jun 23, 2021
fe923c5
Improve logging
nizhikov Jun 25, 2021
ce2a1bd
Improve logging
nizhikov Jun 25, 2021
f7176d0
Improve logging
nizhikov Jun 28, 2021
a61013c
Improve logging
nizhikov Jun 29, 2021
ad9cf6f
Merge branch 'master' into ignite-cdc
nizhikov Jun 29, 2021
6536248
fix javadoc
nizhikov Jun 30, 2021
cead6e6
NPE fix
nizhikov Jun 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
IGNITE-13596 Flag to distinguish DataRecord on primary and backup add…
…ed (#8904)
  • Loading branch information
nizhikov authored Mar 25, 2021
commit db8bb814c5c5a4876b21de7a22126ed74d6ffddf
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,8 @@ private void corruptDataEntry(
new GridCacheVersion(),
0L,
partId,
updateCntr
updateCntr,
false
);

GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2630,7 +2630,8 @@ private void corruptDataEntry(
new GridCacheVersion(),
0L,
partId,
updateCntr
updateCntr,
false
);

GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public class DataEntry {
@GridToStringInclude
protected long partCnt;

/** If {@code true} then change made on primary node. */
@GridToStringInclude
protected boolean primary;

/** Constructor. */
private DataEntry() {
// No-op, used from factory methods.
Expand All @@ -76,6 +80,7 @@ private DataEntry() {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
* @param primary {@code True} if node is primary for partition in the moment of logging.
*/
public DataEntry(
int cacheId,
Expand All @@ -86,7 +91,8 @@ public DataEntry(
GridCacheVersion writeVer,
long expireTime,
int partId,
long partCnt
long partCnt,
boolean primary
) {
this.cacheId = cacheId;
this.key = key;
Expand All @@ -97,6 +103,7 @@ public DataEntry(
this.expireTime = expireTime;
this.partId = partId;
this.partCnt = partCnt;
this.primary = primary;

// Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
Expand Down Expand Up @@ -177,6 +184,13 @@ public long expireTime() {
return expireTime;
}

/**
* @return {@code True} if node is primary for partition in the moment of logging.
*/
public boolean primary() {
return primary;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataEntry.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DataRecord extends TimeStampRecord {

/** {@inheritDoc} */
@Override public RecordType type() {
return RecordType.DATA_RECORD;
return RecordType.DATA_RECORD_V2;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class LazyDataEntry extends DataEntry implements MarshalledDataEntry {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
* @param primary {@code True} if node is primary for partition in the moment of logging.
*/
public LazyDataEntry(
GridCacheSharedContext cctx,
Expand All @@ -73,9 +74,10 @@ public LazyDataEntry(
GridCacheVersion writeVer,
long expireTime,
int partId,
long partCnt
long partCnt,
boolean primary
) {
super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt);
super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, primary);

this.cctx = cctx;
this.keyType = keyType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public MvccDataEntry(
long partCnt,
MvccVersion mvccVer
) {
super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, false);

this.mvccVer = mvccVer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class UnwrapDataEntry extends DataEntry implements UnwrappedDataEntry {
* @param partCnt Partition counter.
* @param cacheObjValCtx cache object value context for unwrapping objects.
* @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead.
* @param primary {@code True} if node is primary for partition in the moment of logging.
*/
public UnwrapDataEntry(
final int cacheId,
Expand All @@ -60,8 +61,9 @@ public UnwrapDataEntry(
final int partId,
final long partCnt,
final CacheObjectValueContext cacheObjValCtx,
final boolean keepBinary) {
super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
final boolean keepBinary,
final boolean primary) {
super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, primary);
this.cacheObjValCtx = cacheObjValCtx;
this.keepBinary = keepBinary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum RecordType {
/** */
PAGE_RECORD(1, PHYSICAL),

/** */
/** @deprecated Use {@link #DATA_RECORD_V2} instead. */
@Deprecated
DATA_RECORD(2, LOGICAL),

/** Checkpoint (begin) record */
Expand Down Expand Up @@ -206,7 +207,11 @@ public enum RecordType {
/** Encrypted WAL-record. */
ENCRYPTED_RECORD(52, PHYSICAL),

/** Ecnrypted data record. */
/**
* Ecnrypted data record.
* @deprecated Use {@link #ENCRYPTED_DATA_RECORD_V3} instead.
*/
@Deprecated
ENCRYPTED_DATA_RECORD(53, LOGICAL),

/** Mvcc data record. */
Expand Down Expand Up @@ -239,7 +244,11 @@ public enum RecordType {
/** Encrypted WAL-record. */
ENCRYPTED_RECORD_V2(63, PHYSICAL),

/** Ecnrypted data record. */
/**
* Ecnrypted data record.
* @deprecated Use {@link #ENCRYPTED_DATA_RECORD_V3} instead.
*/
@Deprecated
ENCRYPTED_DATA_RECORD_V2(64, LOGICAL),

/** Master key change record containing multiple keys for single cache group. */
Expand All @@ -252,7 +261,13 @@ public enum RecordType {
PARTITION_META_PAGE_DELTA_RECORD_V3(67, PHYSICAL),

/** Index meta page delta record includes encryption status data. */
INDEX_META_PAGE_DELTA_RECORD(68, PHYSICAL);
INDEX_META_PAGE_DELTA_RECORD(68, PHYSICAL),

/** Data record V2. */
DATA_RECORD_V2(69, LOGICAL),

/** Ecnrypted data record. */
ENCRYPTED_DATA_RECORD_V3(70, LOGICAL);

/** Index for serialization. Should be consistent throughout all versions. */
private final int idx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ else if (interceptorVal != val0)
updateCntr0 = nextPartitionCounter(tx, updateCntr);

if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
logPtr = logTxUpdate(tx, val, expireTime, updateCntr0);
logPtr = logTxUpdate(tx, val, expireTime, updateCntr0, topVer);

update(val, expireTime, ttl, newVer, true);

Expand Down Expand Up @@ -1791,7 +1791,7 @@ protected Object keyValue(boolean cpy) {
updateCntr0 = nextPartitionCounter(tx, updateCntr);

if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
logPtr = logTxUpdate(tx, null, 0, updateCntr0);
logPtr = logTxUpdate(tx, null, 0, updateCntr0, topVer);

drReplicate(drType, null, newVer, topVer);

Expand Down Expand Up @@ -2148,7 +2148,7 @@ else if (ttl != CU.TTL_ZERO)

update(updated, expireTime, ttl, ver, true);

logUpdate(op, updated, ver, expireTime, 0);
logUpdate(op, updated, ver, expireTime, 0, cctx.affinity().affinityTopologyVersion());

if (evt) {
CacheObject evtOld = null;
Expand Down Expand Up @@ -2180,7 +2180,7 @@ else if (ttl != CU.TTL_ZERO)

update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);

logUpdate(op, null, ver, CU.EXPIRE_TIME_ETERNAL, 0);
logUpdate(op, null, ver, CU.EXPIRE_TIME_ETERNAL, 0, cctx.affinity().affinityTopologyVersion());

if (evt) {
CacheObject evtOld = null;
Expand Down Expand Up @@ -3495,7 +3495,8 @@ else if (deletedUnlocked())
ver,
expireTime,
partition(),
updateCntr
updateCntr,
cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer)
)));
}
}
Expand Down Expand Up @@ -4323,9 +4324,16 @@ protected boolean storeValue(
* @param writeVer Write version.
* @param expireTime Expire time.
* @param updCntr Update counter.
* @param topVer Topology version.
*/
protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer, long expireTime, long updCntr)
throws IgniteCheckedException {
protected void logUpdate(
GridCacheOperation op,
CacheObject val,
GridCacheVersion writeVer,
long expireTime,
long updCntr,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
// We log individual updates only in ATOMIC cache.
assert cctx.atomic();

Expand All @@ -4340,7 +4348,8 @@ protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersio
writeVer,
expireTime,
partition(),
updCntr)));
updCntr,
cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer))));
}
catch (StorageException e) {
throw new IgniteCheckedException("Failed to log ATOMIC cache update [key=" + key + ", op=" + op +
Expand All @@ -4353,10 +4362,16 @@ protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersio
* @param val Value.
* @param expireTime Expire time (or 0 if not applicable).
* @param updCntr Update counter.
* @param topVer Topology version.
* @throws IgniteCheckedException In case of log failure.
*/
protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
throws IgniteCheckedException {
protected WALPointer logTxUpdate(
IgniteInternalTx tx,
CacheObject val,
long expireTime,
long updCntr,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
assert cctx.transactional() && !cctx.transactionalSnapshot();

if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
Expand All @@ -4375,7 +4390,8 @@ protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expi
tx.writeVersion(),
expireTime,
key.partition(),
updCntr)));
updCntr,
cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer))));
}
else
return null;
Expand Down Expand Up @@ -6484,7 +6500,7 @@ else if (interceptorVal != updated0) {

long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr);

entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0, topVer);

if (!entry.isNear()) {
newRow = entry.localPartition().dataStore().createRow(
Expand Down Expand Up @@ -6571,7 +6587,7 @@ private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,

long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr);

entry.logUpdate(op, null, newVer, 0, updateCntr0);
entry.logUpdate(op, null, newVer, 0, updateCntr0, topVer);

if (oldVal != null) {
assert !entry.deletedUnlocked();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,8 @@ else if (conflictCtx.isMerge()) {
writeVersion(),
0,
txEntry.key().partition(),
txEntry.updateCounter()
txEntry.updateCounter(),
cacheCtx.affinity().primaryByPartition(cctx.localNode(), txEntry.key().partition(), topVer)
),
txEntry
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
Expand Down Expand Up @@ -73,13 +74,14 @@ public void resetFromPrimary(CacheObject val, GridCacheVersion ver) {
}

/** {@inheritDoc} */
@Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer, long expireTime, long updCntr) throws IgniteCheckedException {
@Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer,
long expireTime, long updCntr, AffinityTopologyVersion topVer) {
// No-op for detached entries, index is updated on primary or backup nodes.
}

/** {@inheritDoc} */
@Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
throws IgniteCheckedException {
@Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
AffinityTopologyVersion topVer) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,14 @@ public boolean loadedValue(@Nullable IgniteInternalTx tx,
}

/** {@inheritDoc} */
@Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime, long updCntr)
throws IgniteCheckedException {
@Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime,
long updCntr, AffinityTopologyVersion topVer) {
// No-op: queries are disabled for near cache.
}

/** {@inheritDoc} */
@Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
throws IgniteCheckedException {
@Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
AffinityTopologyVersion topVer) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2496,6 +2496,7 @@ public void applyUpdatesOnRecovery(
switch (rec.type()) {
case MVCC_DATA_RECORD:
case DATA_RECORD:
case DATA_RECORD_V2:
checkpointReadLock();

try {
Expand Down Expand Up @@ -2635,8 +2636,10 @@ private RestoreLogicalState applyLogicalUpdates(

case MVCC_DATA_RECORD:
case DATA_RECORD:
case DATA_RECORD_V2:
case ENCRYPTED_DATA_RECORD:
case ENCRYPTED_DATA_RECORD_V2:
case ENCRYPTED_DATA_RECORD_V3:
DataRecord dataRec = (DataRecord)rec;

for (DataEntry dataEntry : dataRec.writeEntries()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@ private boolean checkBounds(long idx) {
GridKernalContext kernalCtx = sharedCtx.kernalContext();
IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();

if (processor != null && (rec.type() == RecordType.DATA_RECORD || rec.type() == RecordType.MVCC_DATA_RECORD)) {
if (processor != null && (rec.type() == RecordType.DATA_RECORD
|| rec.type() == RecordType.DATA_RECORD_V2
|| rec.type() == RecordType.MVCC_DATA_RECORD)) {
try {
return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
}
Expand Down Expand Up @@ -498,7 +500,8 @@ private DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
dataEntry.partitionId(),
dataEntry.partitionCounter(),
coCtx,
keepBinary);
keepBinary,
dataEntry.primary());
}

/** {@inheritDoc} */
Expand Down
Loading