Deadlock on concurrent calls to getAll and invokeAll on cache with read-through

peter108418
Hi

We have recently started to encounter what appears to be deadlocks on one of
our new clusters. We believe it may be due to "data patterns" being slightly
different and more dense than our other existing (working) production
clusters. We have some workarounds, but we think this might be an issue with
Ignite. Hopefully someone is able to narrow down the cause further? :)

Firstly, I'll describe the issues that we are seeing and how to reproduce
them. Then I'll try to explain what we are trying to accomplish; maybe there
is a better solution to our problem?

The problem:
We have encountered an odd deadlock when concurrent calls are made to
"getAll" and "invokeAll" on the same cache with read-through enabled. We
sort the keys the same way across both calls.
Replacing one "side" with either multiple "get"s or multiple "invoke"s
seems to fix the problem, but performance is worse.

I have created a test case that can reproduce it. The test creates
- 1 thread doing a getAll({1, 2}),
- 2 threads doing an invokeAll({2, 3}) and an invokeAll({1, 3})

These 3 threads are executed and may or may not end up in a deadlock; the
test usually reproduces the deadlock state within 50 repetitions.
Please see attached sample maven project to reproduce:
https://drive.google.com/open?id=1GJ78dsulJ0XG-erNkN_vm3ordKr0nqS6
Run with "mvn clean test"

I have also posted the test code, (partial) log output and (partial)
stack trace below.


What we are trying to do:
I believe our use-case to be fairly "normal"? We use Ignite as a
cache-layer, with a read-through adapter to a backing data-store. As data
continuously enters the backing data-store, we have a service that keeps the
Ignite cache up-to-date.

We have a large amount of historical data, going years back. The backing
data-store is the "master", we are not using Ignite Persistence. We use
Ignite as a cache layer as typically we recalculate on the same data
multiple times. We key our data by time chunks, where the "value" is a
container/collection of records within the time-range defined by the key.
We decided to go with an IgniteCache with read-through enabled to automate
cache-loading. To reduce the number of queries against the data-store, we
usually call "getAll" on the cache, as the resulting set of keys provided to
the CacheStore.loadAll can often be merged into a smaller number of queries
(example: joining time-ranges "08:00:00 - 08:15:00" and "08:15:00 -
08:30:00" to larger single time-range "08:00:00 - 08:30:00").
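The range-merging that makes loadAll worthwhile for us can be sketched as follows. This is a self-contained sketch, not our actual code: the TimeRange class, the millisecond fields and the merge policy are illustrative stand-ins for our real key type.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

/** Illustrative stand-in for our time-chunk cache key. */
final class TimeRange {
    final long startMillis;
    final long endMillis; // exclusive

    TimeRange(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }
}

class RangeMerger {
    /**
     * Coalesces adjacent or overlapping key ranges so that fewer queries hit
     * the backing data-store, e.g. "08:00-08:15" and "08:15-08:30" become a
     * single "08:00-08:30" query.
     */
    static List<TimeRange> merge(Collection<TimeRange> keys) {
        final List<TimeRange> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.comparingLong(r -> r.startMillis));

        final List<TimeRange> merged = new ArrayList<>();
        for (TimeRange r : sorted) {
            if (merged.isEmpty()) {
                merged.add(r);
                continue;
            }
            final TimeRange last = merged.get(merged.size() - 1);
            if (r.startMillis <= last.endMillis) // adjacent or overlapping: extend
                merged.set(merged.size() - 1,
                        new TimeRange(last.startMillis, Math.max(last.endMillis, r.endMillis)));
            else
                merged.add(r);
        }
        return merged;
    }
}
```

In our CacheStore.loadAll we run one data-store query per merged range and then split the results back out per original key.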

As we continuously load new data into the backing data-store, entries in
Ignite become inconsistent with the data-store, especially those around
"now", though out-of-order records also occur.
To handle this, we have a separate Ignite Service that fetches new records
from the data-store and updates the Ignite Cache using invokeAll and an
entry-processor.
Our reasoning here is to forward only the "new" records (on the scale of 10s
of records) and merge them into the container (containing 1000s of records)
"locally", instead of "getting" the container, merging, and then "putting",
which would transfer a large amount of data back and forth.
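The merge done by our entry-processor can be sketched like this. Again a self-contained sketch: RecordContainer and the naive de-duplication are illustrative stand-ins; the real version runs inside CacheEntryProcessor.process and mutates the MutableEntry in place via entry.getValue()/entry.setValue().

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for our per-time-chunk container holding 1000s of records. */
final class RecordContainer {
    final List<String> records = new ArrayList<>();
}

class LiveIngestMerge {
    /**
     * Merges a small batch of "new" records into the large existing container
     * on the node that owns the entry, so the container itself never travels
     * over the wire.
     */
    static RecordContainer mergeNewRecords(RecordContainer existing, List<String> newRecords) {
        final RecordContainer target = (existing != null) ? existing : new RecordContainer();
        for (String record : newRecords) {
            if (!target.records.contains(record)) // naive de-dup; the real code merges by record key/time
                target.records.add(record);
        }
        return target;
    }
}
```

Only `newRecords` is serialized to the owning node; the (much larger) merged container stays where it is.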


EDIT: My "raw" tags got eaten when posting, fixed that.

package com.pchr.ignite;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.mock.SerializableMode;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

public class IngestDeadlockTest implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(IngestDeadlockTest.class);

    private static Ignite ignite;
    private static Ignite getIgnite() {
        if (ignite == null) {
            ignite = Ignition.getOrStart(getIgniteConfiguration());
        }

        return Objects.requireNonNull(ignite, "Ignite initialization failed");
    }

    private static IgniteConfiguration getIgniteConfiguration() {
        final IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
        igniteConfiguration.setClientMode(false);
        igniteConfiguration.setGridLogger(new Slf4jLogger());

        return igniteConfiguration;
    }

    @BeforeAll
    static void startIgnite() {
        getIgnite();
    }

    @AfterAll
    static void stopIgnite() {
        if (ignite != null) {
            try {
                ignite.close();
                ignite = null;
            } catch (Throwable e) {
                fail("Error terminating embedded Ignite. This may impact subsequent tests. Test suite will now be terminated", e);
                System.exit(1); //Force exit
            }
        }
    }

    private CacheLoader<Integer, Integer> cacheLoaderMock;
    private IgniteCache<Integer, Integer> cache;

    @AfterEach
    void cleanup() {
        try {
            Thread.sleep(200); //Let Ignite "flush", otherwise "cache closed" exceptions occur
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.cache.destroy();
        this.cache = null;
    }

    private void createCache(int index) {
        this.cacheLoaderMock = mock(CacheLoader.class, withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS));
        this.cache = getIgnite().createCache(createCacheConfig(index, cacheLoaderMock));
    }


    /**
     * Usually triggers the deadlock within 50 repetitions.
     *
     * Uses a simple {@code IgniteCache<Integer, Integer>} where we just assign the negated key as the value.
     *
     * Cache operations done:
     *   - "getAll"         on keys: {1, 2}
     *   - "invokeAll"  (a) on keys: {2, 3}
     *   - "invokeAll"  (b) on keys: {1, 3}
     *
     * It seems that getAll may take locks on the keys, and before it can return, invokeAll gets a lock on one of
     * the same keys.
     */
    @RepeatedTest(200)
    void deadlock_shuffledRandomOrder(RepetitionInfo repetitionInfo) {
        createCache(repetitionInfo.getCurrentRepetition());
        final String igniteName = getIgnite().name();
        final String cacheName = this.cache.getName();

        //-- Arrange
        when(cacheLoaderMock.loadAll(any()))
                .thenAnswer(new Answer<Map<Integer, Integer>>() {
                    @Override
                    public Map<Integer, Integer> answer(InvocationOnMock invocation) throws Throwable {
                        final Iterable<Integer> keys = invocation.getArgument(0);
                        final ArrayList<Integer> keyList = new ArrayList<>();
                        keys.forEach(keyList::add);
                        log.info("[cacheStore] loadAll: " + keyList.stream().map(Objects::toString).collect(joining(", ")));
                        return keyList.stream().collect(Collectors.toMap(p -> p, p -> -p));
                    }
                });
        final CacheEntryProcessor<Integer, Integer, Integer> liveIngestEntryProcessor = new CacheEntryProcessor<Integer, Integer, Integer>() {
            @Override
            public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
                log.info("[invokeAll] process: " + entry.getKey());
                final int value = -entry.getKey();
                entry.setValue(value);
                return value;
            }
        };

        final Runnable cacheStore = () -> {
            final Set<Integer> keys = Stream.of(1, 2).collect(Collectors.toCollection(TreeSet::new)); //Ensure keys are sorted so potential lock order is the same
            final IgniteCache<Integer, Integer> ch = IgnitionEx.grid(igniteName).cache(cacheName);
            ch.getAll(keys);
        };
        final Runnable invokeAll_a = () -> {
            final Set<Integer> keys = Stream.of(2, 3).collect(Collectors.toCollection(TreeSet::new));
            final IgniteCache<Integer, Integer> ch = IgnitionEx.grid(igniteName).cache(cacheName);
            ch.invokeAll(keys, liveIngestEntryProcessor);
        };
        final Runnable invokeAll_b = () -> {
            final Set<Integer> keys = Stream.of(1, 3).collect(Collectors.toCollection(TreeSet::new));
            final IgniteCache<Integer, Integer> ch = IgnitionEx.grid(igniteName).cache(cacheName);
            ch.invokeAll(keys, liveIngestEntryProcessor);
        };


        //-- Act
        final ExecutorService executorService = Executors.newFixedThreadPool(3);
        final List<Runnable> loaders = Arrays.asList(invokeAll_a, cacheStore, invokeAll_b);
        Collections.shuffle(loaders, ThreadLocalRandom.current()); //Shuffle to mix up thread start times


        //-- Assert
        //Small note: JUnit isn't actually capable of terminating the started threads, so without a preemptive timeout this would just hang forever when the deadlock is triggered
        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
            log.info("Starting loaders ({}/{})..", repetitionInfo.getCurrentRepetition(), repetitionInfo.getTotalRepetitions());
            final List<java.util.concurrent.Future<?>> futures = loaders.stream().map(executorService::submit).collect(Collectors.toList());
            for (java.util.concurrent.Future<?> future : futures)
                future.get(); //Blocks here (and times out) when the deadlock is triggered
        }, "Deadlock encountered?");
    }

    private CacheConfiguration<Integer, Integer> createCacheConfig(int index, CacheLoader<Integer, Integer> cacheLoader) {
        final CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>("foo-" + index);
        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
        cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
        cacheCfg.setTypes(Integer.class, Integer.class);
        cacheCfg.setReadThrough(true);
        cacheCfg.setCacheLoaderFactory((Factory<CacheLoader<Integer, Integer>>) () -> createCacheLoader(cacheLoader));

        return cacheCfg;
    }

    private CacheLoader<Integer, Integer> createCacheLoader(CacheLoader<Integer, Integer> cacheLoader) {
        return new CacheStore<Integer, Integer>() {
            @Override
            public Integer load(Integer key) throws CacheLoaderException {
                return cacheLoader.load(key);
            }

            @Override
            public Map<Integer, Integer> loadAll(Iterable<? extends Integer> keys) throws CacheLoaderException {
                return cacheLoader.loadAll(keys);
            }

            @Override
            public void loadCache(IgniteBiInClosure<Integer, Integer> clo, @Nullable Object... args) throws CacheLoaderException {
                fail("Unexpected invoke");
            }

            @Override
            public void sessionEnd(boolean commit) throws CacheWriterException {
                fail("Unexpected invoke");
            }

            @Override
            public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
                fail("Unexpected invoke");
            }

            @Override
            public void writeAll(Collection<Cache.Entry<? extends Integer, ? extends Integer>> entries) throws CacheWriterException {
                fail("Unexpected invoke");
            }

            @Override
            public void delete(Object key) throws CacheWriterException {
                fail("Unexpected invoke");
            }

            @Override
            public void deleteAll(Collection<?> keys) throws CacheWriterException {
                fail("Unexpected invoke");
            }
        };
    }
}

Relevant log fragment:
11:44:45.443 [main] INFO  com.pchr.ignite.IngestDeadlockTest - Starting loaders..
11:44:45.443 [pool-5-thread-1] DEBUG o.a.i.i.p.cache.GridCacheProcessor - Getting public cache for name: foo-38
11:44:45.443 [pool-5-thread-2] DEBUG o.a.i.i.p.cache.GridCacheProcessor - Getting public cache for name: foo-38
11:44:45.443 [pool-5-thread-3] DEBUG o.a.i.i.p.cache.GridCacheProcessor - Getting public cache for name: foo-38
11:44:45.444 [exchange-worker-#43] INFO  o.a.i.i.p.c.GridCachePartitionExchangeManager - Skipping rebalancing (nothing scheduled) [top=AffinityTopologyVersion [topVer=1, minorTopVer=5], force=false, evt=DISCOVERY_CUSTOM_EVT, node=7d5e209b-1685-4939-ba90-fe35f3d2efac]
11:44:45.444 [exchange-worker-#43] INFO  o.a.i.i.p.c.GridCachePartitionExchangeManager - Skipping rebalancing (nothing scheduled) [top=AffinityTopologyVersion [topVer=1, minorTopVer=5], force=false, evt=DISCOVERY_CUSTOM_EVT, node=7d5e209b-1685-4939-ba90-fe35f3d2efac]
11:44:45.445 [sys-#50] DEBUG o.a.i.i.p.c.GridClosureProcessor - Grid runnable started: closure-proc-worker
11:44:45.445 [sys-#50] DEBUG o.a.i.i.p.c.s.CacheOsStoreManager - <foo-38> Loading values from store for keys: [1, 2]
11:44:45.446 [pool-5-thread-2] DEBUG org.apache.ignite.cache.msg.atomic - Assigned update version [futId=65537, writeVer=GridCacheVersion [topVer=186763484, order=1575283467272, nodeOrder=1]]
11:44:45.446 [pool-5-thread-2] DEBUG o.a.i.i.p.c.s.CacheOsStoreManager - <foo-38> Loading value from store for key [key=2]
11:44:45.450 [sys-#50] INFO  com.pchr.ignite.IngestDeadlockTest - [cacheStore] loadAll: 1, 2
11:44:45.643 [tcp-disco-msg-worker-#2] DEBUG o.a.i.s.d.tcp.TcpDiscoverySpi - Processing message [cls=TcpDiscoveryCustomEventMessage, id=b34e536ce61-7d5e209b-1685-4939-ba90-fe35f3d2efac]
11:44:45.643 [main] DEBUG o.a.i.s.d.tcp.TcpDiscoverySpi - Message has been added to queue: TcpDiscoveryCustomEventMessage [msg=DynamicCacheChangeBatch [id=b34e536ce61-e0f25301-c45f-4a93-a556-e09d229bb80c, reqs=[DynamicCacheChangeRequest [cacheName=foo-38, hasCfg=false, nodeId=7d5e209b-1685-4939-ba90-fe35f3d2efac, clientStartOnly=false, stop=true, destroy=true, disabledAfterStartfalse]], exchangeActions=null, startCaches=false], super=TcpDiscoveryAbstractMessage [sndNodeId=null, id=b34e536ce61-7d5e209b-1685-4939-ba90-fe35f3d2efac, verifierNodeId=null, topVer=0, pendingIdx=0, failedNodes=null, isClient=false]]
11:44:45.643 [tcp-disco-msg-worker-#2] DEBUG o.a.i.s.d.tcp.TcpDiscoverySpi - Pending message has been registered: b34e536ce61-7d5e209b-1685-4939-ba90-fe35f3d2efac
11:44:45.644 [tcp-disco-msg-worker-#2] DEBUG o.a.i.s.d.tcp.TcpDiscoverySpi - Message has been added to queue: TcpDiscoveryDiscardMessage [msgId=b34e536ce61-7d5e209b-1685-4939-ba90-fe35f3d2efac, customMsgDiscard=true, super=TcpDiscoveryAbstractMessage [sndNodeId=null, id=c34e536ce61-7d5e209b-1685-4939-ba90-fe35f3d2efac, verifierNodeId=null, topVer=0, pendingIdx=0, failedNodes=null, isClient=false]]
11:44:45.644 [tcp-disco-msg-worker-#2] DEBUG o.a.i.s.d.tcp.TcpDiscoverySpi - Processing message [cls=TcpDiscoveryDiscardMessage, id=c34e536ce61-7d5e209b-1685-4939-ba90-fe35f3d2efac]
11:44:45.644 [exchange-worker-#43] INFO  o.a.ignite.internal.exchange.time - Started exchange init [topVer=AffinityTopologyVersion [topVer=1, minorTopVer=6], mvccCrd=MvccCoordinator [nodeId=7d5e209b-1685-4939-ba90-fe35f3d2efac, crdVer=1575283483767, topVer=AffinityTopologyVersion [topVer=1, minorTopVer=0]], mvccCrdChange=false, crd=true, evt=DISCOVERY_CUSTOM_EVT, evtNode=7d5e209b-1685-4939-ba90-fe35f3d2efac, customEvt=DynamicCacheChangeBatch [id=b34e536ce61-e0f25301-c45f-4a93-a556-e09d229bb80c, reqs=[DynamicCacheChangeRequest [cacheName=foo-38, hasCfg=false, nodeId=7d5e209b-1685-4939-ba90-fe35f3d2efac, clientStartOnly=false, stop=true, destroy=false, disabledAfterStartfalse]], exchangeActions=ExchangeActions [startCaches=null, stopCaches=[foo-38], startGrps=[], stopGrps=[foo-38, destroy=true], resetParts=null, stateChangeRequest=null], startCaches=false], allowMerge=false]
11:44:45.644 [srvc-deploy-#44] DEBUG o.a.i.i.p.c.q.GridCacheDistributedQueryManager - <ignite-sys-cache> Running local SCAN query: GridCacheQueryAdapter [type=SCAN, clsName=null, clause=null, filter=ServiceDeploymentPredicate [], transform=null, part=null, incMeta=false, metrics=GridCacheQueryMetricsAdapter [minTime=9223372036854775807, maxTime=0, sumTime=0, avgTime=0.0, execs=0, completed=0, fails=0], pageSize=1024, timeout=0, incBackups=false, forceLocal=false, dedup=false, prj=org.apache.ignite.internal.cluster.ClusterGroupAdapter@47fc2706, keepBinary=false, subjId=7d5e209b-1685-4939-ba90-fe35f3d2efac, taskHash=0, mvccSnapshot=null]
11:44:45.644 [exchange-worker-#43] INFO  o.a.i.i.p.c.CacheAffinitySharedManager - Updating caches registry performed in 0 ms.
11:44:45.644 [srvc-deploy-#44] DEBUG o.a.i.i.p.r.GridResourceProcessor - Injecting resources [target=ServiceDeploymentPredicate []]
11:44:45.644 [exchange-worker-#43] INFO  o.a.i.i.p.c.CacheAffinitySharedManager - Caches starting performed in 0 ms.
11:44:45.644 [exchange-worker-#43] INFO  o.a.i.i.p.c.CacheAffinitySharedManager - Affinity initialization for started caches performed in 0 ms.
11:44:45.644 [srvc-deploy-#44] DEBUG o.a.i.i.p.c.q.GridCacheDistributedQueryManager - <ignite-sys-cache> Running local SCAN query: GridCacheQueryAdapter [type=SCAN, clsName=null, clause=null, filter=ServiceAssignmentsPredicate [], transform=null, part=null, incMeta=false, metrics=GridCacheQueryMetricsAdapter [minTime=9223372036854775807, maxTime=0, sumTime=0, avgTime=0.0, execs=0, completed=0, fails=0], pageSize=1024, timeout=0, incBackups=false, forceLocal=false, dedup=false, prj=org.apache.ignite.internal.cluster.ClusterGroupAdapter@447c9c19, keepBinary=false, subjId=7d5e209b-1685-4939-ba90-fe35f3d2efac, taskHash=0, mvccSnapshot=null]
11:44:45.644 [srvc-deploy-#44] DEBUG o.a.i.i.p.r.GridResourceProcessor - Injecting resources [target=ServiceAssignmentsPredicate []]
11:44:45.714 [grid-timeout-worker-#23] DEBUG o.a.i.i.p.t.GridTimeoutProcessor - Timeout has occurred [obj=CancelableTask [id=d04e536ce61-e0f25301-c45f-4a93-a556-e09d229bb80c, endTime=1575283485703, period=3000, cancel=false, task=MetricsUpdater [prevGcTime=5, prevCpuTime=1888, super=org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$MetricsUpdater@7e7e7249]], process=true]

Dump of relevant threads:
"sys-#50" #98 prio=5 os_prio=0 tid=0x000000002a5d4800 nid=0x1ae8 waiting on condition [0x000000004a30e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007170c1b10> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.lockEntry(GridCacheMapEntry.java:4967)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.obsolete(GridCacheMapEntry.java:2874)
        at org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl.putEntryIfObsoleteOrAbsent(GridCacheConcurrentMapImpl.java:124)
        at org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl.putEntryIfObsoleteOrAbsent(GridCacheConcurrentMapImpl.java:69)
        at org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedConcurrentMap.putEntryIfObsoleteOrAbsent(GridCachePartitionedConcurrentMap.java:94)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.entryEx(GridCacheAdapter.java:1008)
        at org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter.entryEx(GridDhtCacheAdapter.java:544)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.entryEx(GridCacheAdapter.java:999)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter$18$1.apply(GridCacheAdapter.java:2145)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter$18$1.apply(GridCacheAdapter.java:2120)
        at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$2.apply(GridCacheStoreManagerAdapter.java:467)
        at org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper.loadAll(CacheStoreBalancingWrapper.java:177)
        at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.loadAllFromStore(GridCacheStoreManagerAdapter.java:487)
        at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.loadAll(GridCacheStoreManagerAdapter.java:400)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter$18.call(GridCacheAdapter.java:2120)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter$18.call(GridCacheAdapter.java:2118)
        at org.apache.ignite.internal.processors.cache.GridCacheContext$3.call(GridCacheContext.java:1444)
        at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:6820)
        at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:967)
        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000007170c1df0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"pool-5-thread-3" #97 prio=5 os_prio=0 tid=0x000000002a5d3800 nid=0x79d4 waiting on condition [0x0000000049b2e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:178)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:141)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.getAll0(GridDhtAtomicCache.java:501)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.getAll(GridCacheAdapter.java:1549)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.getAll(IgniteCacheProxyImpl.java:970)
        at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.getAll(GatewayProtectedCacheProxy.java:688)
        at com.pchr.ignite.IngestDeadlockTest.lambda$deadlock_shuffledRandomOrder$0(IngestDeadlockTest.java:132)
        at com.pchr.ignite.IngestDeadlockTest$$Lambda$329/1526866775.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x000000071659fd40> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"pool-5-thread-2" #96 prio=5 os_prio=0 tid=0x000000002a5d6000 nid=0x707c waiting on condition [0x000000004866d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:178)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:141)
        at org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper$LoadFuture.get(CacheStoreBalancingWrapper.java:310)
        at org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper.load(CacheStoreBalancingWrapper.java:84)
        at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.loadFromStore(GridCacheStoreManagerAdapter.java:327)
        at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.load(GridCacheStoreManagerAdapter.java:293)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$AtomicCacheUpdateClosure.call(GridCacheMapEntry.java:5952)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$AtomicCacheUpdateClosure.call(GridCacheMapEntry.java:5782)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3722)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$5900(BPlusTree.java:3616)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1898)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1782)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke0(IgniteCacheOffheapManagerImpl.java:1642)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1625)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:428)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:2295)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2494)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.update(GridDhtAtomicCache.java:1951)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1780)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1668)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture.sendSingleRequest(GridNearAtomicAbstractUpdateFuture.java:299)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture.map(GridNearAtomicUpdateFuture.java:812)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture.mapOnTopology(GridNearAtomicUpdateFuture.java:664)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture.map(GridNearAtomicAbstractUpdateFuture.java:248)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAll0(GridDhtAtomicCache.java:1105)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.invokeAll0(GridDhtAtomicCache.java:911)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.invokeAll(GridDhtAtomicCache.java:798)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invokeAll(IgniteCacheProxyImpl.java:1551)
        at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.invokeAll(GatewayProtectedCacheProxy.java:1276)
        at com.pchr.ignite.IngestDeadlockTest.lambda$deadlock_shuffledRandomOrder$1(IngestDeadlockTest.java:137)
        at com.pchr.ignite.IngestDeadlockTest$$Lambda$330/1632300236.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x000000071659fa00> (a java.util.concurrent.ThreadPoolExecutor$Worker)
        - <0x00000007170c1b10> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        - <0x00000007170ff7d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

"pool-5-thread-1" #95 prio=5 os_prio=0 tid=0x0000000028500800 nid=0x43a8 waiting on condition [0x00000000483de000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007170ff7d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.lockEntry(GridCacheMapEntry.java:4967)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.obsolete(GridCacheMapEntry.java:2874)
        at org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl.putEntryIfObsoleteOrAbsent(GridCacheConcurrentMapImpl.java:124)
        at org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl.putEntryIfObsoleteOrAbsent(GridCacheConcurrentMapImpl.java:69)
        at org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedConcurrentMap.putEntryIfObsoleteOrAbsent(GridCachePartitionedConcurrentMap.java:94)
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.entryEx(GridCacheAdapter.java:1008)
        at org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter.entryEx(GridDhtCacheAdapter.java:544)
        at org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter.entryExx(GridDhtCacheAdapter.java:564)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.lockEntries(GridDhtAtomicCache.java:2917)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1746)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1668)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture.sendSingleRequest(GridNearAtomicAbstractUpdateFuture.java:299)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture.map(GridNearAtomicUpdateFuture.java:812)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture.mapOnTopology(GridNearAtomicUpdateFuture.java:664)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture.map(GridNearAtomicAbstractUpdateFuture.java:248)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAll0(GridDhtAtomicCache.java:1105)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.invokeAll0(GridDhtAtomicCache.java:911)
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.invokeAll(GridDhtAtomicCache.java:798)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invokeAll(IgniteCacheProxyImpl.java:1551)
        at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.invokeAll(GatewayProtectedCacheProxy.java:1276)
        at com.pchr.ignite.IngestDeadlockTest.lambda$deadlock_shuffledRandomOrder$2(IngestDeadlockTest.java:142)
        at com.pchr.ignite.IngestDeadlockTest$$Lambda$331/1360440329.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x000000071659f670> (a java.util.concurrent.ThreadPoolExecutor$Worker)


--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
ilya.kasnacheev

Re: Deadlock on concurrent calls to getAll and invokeAll on cache with read-through

Hello!

Unfortunately you seem to be right and this is an issue.

I have filed a ticket https://issues.apache.org/jira/browse/IGNITE-12425 to track it.

Regards,
--
Ilya Kasnacheev


Mon, 2 Dec 2019 at 16:33, peter108418 <[hidden email]>:
Hi

We have recently started to encounter what appears to be deadlocks on one of
our new clusters. We believe it may be due to "data patterns" being slightly
different and more dense than our other existing (working) production
clusters. We have some workarounds, but we think this might be an issue with
Ignite. Hopefully someone is able to narrow down the cause further? :)

Firstly, I'll describe the issues that we are seeing, and how to reproduce
them. Then I'll try to explain what we are trying to accomplish; maybe there
is a better solution to our problem?

*The problem:*
We have encountered an odd deadlock issue when calls to "getAll" and
"invokeAll" are made concurrently on the same cache with read-through
enabled. We sort the keys the same way across both calls.
Replacing one "side" with either multiple "get"s or multiple "invoke"s
seems to fix the problem, but performance is worse.

I have created a test case that can reproduce it. The test creates
- 1 thread doing a getAll({1, 2}),
- 2 threads doing an invokeAll({2, 3}) and an invokeAll({1, 3})

These 3 threads are executed, and may or may not end up in a deadlock;
usually the test case captures the deadlock state within 50 repetitions.
Please see attached sample maven project to reproduce:
https://drive.google.com/open?id=1GJ78dsulJ0XG-erNkN_vm3ordKr0nqS6
Run with "mvn clean test"

I have also posted the test code, (partial) log output and (partial)
stacktrace below.
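The three-thread pattern above only hangs because the two code paths end up
acquiring per-entry locks in different orders. As a minimal plain-Java sketch
(hypothetical illustration, not Ignite internals): if every thread takes its
two key locks in one global (sorted) order, the same workload is provably
deadlock-free and always terminates, which is why sorting the keys on the
caller side would be sufficient if the cache internals preserved that order:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderDemo {
    // One lock per key; every thread acquires in ascending key order.
    static final Map<Integer, ReentrantLock> LOCKS = new TreeMap<>();
    static {
        for (int k = 1; k <= 3; k++) LOCKS.put(k, new ReentrantLock());
    }

    // Lock both keys in sorted order, run the critical section, unlock in reverse.
    static void withKeys(int a, int b) {
        int lo = Math.min(a, b), hi = Math.max(a, b);
        LOCKS.get(lo).lock();
        try {
            LOCKS.get(hi).lock();
            try {
                // critical section: the getAll / invokeAll body would run here
            } finally {
                LOCKS.get(hi).unlock();
            }
        } finally {
            LOCKS.get(lo).unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable[] workers = {
            () -> { for (int i = 0; i < 10_000; i++) withKeys(1, 2); }, // "getAll({1, 2})"
            () -> { for (int i = 0; i < 10_000; i++) withKeys(2, 3); }, // "invokeAll({2, 3})"
            () -> { for (int i = 0; i < 10_000; i++) withKeys(1, 3); }, // "invokeAll({1, 3})"
        };
        Thread[] threads = new Thread[workers.length];
        for (int i = 0; i < workers.length; i++) {
            threads[i] = new Thread(workers[i]);
            threads[i].start();
        }
        for (Thread t : threads) t.join(); // terminates: a global lock order rules out deadlock
        System.out.println("done");
    }
}
```

If instead one of the workers took its two locks in descending order, the same
run could hang exactly like the stack trace above.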


*What we are trying to do:*
I believe our use case is fairly "normal". We use Ignite as a cache layer,
with a read-through adapter to a backing data-store. As data continuously
enters the backing data-store, we have a service that keeps the Ignite cache
up-to-date.

We have a large amount of historical data, going years back. The backing
data-store is the "master", we are not using Ignite Persistence. We use
Ignite as a cache layer as typically we recalculate on the same data
multiple times. We key our data by time chunks, where the "value" is a
container/collection of records within the time-range defined by the key.
We decided to go with an IgniteCache with read-through enabled to automate
cache-loading. To reduce the number of queries against the data-store, we
usually call "getAll" on the cache, as the resulting set of keys provided to
the CacheStore.loadAll can often be merged into a smaller number of queries
(example: joining time-ranges "08:00:00 - 08:15:00" and "08:15:00 -
08:30:00" to larger single time-range "08:00:00 - 08:30:00").
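The range-coalescing step described above can be sketched in plain Java (the
TimeRange type and epoch-minute granularity are assumptions for illustration,
not the poster's actual key model): inside CacheStore.loadAll, the requested
ranges are sorted and adjacent or overlapping ones are merged before querying
the data-store.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RangeMerge {
    /** A half-open time range [start, end), in epoch minutes (hypothetical key type). */
    static final class TimeRange {
        final long start, end;
        TimeRange(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    /** Coalesce adjacent/overlapping ranges so loadAll can issue fewer data-store queries. */
    static List<TimeRange> merge(List<TimeRange> keys) {
        List<TimeRange> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.comparingLong(r -> r.start));
        List<TimeRange> out = new ArrayList<>();
        for (TimeRange r : sorted) {
            TimeRange last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && r.start <= last.end) // touches or overlaps the previous range
                out.set(out.size() - 1, new TimeRange(last.start, Math.max(last.end, r.end)));
            else
                out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        // "08:00-08:15" and "08:15-08:30" collapse into one query for "08:00-08:30".
        List<TimeRange> merged = merge(List.of(
            new TimeRange(480, 495), new TimeRange(495, 510), new TimeRange(600, 615)));
        System.out.println(merged); // [[480, 510), [600, 615)]
    }
}
```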

As we continuously load new data into the backing data-store, entries in
Ignite become inconsistent with the data-store, especially those around
"now", though out-of-order records also occur.
To handle this, we have a separate Ignite Service that fetches new records
from the data-store and updates the Ignite Cache using invokeAll and an
entry-processor.
Our reasoning here is to forward only the "new" records (on the scale of tens
of records) and merge them into the container (holding thousands of records)
"locally", instead of "getting" the container, merging, and then "putting",
which would transfer a large amount of data back and forth.
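The server-side merge the entry-processor performs, folding a small delta into
a large cached container, can be sketched in plain Java (the record-id map is
a hypothetical stand-in for the real container type, which is not shown in the
thread). Inside an Ignite EntryProcessor the same logic would run against
entry.getValue() on the primary node, so only the small delta crosses the wire:

```java
import java.util.Map;
import java.util.TreeMap;

public class ContainerMerge {
    /**
     * Merge a small batch of new/updated records into the cached container.
     * Keyed by record id; a record in the delta replaces any cached record
     * with the same id, and out-of-order ids are simply inserted.
     */
    static Map<Long, String> mergeInto(Map<Long, String> container, Map<Long, String> delta) {
        Map<Long, String> merged = new TreeMap<>(container); // copy, keeping the cached value immutable
        merged.putAll(delta);
        return merged;
    }

    public static void main(String[] args) {
        Map<Long, String> cached = new TreeMap<>(Map.of(1L, "a", 2L, "b", 3L, "c"));
        Map<Long, String> delta = new TreeMap<>(Map.of(3L, "c2", 4L, "d")); // one update, one new record
        System.out.println(mergeInto(cached, delta)); // {1=a, 2=b, 3=c2, 4=d}
    }
}
```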





Relevant log fragment:


Dump of relevant threads:





peter108418

Re: Deadlock on concurrent calls to getAll and invokeAll on cache with read-through

Thank you for looking into the issue, good to hear you were able to reproduce
it.



ilya.kasnacheev

Re: Deadlock on concurrent calls to getAll and invokeAll on cache with read-through

Hello!

It seems to me the issue is fixed in our master branch and the upcoming 2.8, so maybe you should just wait for the nearest available RC and use that if this scenario is important for you.

I did not bisect for a fixing commit but you can do that if you like.

Regards,
--
Ilya Kasnacheev


Tue, 10 Dec 2019 at 15:29, peter108418 <[hidden email]>:
Thank you for looking into the issue, good to hear you were able to reproduce
it.


