diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java index 9afda275b..2277afe91 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java @@ -23,10 +23,8 @@ import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -84,28 +82,16 @@ public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger lo /** {@inheritDoc} */ @Override protected KeyCacheObject toKey(CdcEvent evt) { - Object key = evt.key(); - - if (key instanceof KeyCacheObject) - return (KeyCacheObject)key; - else - return new KeyCacheObjectImpl(key, null, evt.partition()); + return evt.keyCacheObject(); } /** {@inheritDoc} */ @Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) { - CacheObject cacheObj; - - Object val = evt.value(); - - if (val instanceof CacheObject) - cacheObj = (CacheObject)val; - else - cacheObj = new CacheObjectImpl(val, null); + CacheObject val = evt.valueCacheObject(); return evt.expireTime() != EXPIRE_TIME_ETERNAL ? - new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL, evt.expireTime()) : - new GridCacheDrInfo(cacheObj, ver); + new GridCacheDrExpirationInfo(val, ver, TTL_ETERNAL, evt.expireTime()) : + new GridCacheDrInfo(val, ver); } /** @return Cache. */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 1e275a7be..e8c4ca4fe 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -17,10 +17,16 @@ package org.apache.ignite.cdc.conflictresolve; +import java.util.Objects; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -88,11 +94,12 @@ public CacheVersionConflictResolverImpl( CacheObjectValueContext ctx, GridCacheVersionedEntryEx oldEntry, GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, boolean atomicVerComparator ) { GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); - boolean useNew = isUseNew(ctx, oldEntry, newEntry); + boolean useNew = isUseNew(ctx, oldEntry, newEntry, prevStateMeta); if (log.isDebugEnabled()) debugResolve(ctx, useNew, oldEntry, newEntry); @@ -117,7 +124,8 @@ public CacheVersionConflictResolverImpl( protected boolean isUseNew( CacheObjectValueContext ctx, GridCacheVersionedEntryEx oldEntry, - GridCacheVersionedEntryEx newEntry + GridCacheVersionedEntryEx newEntry, + Object prevStateMeta ) { if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win. return true; @@ -139,8 +147,8 @@ protected boolean isUseNew( } if (conflictResolveFieldEnabled) { - Object oldVal = oldEntry.value(ctx); Object newVal = newEntry.value(ctx); + Object oldVal = oldEntry.value(ctx); if (oldVal != null && newVal != null) { try { @@ -153,6 +161,17 @@ protected boolean isUseNew( ); } } + + Object field = oldVal != null ? value(oldVal) : null; + + if (Objects.equals(field, prevStateMeta)) // Previous value synchronized. + return true; + } + else { + GridCacheVersion oldVer = oldEntry.value(ctx) != null ? oldEntry.version() : null; // TODO null value version (entry vs row) + + if (Objects.equals(oldVer, prevStateMeta)) // Previous value synchronized. + return true; } log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " + @@ -162,6 +181,30 @@ protected boolean isUseNew( return false; } + /** + * {@inheritDoc} + */ + @Override public Object previousStateMetadata(GridCacheEntryEx entry) { + if (conflictResolveFieldEnabled) { + CacheObjectValueContext ctx = entry.context().cacheObjectContext(); + CacheObject val = entry.rawGet(); + + return val != null ? + value(CacheObjectUtils.unwrapBinaryIfNeeded(ctx, val, true, true, null)) : + null; + } + else { + try { + GridCacheVersion ver = entry.version(); + + return ver != null ? ver.conflictVersion() : null; + } + catch (GridCacheEntryRemovedException e) { // TODO + throw new RuntimeException(e); + } + } + } + /** @return Conflict resolve field value. */ protected Comparable value(Object val) { return (val instanceof BinaryObject) diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsAbstractTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsAbstractTest.java new file mode 100644 index 000000000..41c35d99f --- /dev/null +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsAbstractTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; +import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheTestEntryEx; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static java.util.Collections.singletonMap; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * Cache conflict operations test. + */ +@RunWith(Parameterized.class) +public abstract class CacheConflictOperationsAbstractTest extends GridCommonAbstractTest { + /** Cache mode. */ + @Parameterized.Parameter + public CacheAtomicityMode cacheMode; + + /** Other cluster id. */ + @Parameterized.Parameter(1) + public byte otherClusterId; + + /** @return Test parameters. */ + @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}") + public static Collection parameters() { + List params = new ArrayList<>(); + + for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL)) + for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, THIRD_CLUSTER_ID}) + params.add(new Object[] {mode, otherClusterId}); + + return params; + } + + /** */ + private static IgniteCache cache; + + /** */ + private static IgniteInternalCache cachex; + + /** */ + private static IgniteEx client; + + /** Listening test logger. */ + private static ListeningTestLogger listeningLog; + + /** */ + private static final AtomicInteger incKey = new AtomicInteger(); + + /** */ + protected static volatile boolean removeAfterRemove; + + /** */ + private static final byte FIRST_CLUSTER_ID = 1; + + /** */ + private static final byte SECOND_CLUSTER_ID = 2; + + /** */ + private static final byte THIRD_CLUSTER_ID = 3; + + /** */ + protected volatile ConflictResolvableTestData prevValue; + + /** */ + protected volatile GridCacheVersion prevVersion; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + CacheVersionConflictResolverPluginProvider pluginCfg = new CacheVersionConflictResolverPluginProvider<>(); + + pluginCfg.setClusterId(SECOND_CLUSTER_ID); + pluginCfg.setCaches(new HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME))); + pluginCfg.setConflictResolveField(conflictResolveField()); + + return super.getConfiguration(igniteInstanceName) + .setPluginProviders(pluginCfg) + .setGridLogger(listeningLog); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + listeningLog = new ListeningTestLogger(log); + + startGrid(1); + client = startClientGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() { + cache = null; + cachex = null; + client = null; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + if (cachex == null || cachex.configuration().getAtomicityMode() != cacheMode) { + if (cachex != null) + client.cache(DEFAULT_CACHE_NAME).destroy(); + + cache = client.createCache(new CacheConfiguration(DEFAULT_CACHE_NAME) + .setAtomicityMode(cacheMode)); + + cachex = client.cachex(DEFAULT_CACHE_NAME); + } + + assert !removeAfterRemove; + } + + /** Test switching debug log level for ConflictResolver during runtime */ + @Test + public void testResolveDebug() throws Exception { + String key = nextKey(); + + LogListener lsnr = LogListener.matches("isUseNew").build(); + + listeningLog.registerListener(lsnr); + + try { + Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG); + + try { + putFromOther(key, 1, true); + + putFromOther(key, 1, false); + + assertTrue(lsnr.check()); + } + finally { + Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO); + } + + lsnr.reset(); + + putFromOther(key, 1, false); + + assertFalse(lsnr.check()); + } + finally { + listeningLog.unregisterListener(lsnr); + } + } + + /** */ + protected void putLocal(String key) { + ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); + + CacheEntry oldEntry = cache.getEntry(key); + + cache.put(key, newVal); + + CacheEntry newEntry = cache.getEntry(key); + + assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion()); + assertEquals(newVal, cache.get(key)); + + if (oldEntry != null) + assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order()); + } + + /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ + protected void putFromOther(String k, boolean success) throws IgniteCheckedException { + putFromOther(k, 1, success); + } + + /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ + protected void putFromOther(String k, long order, boolean success) throws IgniteCheckedException { + putFromOther(k, new GridCacheVersion(1, order, 1, otherClusterId), success); + } + + /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ + protected void putFromOther(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException { + CacheEntry oldEntry = cache.getEntry(k); + ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); + + KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); + CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null); + + CacheVersionConflictResolver resolver = cachex.context().conflictResolver(); + + GridCacheEntryEx entry = + new GridCacheTestEntryEx(cachex.context(), key, cachex.context().toCacheObject(prevValue), prevVersion, 0); + + CacheObject prevStateMeta = cachex.context().toCacheObject(resolver.previousStateMetadata(entry)); + + cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer, prevStateMeta))); + + if (success) { + assertEquals(newVer, ((GridCacheVersion)cache.getEntry(k).version()).conflictVersion()); + assertEquals(newVal, cache.get(k)); + + prevValue = newVal; + prevVersion = newVer; + } + else if (oldEntry != null) { + assertEquals(oldEntry.getValue(), cache.get(k)); + assertEquals(oldEntry.version(), cache.getEntry(k).version()); + } + } + + /** Replicates entry to the virtual other cluster. */ + protected void replicateToOther(String k) { + CacheEntry entry = cache.getEntry(k); + + prevValue = entry != null ? entry.getValue() : null; + + prevVersion = entry != null ? + GridCacheVersionEx.addConflictVersion( + new GridCacheVersion(1, 1, 1, otherClusterId), (GridCacheVersion)entry.version()) : + null; + } + + /** */ + protected void removeLocal(String key) { + assertTrue(removeAfterRemove ^ cache.containsKey(key)); + + cache.remove(key); + + assertFalse(cache.containsKey(key)); + } + + /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ + protected void removeFromOther(String k, boolean success) throws IgniteCheckedException { + removeFromOther(k, 1, success); + } + + /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ + protected void removeFromOther(String k, long order, boolean success) throws IgniteCheckedException { + removeFromOther(k, new GridCacheVersion(1, order, 1, otherClusterId), success); + } + + /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ + protected void removeFromOther(String k, GridCacheVersion ver, boolean success) throws IgniteCheckedException { + assertTrue(removeAfterRemove ^ cache.containsKey(k)); + + CacheEntry oldEntry = cache.getEntry(k); + + KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); + + CacheVersionConflictResolver resolver = cachex.context().conflictResolver(); + + GridCacheEntryEx entry = + new GridCacheTestEntryEx(cachex.context(), key, cachex.context().toCacheObject(prevValue), prevVersion, 0); + + CacheObject prevStateMeta = cachex.context().toCacheObject(resolver.previousStateMetadata(entry)); + + cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(null, ver, prevStateMeta))); + + if (success) + assertFalse(cache.containsKey(k)); + else if (oldEntry != null) { + assertEquals(oldEntry.getValue(), cache.get(k)); + assertEquals(oldEntry.version(), cache.getEntry(k).version()); + } + } + + /** */ + protected String nextKey() { + return "Key_" + incKey.incrementAndGet() + "_" + otherClusterId + "_" + cacheMode; + } + + /** */ + protected String conflictResolveField() { + return null; + } + + /** */ + protected enum Operation { + /** */ + NONE, + + /** */ + PUT, + + /** */ + REMOVE + } +} diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 86ef675fb..b91c6eb3f 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -17,139 +17,29 @@ package org.apache.ignite.cdc; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheEntry; -import org.apache.ignite.cache.CacheEntryVersion; -import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; -import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectImpl; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import java.util.function.Function; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.testframework.ListeningTestLogger; -import org.apache.ignite.testframework.LogListener; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.Configurator; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import static java.util.Collections.singletonMap; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cdc.CacheConflictOperationsAbstractTest.Operation.NONE; +import static org.apache.ignite.cdc.CacheConflictOperationsAbstractTest.Operation.PUT; +import static org.apache.ignite.cdc.CacheConflictOperationsAbstractTest.Operation.REMOVE; /** * Cache conflict operations test. */ -@RunWith(Parameterized.class) -public class CacheConflictOperationsTest extends GridCommonAbstractTest { - /** Cache mode. */ - @Parameterized.Parameter - public CacheAtomicityMode cacheMode; - - /** Other cluster id. */ - @Parameterized.Parameter(1) - public byte otherClusterId; - - /** @return Test parameters. */ - @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}") - public static Collection parameters() { - List params = new ArrayList<>(); - - for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL)) - for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, THIRD_CLUSTER_ID}) - params.add(new Object[] {mode, otherClusterId}); - - return params; - } - - /** */ - private static final byte FIRST_CLUSTER_ID = 1; - - /** */ - private static final byte SECOND_CLUSTER_ID = 2; - - /** */ - private static final byte THIRD_CLUSTER_ID = 3; - - /** */ - private IgniteCache cache; - - /** */ - private IgniteInternalCache cachex; - - /** */ - private IgniteEx client; - - /** */ - private IgniteEx ign; - - /** Listening test logger. */ - private final ListeningTestLogger listeningLog = new ListeningTestLogger(log); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - CacheVersionConflictResolverPluginProvider pluginCfg = new CacheVersionConflictResolverPluginProvider<>(); - - pluginCfg.setClusterId(SECOND_CLUSTER_ID); - pluginCfg.setCaches(new HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME))); - pluginCfg.setConflictResolveField(conflictResolveField()); - - return super.getConfiguration(igniteInstanceName) - .setPluginProviders(pluginCfg) - .setGridLogger(listeningLog); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - ign = startGrid(1); - - client = startClientGrid(2); - - if (cachex == null || cachex.configuration().getAtomicityMode() != cacheMode) { - if (cachex != null) - client.cache(DEFAULT_CACHE_NAME).destroy(); - - cache = client.createCache(new CacheConfiguration(DEFAULT_CACHE_NAME) - .setAtomicityMode(cacheMode)); - - cachex = client.cachex(DEFAULT_CACHE_NAME); - } - } - - /** {@inheritDoc} */ - @Override protected void afterTest() { - stopAllGrids(); - } - +public class CacheConflictOperationsTest extends CacheConflictOperationsAbstractTest { /** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */ @Test public void testSimpleUpdates() { - String key = "UpdatesWithoutConflict"; + String key = nextKey(); - put(key); - put(key); + for (int i = 0; i < 3; i++) { + putLocal(key); + putLocal(key); - remove(key); + removeLocal(key); + } } /** @@ -158,13 +48,17 @@ public void testSimpleUpdates() { */ @Test public void testUpdatesFromOtherClusterWithoutConflict() throws Exception { - String key = key("UpdateFromOtherClusterWithoutConflict", otherClusterId); + String key = nextKey(); - putConflict(key, 1, true); + putFromOther(key, 1, true); + putFromOther(key, 2, true); - putConflict(key, 2, true); + removeFromOther(key, 3, true); - removeConflict(key, 3, true); + putFromOther(key, 4, true); + putFromOther(key, 5, true); + + removeFromOther(key, 6, true); } /** @@ -172,203 +66,145 @@ public void testUpdatesFromOtherClusterWithoutConflict() throws Exception { * when there are update conflicts. */ @Test - public void testUpdatesReorderFromOtherCluster() throws Exception { - String key = key("UpdateClusterUpdateReorder", otherClusterId); - - putConflict(key, 2, true); + public void testUpdatesFromOtherClusterWithConflict() throws Exception { + String key = nextKey(); - // Update with the equal or lower order should ignored. - putConflict(key, 2, false); - putConflict(key, 1, false); - - // Remove with the equal or lower order should ignored. - removeConflict(key, 2, false); - removeConflict(key, 1, false); - - // Remove with the higher order should succeed. - putConflict(key, 3, true); + putFromOther(key, 1, true); + putFromOther(key, 2, true); - key = key("UpdateClusterUpdateReorder2", otherClusterId); + removeFromOther(key, 3, true); - int order = 1; + putFromOther(key, 3, false); + putFromOther(key, 4, true); + putFromOther(key, 4, false); + putFromOther(key, 4, false); - putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), true); + removeFromOther(key, 3, false); - // Update with the equal or lower topVer should ignored. - putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false); - putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false); + putFromOther(key, 4, false); - // Remove with the equal or lower topVer should ignored. - removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false); - removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false); + removeFromOther(key, 4, false); + removeFromOther(key, 5, true); + } - // Remove with the higher topVer should succeed. - putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true); + /** + * Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver + * when there are update conflicts. + */ + @Test + public void testUpdatesReorderFromOtherCluster() throws Exception { + testUpdatesReorderFromOtherCluster( + nextKey(), + (topVer) -> new GridCacheVersion(topVer, 1, 1, otherClusterId)); - key = key("UpdateClusterUpdateReorder3", otherClusterId); + testUpdatesReorderFromOtherCluster( + nextKey(), + (order) -> new GridCacheVersion(1, order, 1, otherClusterId)); - int topVer = 1; + testUpdatesReorderFromOtherCluster( + nextKey(), + (nodeOrder) -> new GridCacheVersion(1, 1, nodeOrder, otherClusterId)); + } - putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), true); + /** */ + private void testUpdatesReorderFromOtherCluster(String key, Function verGen) throws Exception { + putFromOther(key, verGen.apply(2), true); - // Update with the equal or lower nodeOrder should ignored. - putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); - putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); + for (int i = 0; i < 3; i++) { + // Update with the equal or lower version should be ignored. + putFromOther(key, verGen.apply(2), false); + putFromOther(key, verGen.apply(1), false); - // Remove with the equal or lower nodeOrder should ignored. - removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); - removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); + // Remove with the equal or lower version should be ignored. + removeFromOther(key, verGen.apply(2), false); + removeFromOther(key, verGen.apply(1), false); + } - // Remove with the higher nodeOrder should succeed. - putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true); + // Remove with the higher order should succeed. + putFromOther(key, verGen.apply(3), true); } /** Tests cache operations for entry replicated from another cluster. */ @Test - public void testUpdatesConflict() throws Exception { - String key = key("UpdateThisClusterConflict0", otherClusterId); + public void testLocalUpdateWins() throws Exception { + String key = nextKey(); - putConflict(key, 1, true); - - // Local remove for other cluster entry should succeed. - remove(key); - - // Conflict replicated update should ignored. - // Resolve by field value not applicable because after remove operation "old" value doesn't exists. - putConflict(key, 2, false); - - key = key("UpdateThisDCConflict1", otherClusterId); - - putConflict(key, 3, true); + putFromOther(key, true); // Local update for other cluster entry should succeed. - put(key); - - key = key("UpdateThisDCConflict2", otherClusterId); - - put(key); - - // Conflict replicated remove should ignored. - removeConflict(key, 4, false); - - key = key("UpdateThisDCConflict3", otherClusterId); - - put(key); - - // Conflict replicated update succeed only if resolved by field. - putConflict(key, 5, conflictResolveField() != null); + putLocal(key); } - /** Test switching debug log level for ConflictResolver during runtime */ + /** Tests cache operations for entry replicated from another cluster. */ @Test - public void testResolveDebug() throws Exception { - String key = key("UpdateClusterUpdateReorder", otherClusterId); - - LogListener lsnr = LogListener.matches("isUseNew").build(); - - listeningLog.registerListener(lsnr); - - Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG); - - try { - putConflict(key, 1, true); - - putConflict(key, 1, false); - - assertTrue(lsnr.check()); - } - finally { - Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO); + public void testUpdatesConflict() throws Exception { + for (Operation op1 : new Operation[] {NONE, PUT}) { // From other cluster. + for (Operation op2 : new Operation[] {PUT, REMOVE}) { // Local. + for (Operation op3 : new Operation[] {PUT, REMOVE}) { // From other cluster. + for (boolean sync : new boolean[] {true, false}) // Sync clusters before the last operation. + testUpdatesConflict(op1, op2, op3, sync); + } + } } - - lsnr.reset(); - - putConflict(key, 1, false); - - assertFalse(lsnr.check()); } /** */ - private void put(String key) { - ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); - - CacheEntry oldEntry = cache.getEntry(key); - - cache.put(key, newVal); - - CacheEntry newEntry = cache.getEntry(key); - - assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion()); - assertEquals(newVal, cache.get(key)); - - if (oldEntry != null) - assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order()); - } - - /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ - private void putConflict(String k, long order, boolean success) throws IgniteCheckedException { - putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success); - } - - /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ - private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException { - CacheEntry oldEntry = cache.getEntry(k); - ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); - - KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); - CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null); - - cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer))); - - if (success) { - assertEquals(newVer, ((GridCacheVersion)cache.getEntry(k).version()).conflictVersion()); - assertEquals(newVal, cache.get(k)); + private void testUpdatesConflict(Operation op1, Operation op2, Operation op3, boolean sync) throws Exception { + log.info("Checking: " + op1 + ", " + op2 + ", " + op3 + ", replication=" + sync); + + String key = nextKey(); + + if (op1 == PUT) + putFromOther(key, 1, true); + else + assert op1 == NONE; + + if (op2 == PUT) + // Local remove always succeed. + putLocal(key); + else { + assert op2 == REMOVE; + + if (op1 == NONE) + removeAfterRemove = true; + + try { + // Local remove always succeed. + removeLocal(key); + } + finally { + removeAfterRemove = false; + } } - else if (oldEntry != null) { - assertEquals(oldEntry.getValue(), cache.get(k)); - assertEquals(oldEntry.version(), cache.getEntry(k).version()); - } - } - - /** */ - private void remove(String key) { - assertTrue(cache.containsKey(key)); - - cache.remove(key); - - assertFalse(cache.containsKey(key)); - } - /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ - private void removeConflict(String k, long order, boolean success) throws IgniteCheckedException { - removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success); - } + if (sync) + replicateToOther(key); - /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ - private void removeConflict(String k, GridCacheVersion ver, boolean success) throws IgniteCheckedException { - assertTrue(cache.containsKey(k)); + // Update is always successful when replication is finished and both clusters have the same state. + boolean success = sync; - CacheEntry oldEntry = cache.getEntry(k); + if (op2 != REMOVE && op3 != REMOVE) { + // Values can be compared via the field when + // - previous value exist (was created and was not removed) + // - new value contain field (not a remove). + // So, update is also successful when can be resolved by field. + success |= conflictResolveField() != null; + } - KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); + if (op3 == PUT) + putFromOther(key, success); + else { + assert op3 == REMOVE; - cachex.removeAllConflict(singletonMap(key, ver)); + if (op2 == REMOVE) + removeAfterRemove = true; - if (success) - assertFalse(cache.containsKey(k)); - else if (oldEntry != null) { - assertEquals(oldEntry.getValue(), cache.get(k)); - assertEquals(oldEntry.version(), cache.getEntry(k).version()); + try { + removeFromOther(key, 2, success); + } + finally { + removeAfterRemove = false; + } } } - - /** */ - private String key(String key, byte otherClusterId) { - return key + otherClusterId + cacheMode; - } - - /** */ - protected String conflictResolveField() { - return null; - } } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java index 24bd8489d..13d55bb11 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java @@ -17,17 +17,18 @@ package org.apache.ignite.cdc; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; -import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Ignore; import org.junit.Test; /** Cache conflict operations test with a custom resolver. */ -public class CacheConflictOperationsWithCustomResolverTest extends CacheConflictOperationsTest { +public class CacheConflictOperationsWithCustomResolverTest extends CacheConflictOperationsAbstractTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -37,25 +38,33 @@ public class CacheConflictOperationsWithCustomResolverTest extends CacheConflict return cfg; } - /** {@inheritDoc} */ + /** Tests simple updates. */ @Test - @Override public void testUpdatesReorderFromOtherCluster() { - // LWW strategy resolves conflicts in unexpected way at versioned resolve test. - GridTestUtils.assertThrows(log, super::testUpdatesReorderFromOtherCluster, AssertionError.class, ""); + public void testSimpleUpdates() { + String key = nextKey(); + + putLocal(key); + putLocal(key); + + removeLocal(key); } - /** {@inheritDoc} */ + /** Tests simple conflicts can be resolved with LWW. */ @Test - @Override public void testUpdatesConflict() { - // LWW strategy resolves conflicts in unexpected way at versioned resolve test. - GridTestUtils.assertThrows(log, super::testUpdatesConflict, AssertionError.class, ""); + public void testSimpleConflicts() throws IgniteCheckedException { + String key = nextKey(); + + putLocal(key); + + // LWW. + removeFromOther(key, true); } /** {@inheritDoc} */ + @Ignore @Test - @Override public void testResolveDebug() throws Exception { - // LWW strategy resolves conflicts in unexpected way at versioned resolve test. - GridTestUtils.assertThrows(log, super::testResolveDebug, AssertionError.class, ""); + @Override public void testResolveDebug() { + // Test LWW resolver has no logging. } /** @@ -65,8 +74,11 @@ private static final class LwwConflictResolver implements CacheVersionConflictRe /** * */ - @Override public GridCacheVersionConflictContext resolve(CacheObjectValueContext ctx, - GridCacheVersionedEntryEx oldEntry, GridCacheVersionedEntryEx newEntry, + @Override public GridCacheVersionConflictContext resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx oldEntry, + GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, boolean atomicVerComparator) { GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);