resetLostPartitions is blocked inside event listener

classic Classic list List threaded Threaded
5 messages Options
akash shinde akash shinde
Reply | Threaded
Open this post in threaded view
|

resetLostPartitions is blocked inside event listener

Hi,
I am trying to handle lost partition scenario.
I have written event listener listening  to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
I want to reset lost partition state of cache after cache loading  is done.
Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.

Please find the code for Event Listener. Someone can help on this. Why this resetLostPartitions getting blocked.
public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);

private final Ignite ignite;

public IgniteEventListner(Ignite ignite) {
this.ignite = ignite;
}

@Override
public boolean apply(CacheRebalancingEvent evt) {

IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
Collection<Integer> lostPartitions = cache.lostPartitions();
reloadCache(lostPartitions); //perform partition based cache loading

ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions

System.out.println("Check-1, Partition lost event processed");

return true;
}
}
Cache Configuration
private CacheConfiguration assetGroupCacheCfg() {
CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
assetGroupCacheCfg.setWriteThrough(false);
assetGroupCacheCfg.setReadThrough(false);
assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
assetGroupCacheCfg.setBackups(0);
assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
assetGroupCacheCfg.setAffinity(affinityFunction);
assetGroupCacheCfg.setStatisticsEnabled(true);
assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
return assetGroupCacheCfg;
}
Ignite Configuration
private IgniteConfiguration getIgniteConfiguration() {

TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
String[] hosts = {"127.0.0.1:47500..47509"};
ipFinder.setAddresses(Arrays.asList(hosts));

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
cfg.setIgniteInstanceName("springDataNode");
cfg.setPeerClassLoadingEnabled(false);
cfg.setRebalanceThreadPoolSize(4);
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMetricsEnabled(true);

storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
storageCfg.setStoragePath("c:/ignite-storage/storage");
storageCfg.setWalPath("c:/ignite-storage/storage/wal");
storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
storageCfg.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(storageCfg);
cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED);
cfg.setCacheConfiguration(getCacheConfigurations());
return cfg;
}

Thanks,
Akash
ilya.kasnacheev ilya.kasnacheev
Reply | Threaded
Open this post in threaded view
|

Re: resetLostPartitions is blocked inside event listener

Hello!

It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.

Regards,
--
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 15:17, Akash Shinde <[hidden email]>:
Hi,
I am trying to handle lost partition scenario.
I have written event listener listening  to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
I want to reset lost partition state of cache after cache loading  is done.
Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.

Please find the code for Event Listener. Someone can help on this. Why this resetLostPartitions getting blocked.
public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);

private final Ignite ignite;

public IgniteEventListner(Ignite ignite) {
this.ignite = ignite;
}

@Override
public boolean apply(CacheRebalancingEvent evt) {

IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
Collection<Integer> lostPartitions = cache.lostPartitions();
reloadCache(lostPartitions); //perform partition based cache loading

ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions

System.out.println("Check-1, Partition lost event processed");

return true;
}
}
Cache Configuration
private CacheConfiguration assetGroupCacheCfg() {
CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
assetGroupCacheCfg.setWriteThrough(false);
assetGroupCacheCfg.setReadThrough(false);
assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
assetGroupCacheCfg.setBackups(0);
assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
assetGroupCacheCfg.setAffinity(affinityFunction);
assetGroupCacheCfg.setStatisticsEnabled(true);
assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
return assetGroupCacheCfg;
}
Ignite Configuration
private IgniteConfiguration getIgniteConfiguration() {

TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
String[] hosts = {"127.0.0.1:47500..47509"};
ipFinder.setAddresses(Arrays.asList(hosts));

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
cfg.setIgniteInstanceName("springDataNode");
cfg.setPeerClassLoadingEnabled(false);
cfg.setRebalanceThreadPoolSize(4);
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMetricsEnabled(true);

storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
storageCfg.setStoragePath("c:/ignite-storage/storage");
storageCfg.setWalPath("c:/ignite-storage/storage/wal");
storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
storageCfg.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(storageCfg);
cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED);
cfg.setCacheConfiguration(getCacheConfigurations());
return cfg;
}

Thanks,
Akash
prasadbhalerao1983 prasadbhalerao1983
Reply | Threaded
Open this post in threaded view
|

Re: resetLostPartitions is blocked inside event listener

Do you mean to say, spawn a different thread from event listener and reset the lost partition in that thread?

I tried this and it works. 

But wanted to understand the reason, why this call get blocked in event listener?

Thanks,
Prasad

On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev <[hidden email] wrote:
Hello!

It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.

Regards,
--
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 15:17, Akash Shinde <[hidden email]>:
Hi,
I am trying to handle lost partition scenario.
I have written event listener listening  to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
I want to reset lost partition state of cache after cache loading  is done.
Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.

Please find the code for Event Listener. Someone can help on this. Why this resetLostPartitions getting blocked.
public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);

private final Ignite ignite;

public IgniteEventListner(Ignite ignite) {
this.ignite = ignite;
}

@Override
public boolean apply(CacheRebalancingEvent evt) {

IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
Collection<Integer> lostPartitions = cache.lostPartitions();
reloadCache(lostPartitions); //perform partition based cache loading

ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions

System.out.println("Check-1, Partition lost event processed");

return true;
}
}
Cache Configuration
private CacheConfiguration assetGroupCacheCfg() {
CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
assetGroupCacheCfg.setWriteThrough(false);
assetGroupCacheCfg.setReadThrough(false);
assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
assetGroupCacheCfg.setBackups(0);
assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
assetGroupCacheCfg.setAffinity(affinityFunction);
assetGroupCacheCfg.setStatisticsEnabled(true);
assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
return assetGroupCacheCfg;
}
Ignite Configuration
private IgniteConfiguration getIgniteConfiguration() {

TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
String[] hosts = {"127.0.0.1:47500..47509"};
ipFinder.setAddresses(Arrays.asList(hosts));

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
cfg.setIgniteInstanceName("springDataNode");
cfg.setPeerClassLoadingEnabled(false);
cfg.setRebalanceThreadPoolSize(4);
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMetricsEnabled(true);

storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
storageCfg.setStoragePath("c:/ignite-storage/storage");
storageCfg.setWalPath("c:/ignite-storage/storage/wal");
storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
storageCfg.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(storageCfg);
cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED);
cfg.setCacheConfiguration(getCacheConfigurations());
return cfg;
}

Thanks,
Akash
ilya.kasnacheev ilya.kasnacheev
Reply | Threaded
Open this post in threaded view
|

Re: resetLostPartitions is blocked inside event listener

Hello!

Event listener is invoked synchronously from internal threads. If partition reset has to happen from the same thread, then obviously there will be a deadlock.

Cache listeners have same property, i.e., you should avoid doing cache operations from them.

This is tradeoff between performance and usability which was resolved in favor of former.

Regards,
--
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 20:30, Prasad Bhalerao <[hidden email]>:
Do you mean to say, spawn a different thread from event listener and reset the lost partition in that thread?

I tried this and it works. 

But wanted to understand the reason, why this call get blocked in event listener?

Thanks,
Prasad

On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev <[hidden email] wrote:
Hello!

It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.

Regards,
--
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 15:17, Akash Shinde <[hidden email]>:
Hi,
I am trying to handle lost partition scenario.
I have written event listener listening  to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
I want to reset lost partition state of cache after cache loading  is done.
Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.

Please find the code for Event Listener. Someone can help on this. Why this resetLostPartitions getting blocked.
public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);

private final Ignite ignite;

public IgniteEventListner(Ignite ignite) {
this.ignite = ignite;
}

@Override
public boolean apply(CacheRebalancingEvent evt) {

IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
Collection<Integer> lostPartitions = cache.lostPartitions();
reloadCache(lostPartitions); //perform partition based cache loading

ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions

System.out.println("Check-1, Partition lost event processed");

return true;
}
}
Cache Configuration
private CacheConfiguration assetGroupCacheCfg() {
CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
assetGroupCacheCfg.setWriteThrough(false);
assetGroupCacheCfg.setReadThrough(false);
assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
assetGroupCacheCfg.setBackups(0);
assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
assetGroupCacheCfg.setAffinity(affinityFunction);
assetGroupCacheCfg.setStatisticsEnabled(true);
assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
return assetGroupCacheCfg;
}
Ignite Configuration
private IgniteConfiguration getIgniteConfiguration() {

TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
String[] hosts = {"127.0.0.1:47500..47509"};
ipFinder.setAddresses(Arrays.asList(hosts));

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
cfg.setIgniteInstanceName("springDataNode");
cfg.setPeerClassLoadingEnabled(false);
cfg.setRebalanceThreadPoolSize(4);
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMetricsEnabled(true);

storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
storageCfg.setStoragePath("c:/ignite-storage/storage");
storageCfg.setWalPath("c:/ignite-storage/storage/wal");
storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
storageCfg.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(storageCfg);
cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED);
cfg.setCacheConfiguration(getCacheConfigurations());
return cfg;
}

Thanks,
Akash
akash shinde akash shinde
Reply | Threaded
Open this post in threaded view
|

Re: resetLostPartitions is blocked inside event listener

Thank you Ilya.

On Fri, Nov 8, 2019 at 12:43 AM Ilya Kasnacheev <[hidden email]> wrote:
Hello!

Event listener is invoked synchronously from internal threads. If partition reset has to happen from the same thread, then obviously there will be a deadlock.

Cache listeners have same property, i.e., you should avoid doing cache operations from them.

This is tradeoff between performance and usability which was resolved in favor of former.

Regards,
--
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 20:30, Prasad Bhalerao <[hidden email]>:
Do you mean to say, spawn a different thread from event listener and reset the lost partition in that thread?

I tried this and it works. 

But wanted to understand the reason, why this call get blocked in event listener?

Thanks,
Prasad

On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev <[hidden email] wrote:
Hello!

It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.

Regards,
--
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 15:17, Akash Shinde <[hidden email]>:
Hi,
I am trying to handle lost partition scenario.
I have written event listener listening  to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
I want to reset lost partition state of cache after cache loading  is done.
Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.

Please find the code for Event Listener. Someone can help on this. Why this resetLostPartitions getting blocked.
public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);

private final Ignite ignite;

public IgniteEventListner(Ignite ignite) {
this.ignite = ignite;
}

@Override
public boolean apply(CacheRebalancingEvent evt) {

IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
Collection<Integer> lostPartitions = cache.lostPartitions();
reloadCache(lostPartitions); //perform partition based cache loading

ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions

System.out.println("Check-1, Partition lost event processed");

return true;
}
}
Cache Configuration
private CacheConfiguration assetGroupCacheCfg() {
CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
assetGroupCacheCfg.setWriteThrough(false);
assetGroupCacheCfg.setReadThrough(false);
assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
assetGroupCacheCfg.setBackups(0);
assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
assetGroupCacheCfg.setAffinity(affinityFunction);
assetGroupCacheCfg.setStatisticsEnabled(true);
assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
return assetGroupCacheCfg;
}
Ignite Configuration
private IgniteConfiguration getIgniteConfiguration() {

TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
String[] hosts = {"127.0.0.1:47500..47509"};
ipFinder.setAddresses(Arrays.asList(hosts));

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
cfg.setIgniteInstanceName("springDataNode");
cfg.setPeerClassLoadingEnabled(false);
cfg.setRebalanceThreadPoolSize(4);
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
regionConfiguration.setMetricsEnabled(true);

storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
storageCfg.setStoragePath("c:/ignite-storage/storage");
storageCfg.setWalPath("c:/ignite-storage/storage/wal");
storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
storageCfg.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(storageCfg);
cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED);
cfg.setCacheConfiguration(getCacheConfigurations());
return cfg;
}

Thanks,
Akash