Skip to content

Commit

Permalink
IGNITE-19020 Add ability to provide expireTime during conflict resolu…
Browse files Browse the repository at this point in the history
…tion (apache#10594)
  • Loading branch information
nizhikov authored Mar 15, 2023
1 parent 2ab8ec1 commit fda3908
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -49,6 +48,9 @@ public class GridCacheVersionConflictContext<K, V> {
/** TTL. */
private long ttl;

/** Expire time. */
private long expireTime;

/** Manual resolve flag. */
private boolean manualResolve;

Expand Down Expand Up @@ -118,19 +120,21 @@ public void useNew() {
* Force cache to use neither old, nor new, but some other value passed as argument. In this case old
* value will be replaced with merge value and update will be considered as local.
* <p>
* Also in case of merge you have to specify new TTL explicitly. For unlimited TTL use {@code 0}.
* Also in case of merge you have to specify new TTL and expire time explicitly. For unlimited TTL use {@code 0}.
*
* @param mergeVal Merge value or {@code null} to force remove.
* @param ttl Time to live in milliseconds (must be non-negative).
* @param expireTime Expire time.
*/
public void merge(@Nullable V mergeVal, long ttl) {
public void merge(@Nullable V mergeVal, long ttl, long expireTime) {
if (ttl < 0)
throw new IllegalArgumentException("TTL must be non-negative: " + ttl);

state = State.MERGE;

this.mergeVal = mergeVal;
this.ttl = ttl;
this.expireTime = expireTime;
}

/**
Expand Down Expand Up @@ -186,7 +190,7 @@ public long ttl() {
* @return Expire time.
*/
public long expireTime() {
return isUseNew() ? newEntry.expireTime() : isUseOld() ? oldEntry.expireTime() : CU.toExpireTime(ttl);
return isUseNew() ? newEntry.expireTime() : isUseOld() ? oldEntry.expireTime() : expireTime;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,41 @@
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridPlainClosure2;
import org.apache.ignite.internal.util.lang.GridPlainInClosure;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.AbstractCachePluginProvider;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
Expand Down Expand Up @@ -88,6 +108,38 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
/** */
private Integer lastKey = 0;

/** */
private boolean conflictResolverPlugin;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

if (conflictResolverPlugin) {
cfg.setPluginProviders(new AbstractTestPluginProvider() {
@Override public String name() {
return "ConflictResolverProvider";
}

@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
if (!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME))
return null;

return new AbstractCachePluginProvider() {
@Override public Object createComponent(Class cls) {
if (cls != CacheConflictResolutionManager.class)
return null;

return new TestCacheConflictResolutionManager();
}
};
}
});
}

return cfg;
}

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
// No-op.
Expand Down Expand Up @@ -209,6 +261,89 @@ private void zeroOnUpdate(Integer key) throws Exception {
checkNoValue(F.asList(key));
}

/** @throws Exception If failed. */
@Test
public void testPutAllConflictWithResolverPluginTtlUpdateOrder() throws Exception {
conflictResolverPlugin = true;

try {
startGrids();

final IgniteInternalCache<Integer, Object> cache = grid(0).cachex(DEFAULT_CACHE_NAME);

GridCacheVersion ver = new GridCacheVersion(1, 1, 1, 2);

CacheObjectImpl val = new CacheObjectImpl(1, null);

doTestTtlUpdateOrder(
key -> cache.putAllConflict(F.asMap(
new KeyCacheObjectImpl(key, null, -1),
new GridCacheDrInfo(val, ver)
)),
(key, ttl) -> {
cache.putAllConflict(F.asMap(
new KeyCacheObjectImpl(key, null, -1),
new GridCacheDrExpirationInfo(val, ver, ttl, CU.toExpireTime(ttl))
));

return null;
}
);
}
finally {
conflictResolverPlugin = false;
}
}

/** @throws Exception If failed. */
@Test
public void testTtlUpdateOrder() throws Exception {
startGrids();

final IgniteCache<Integer, Object> cache = jcache(0);

doTestTtlUpdateOrder(
key -> cache.put(key, 1),
(key, ttl) -> {
cache.withExpiryPolicy(new TestPolicy(ttl, ttl, ttl)).put(key, 1);

return null;
}
);
}

/** */
private void doTestTtlUpdateOrder(
GridPlainInClosure<Integer> withoutTtl,
GridPlainClosure2<Integer, Long, Void> withTtl
) throws IgniteCheckedException {
final IgniteCache<Integer, Object> cache = jcache(0);

Integer key = primaryKey(cache);

long ttl = 2_000;

withoutTtl.apply(key);

assertTrue(cache.containsKey(key));

withTtl.apply(key, ttl);

assertTrue(cache.containsKey(key));

assertTrue(GridTestUtils.waitForCondition(() -> !cache.containsKey(key), 3 * ttl));

withTtl.apply(key, ttl);

assertTrue(cache.containsKey(key));

withoutTtl.apply(key);

assertTrue(cache.containsKey(key));

assertTrue(GridTestUtils.waitForCondition(() -> !cache.containsKey(key), 3 * ttl));
}

/**
* @throws Exception If failed.
*/
Expand Down Expand Up @@ -1408,4 +1543,32 @@ private static class TestPolicy implements ExpiryPolicy, Serializable {
return S.toString(TestPolicy.class, this);
}
}

/** */
public static class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
implements CacheConflictResolutionManager<K, V> {

/** {@inheritDoc} */
@Override public CacheVersionConflictResolver conflictResolver() {
return new CacheVersionConflictResolver() {
@Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res =
new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

res.merge(
newEntry.value(ctx),
Math.max(oldEntry.ttl(), newEntry.ttl()),
Math.max(oldEntry.expireTime(), newEntry.expireTime())
);

return res;
}
};
}
}
}

0 comments on commit fda3908

Please sign in to comment.