Cache spreading to new nodes

Nythrad

Cache spreading to new nodes

I have a set of nodes, and I want to be able to set a cache in specific nodes. It works, but whenever I start a new node the cache is automatically spread to that node, which then causes errors like:
Failed over job to a new node (I guess a computation was running on a node that shouldn't have been running it and that was shut down in the meantime).

I don't know if I'm doing something wrong here or missing something.
As I understand it, NodeFilter and Affinity are equivalent in my case (is Affinity a node filter that also defines rules on where the cache can spread from a given node?). With rebalance mode set to NONE, shouldn't the cache be spread only to the "nodesForOptimization" nodes, according to either the node filter or the affinity function?

Here's my code:

List<UUID> nodesForOptimization = fetchNodes();

CacheConfiguration<String, Graph> graphCfg = new CacheConfiguration<>(graphCacheName);
graphCfg = graphCfg.setCacheMode(CacheMode.REPLICATED)
            .setBackups(nodesForOptimization.size() - 1)
            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
            .setRebalanceMode(CacheRebalanceMode.NONE)
            .setStoreKeepBinary(true)
            .setCopyOnRead(false)
            .setOnheapCacheEnabled(false)
            .setNodeFilter(u -> nodesForOptimization.contains(u.id()))
            .setAffinity(
                new RendezvousAffinityFunction(
                    1024,
                    (c1, c2) -> nodesForOptimization.contains(c1.id()) && nodesForOptimization.contains(c2.id())
                )
            )
            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
aealexsandrov

Re: Cache spreading to new nodes

Hi,

Could you share the whole reproducer with all configurations and required methods?

BR,
Andrei

On 8/12/2019 at 4:48 PM, Marco Bernagozzi wrote:
> I have a set of nodes, and I want to be able to set a cache in specific nodes. [...]
Nythrad

Re: Cache spreading to new nodes

Hi, I did some more digging, and the issue seems to be related to this log message:

org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture: Completed partition exchange 

Is there any way to disable or limit the partition exchange? 

Best, 
Marco 

On Mon, 12 Aug 2019 at 16:59, Andrei Aleksandrov <[hidden email]> wrote:
> Could you share the whole reproducer with all configurations and required methods?
Denis Mekhanikov

Re: Cache spreading to new nodes

Marco,

Rebalance mode set to NONE means that your cache won’t be rebalanced at all unless you trigger it manually.
I think it’s better not to set it, because if you don’t trigger the rebalance, only one node will store the cache.

Also, the backup filter specified in the affinity function doesn’t seem correct to me. It’s always true, since your node filter accepts only those nodes that are in the nodesForOptimization list.

What does the fetchNodes() method do?
The recommended way to implement node filters is to check custom node attributes using an AttributeNodeFilter.
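
As a rough illustration of the attribute-based approach mentioned above, here is a minimal configuration sketch. It assumes ignite-core is on the classpath; the attribute name "graph.cache.host", its value, and the class and method names are made up for this example, and Object stands in for the Graph value type from the original post.

```java
import java.util.Collections;

import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.util.AttributeNodeFilter;

public class AttributeFilterSketch {
    // Hypothetical attribute name and value; any pair works as long as
    // the node configuration and the cache configuration agree on it.
    static final String ATTR = "graph.cache.host";
    static final String VAL = "true";

    /** Node side: configuration for every node that should host the cache. */
    static IgniteConfiguration hostNodeConfig() {
        return new IgniteConfiguration()
            .setUserAttributes(Collections.singletonMap(ATTR, VAL));
    }

    /** Cache side: the filter only inspects the attribute of the node it is given. */
    static CacheConfiguration<String, Object> graphCacheConfig(String cacheName) {
        return new CacheConfiguration<String, Object>(cacheName)
            .setCacheMode(CacheMode.REPLICATED)
            .setNodeFilter(new AttributeNodeFilter(ATTR, VAL));
    }
}
```

User attributes are supplied when a node starts, so this sketch fits setups where the hosting nodes are known at startup. It is not a drop-in replacement for picking nodes dynamically the way fetchNodes() does.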

Partition map exchange is a process that happens after every topology change. Nodes exchange information about the partition distribution of caches, so you can’t prevent it from happening.
The message that you see is a symptom and not a cause.

Denis


On 13 Aug 2019, at 09:50, Marco Bernagozzi <[hidden email]> wrote:
> Is there any way to disable or limit the partition exchange?

Nythrad

Re: Cache spreading to new nodes

Hi,
Sorry, stripping the project down to a runnable reproducer proved to be a much bigger task than expected. I eventually managed, and the outcome is:
I used to call:
// Iterates over the local entries of every cache except settingsCache and collects their keys.
List<String> cacheNames = new ArrayList<>();
ignite.cacheNames().forEach(
    n -> {
        if (!n.equals("settingsCache")) {
            ignite.cache(n).localEntries(CachePeekMode.ALL).iterator().forEachRemaining(a -> cacheNames.add(a.getKey().toString()));
        }
    }
);
to check the local caches, which apparently creates a local copy of the cache on the machine (!?).
I have now replaced it with:
List<String> cacheNames = new ArrayList<>();
UUID localId = ignite.cluster().localNode().id();
ignite.cacheNames().forEach(
    cache -> {
        if (!cache.equals("settingsCache")) {
            // Check whether the local node is among the nodes that have this cache.
            boolean containsCache = ignite.cluster().forCacheNodes(cache).nodes().stream()
                .anyMatch(n -> n.id().equals(localId));
            if (containsCache) {
                cacheNames.add(cache);
            }
        }
    }
);

And the issue disappeared. Is this intended behaviour? Because it looks weird to me.

To reply to:
"I think, it’s better not to set it, because otherwise if you don’t trigger the rebalance, then only one node will store the cache."
With the configuration I posted, the cache is spread out to the machines that I use in the setNodeFilter().

Yes, I believe you're correct about the NodeFilter. It should be pointless to have now, right? That was me experimenting and trying to figure out why the cache was spreading to new nodes.

fetchNodes() fetches the IDs of the local node and the k emptiest nodes (where k is given as an input for each cache). I check how full a node is with the code right above, which tells me how many caches a node has.

Yes, I read that I should have set the attributes. However, now it feels like an unnecessary step? What would that improve, in my case?

And yes, it makes sense now! Thanks for the clarification. I thought that the rebalancing was moving something around in an uncontrolled way, but it turns out everything was due to my ignite.cache(n).localEntries(CachePeekMode.ALL) call creating a local cache.

I have just one question: you called it "backup filter". Is the nodeFilter a filter for only backup nodes or was that a typo? I thought it was a filter for all the nodes for a cache.

On Wed, 14 Aug 2019 at 17:58, Denis Mekhanikov <[hidden email]> wrote:
> Partition map exchange is a process that happens after every topology change. [...]

Denis Mekhanikov

Re: Cache spreading to new nodes

Marco,

IgniteCache.localEntries(...).iterator() iterates over all entries of the cache that are stored on the local node. So it doesn't iterate over caches, but over the entries of one cache.
It brings entries from off-heap to heap, so data is duplicated during iteration, but no “local cache” is created. Entries are just brought on-heap, which can be heavy for the garbage collector.

> Yes, I read that I should have set the attributes. However, now it feels like an unnecessary step? What would that improve, in my case?

Node filters should be stateless and return the same result on all nodes. So make sure that this node filter cannot act differently on different nodes.
Using an attribute-based node filter is a safe way to choose nodes for caches, since such a filter is guaranteed to work identically on every node.
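
To make the "same result on all nodes" point concrete, here is a small plain-Java model. It uses no Ignite classes: Node, idListFilter and attributeFilter are hypothetical stand-ins invented for this sketch. It only contrasts a filter that depends on a list computed by its creator with a filter that depends solely on the node being tested.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

/** Plain-Java model (not the Ignite API) of two ways to write a node filter. */
class NodeFilterModel {
    /** Minimal stand-in for a cluster node: an id plus user attributes. */
    static final class Node {
        final UUID id;
        final Map<String, Object> attrs;

        Node(UUID id, Map<String, Object> attrs) {
            this.id = id;
            this.attrs = attrs;
        }
    }

    /** Filter whose answer depends on a list computed by whoever created it. */
    static Predicate<Node> idListFilter(List<UUID> allowedIds) {
        return n -> allowedIds.contains(n.id);
    }

    /** Filter whose answer depends only on the node being tested. */
    static Predicate<Node> attributeFilter(String name, Object value) {
        return n -> value.equals(n.attrs.get(name));
    }

    public static void main(String[] args) {
        Node a = new Node(UUID.randomUUID(), Map.of("graph.cache.host", "true"));
        Node b = new Node(UUID.randomUUID(), Map.of());

        // Two creators that computed different "allowed" lists disagree about node b.
        Predicate<Node> builtFromOneList = idListFilter(List.of(a.id));
        Predicate<Node> builtFromOtherList = idListFilter(List.of(a.id, b.id));
        System.out.println(builtFromOneList.test(b) + " vs " + builtFromOtherList.test(b));

        // The attribute filter gives one answer per node, wherever it is evaluated.
        Predicate<Node> byAttr = attributeFilter("graph.cache.host", "true");
        System.out.println(byAttr.test(a) + " / " + byAttr.test(b));
    }
}
```

In this model the ID-list filter is only as consistent as the lists it was built from, while the attribute filter carries no state of its own beyond a fixed name and value.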

> I have just one question: you called it "backup filter". Is the nodeFilter a filter for only backup nodes or was that a typo? I thought it was a filter for all the nodes for a cache.

Backup filter and node filter are different things.
The one that you specify using CacheConfiguration#setNodeFilter() is used to choose the nodes where a cache should be stored.

On the other hand, backupFilter is a property of RendezvousAffinityFunction. It can be used to choose where backup partitions should be stored based on the location of the primary partition. A possible use case is storing primary and backup partitions on different racks in a datacenter.
As far as I can see, you don’t need this one.
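
For readers who do need the rack use case, a backup filter could look roughly like the configuration sketch below. It assumes ignite-core is on the classpath and that every node was started with a user attribute named "rack"; that attribute name, the class name, and the method name are hypothetical, and Object is a placeholder value type.

```java
import java.util.Objects;

import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;

public class RackAwareBackupsSketch {
    /** Builds a partitioned cache whose backup copy must live on another rack. */
    static CacheConfiguration<String, Object> rackAwareCache(String cacheName) {
        // backupFilter(primary, candidate): accept the candidate as a backup only
        // if it reports a different "rack" attribute than the primary node.
        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(
            1024,
            (primary, candidate) ->
                !Objects.equals(primary.attribute("rack"), candidate.attribute("rack")));

        return new CacheConfiguration<String, Object>(cacheName)
            .setBackups(1)
            .setAffinity(aff);
    }
}
```

Unlike the filter in the original post, this predicate compares the two nodes with each other, so it can actually reject a candidate.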

Denis

On 15 Aug 2019, at 10:05, Marco Bernagozzi <[hidden email]> wrote:
> And yes, it makes sense now! Thanks for the clarification. [...]