|
|
Hi, I am trying to handle lost partition scenario. I have written event listener listening to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event. I want to reset lost partition state of cache after cache loading is done. Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.
Please find the code for Event Listener. Someone can help on this. Why this
resetLostPartitions getting blocked. public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);
private final Ignite ignite;
public IgniteEventListner(Ignite ignite) { this.ignite = ignite; }
@Override public boolean apply(CacheRebalancingEvent evt) {
IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name()); Collection<Integer> lostPartitions = cache.lostPartitions(); reloadCache(lostPartitions); //perform partition based cache loading
ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions
System.out.println("Check-1, Partition lost event processed");
return true; } } Cache Configuration private CacheConfiguration assetGroupCacheCfg() { CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name()); assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); assetGroupCacheCfg.setWriteThrough(false); assetGroupCacheCfg.setReadThrough(false); assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); assetGroupCacheCfg.setBackups(0); assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED); assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class); assetGroupCacheCfg.setSqlIndexMaxInlineSize(100); RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(); assetGroupCacheCfg.setAffinity(affinityFunction); assetGroupCacheCfg.setStatisticsEnabled(true); assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE); return assetGroupCacheCfg; }
Ignite Configuration private IgniteConfiguration getIgniteConfiguration() {
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); String[] hosts = {"127.0.0.1:47500..47509"}; ipFinder.setAddresses(Arrays.asList(hosts));
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDiscoverySpi(discoSpi); cfg.setIgniteInstanceName("springDataNode"); cfg.setPeerClassLoadingEnabled(false); cfg.setRebalanceThreadPoolSize(4); DataStorageConfiguration storageCfg = new DataStorageConfiguration(); DataRegionConfiguration regionConfiguration = new DataRegionConfiguration(); regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMetricsEnabled(true);
storageCfg.setDefaultDataRegionConfiguration(regionConfiguration); storageCfg.setStoragePath("c:/ignite-storage/storage"); storageCfg.setWalPath("c:/ignite-storage/storage/wal"); storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive"); storageCfg.setMetricsEnabled(true); cfg.setDataStorageConfiguration(storageCfg); cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED); cfg.setCacheConfiguration(getCacheConfigurations()); return cfg; }
Thanks, Akash
|
|
Hello!
It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.
Regards,
Hi, I am trying to handle lost partition scenario. I have written event listener listening to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event. I want to reset lost partition state of cache after cache loading is done. Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.
Please find the code for Event Listener. Someone can help on this. Why this
resetLostPartitions getting blocked. public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);
private final Ignite ignite;
public IgniteEventListner(Ignite ignite) { this.ignite = ignite; }
@Override public boolean apply(CacheRebalancingEvent evt) {
IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name()); Collection<Integer> lostPartitions = cache.lostPartitions(); reloadCache(lostPartitions); //perform partition based cache loading
ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions
System.out.println("Check-1, Partition lost event processed");
return true; } } Cache Configuration private CacheConfiguration assetGroupCacheCfg() { CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name()); assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); assetGroupCacheCfg.setWriteThrough(false); assetGroupCacheCfg.setReadThrough(false); assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); assetGroupCacheCfg.setBackups(0); assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED); assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class); assetGroupCacheCfg.setSqlIndexMaxInlineSize(100); RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(); assetGroupCacheCfg.setAffinity(affinityFunction); assetGroupCacheCfg.setStatisticsEnabled(true); assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE); return assetGroupCacheCfg; }
Ignite Configuration private IgniteConfiguration getIgniteConfiguration() {
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); String[] hosts = {"127.0.0.1:47500..47509"}; ipFinder.setAddresses(Arrays.asList(hosts));
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDiscoverySpi(discoSpi); cfg.setIgniteInstanceName("springDataNode"); cfg.setPeerClassLoadingEnabled(false); cfg.setRebalanceThreadPoolSize(4); DataStorageConfiguration storageCfg = new DataStorageConfiguration(); DataRegionConfiguration regionConfiguration = new DataRegionConfiguration(); regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMetricsEnabled(true);
storageCfg.setDefaultDataRegionConfiguration(regionConfiguration); storageCfg.setStoragePath("c:/ignite-storage/storage"); storageCfg.setWalPath("c:/ignite-storage/storage/wal"); storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive"); storageCfg.setMetricsEnabled(true); cfg.setDataStorageConfiguration(storageCfg); cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED); cfg.setCacheConfiguration(getCacheConfigurations()); return cfg; }
Thanks, Akash
|
|
Do you mean to say, spawn a different thread from event listener and reset the lost partition in that thread?
I tried this and it works.
But wanted to understand the reason, why this call get blocked in event listener?
Thanks, Prasad Hello!
It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.
Regards,
Hi, I am trying to handle lost partition scenario. I have written event listener listening to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event. I want to reset lost partition state of cache after cache loading is done. Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.
Please find the code for Event Listener. Someone can help on this. Why this
resetLostPartitions getting blocked. public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);
private final Ignite ignite;
public IgniteEventListner(Ignite ignite) { this.ignite = ignite; }
@Override public boolean apply(CacheRebalancingEvent evt) {
IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name()); Collection<Integer> lostPartitions = cache.lostPartitions(); reloadCache(lostPartitions); //perform partition based cache loading
ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions
System.out.println("Check-1, Partition lost event processed");
return true; } } Cache Configuration private CacheConfiguration assetGroupCacheCfg() { CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name()); assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); assetGroupCacheCfg.setWriteThrough(false); assetGroupCacheCfg.setReadThrough(false); assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); assetGroupCacheCfg.setBackups(0); assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED); assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class); assetGroupCacheCfg.setSqlIndexMaxInlineSize(100); RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(); assetGroupCacheCfg.setAffinity(affinityFunction); assetGroupCacheCfg.setStatisticsEnabled(true); assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE); return assetGroupCacheCfg; }
Ignite Configuration private IgniteConfiguration getIgniteConfiguration() {
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); String[] hosts = {"127.0.0.1:47500..47509"}; ipFinder.setAddresses(Arrays.asList(hosts));
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDiscoverySpi(discoSpi); cfg.setIgniteInstanceName("springDataNode"); cfg.setPeerClassLoadingEnabled(false); cfg.setRebalanceThreadPoolSize(4); DataStorageConfiguration storageCfg = new DataStorageConfiguration(); DataRegionConfiguration regionConfiguration = new DataRegionConfiguration(); regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMetricsEnabled(true);
storageCfg.setDefaultDataRegionConfiguration(regionConfiguration); storageCfg.setStoragePath("c:/ignite-storage/storage"); storageCfg.setWalPath("c:/ignite-storage/storage/wal"); storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive"); storageCfg.setMetricsEnabled(true); cfg.setDataStorageConfiguration(storageCfg); cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED); cfg.setCacheConfiguration(getCacheConfigurations()); return cfg; }
Thanks, Akash
|
|
Hello!
Event listener is invoked synchronously from internal threads. If partition reset has to happen from the same thread, then obviously there will be a deadlock.
Cache listeners have same property, i.e., you should avoid doing cache operations from them.
This is tradeoff between performance and usability which was resolved in favor of former.
Regards,
Do you mean to say, spawn a different thread from event listener and reset the lost partition in that thread?
I tried this and it works.
But wanted to understand the reason, why this call get blocked in event listener?
Thanks, Prasad Hello!
It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.
Regards,
Hi, I am trying to handle lost partition scenario. I have written event listener listening to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event. I want to reset lost partition state of cache after cache loading is done. Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.
Please find the code for Event Listener. Someone can help on this. Why this
resetLostPartitions getting blocked. public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);
private final Ignite ignite;
public IgniteEventListner(Ignite ignite) { this.ignite = ignite; }
@Override public boolean apply(CacheRebalancingEvent evt) {
IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name()); Collection<Integer> lostPartitions = cache.lostPartitions(); reloadCache(lostPartitions); //perform partition based cache loading
ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions
System.out.println("Check-1, Partition lost event processed");
return true; } } Cache Configuration private CacheConfiguration assetGroupCacheCfg() { CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name()); assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); assetGroupCacheCfg.setWriteThrough(false); assetGroupCacheCfg.setReadThrough(false); assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); assetGroupCacheCfg.setBackups(0); assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED); assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class); assetGroupCacheCfg.setSqlIndexMaxInlineSize(100); RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(); assetGroupCacheCfg.setAffinity(affinityFunction); assetGroupCacheCfg.setStatisticsEnabled(true); assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE); return assetGroupCacheCfg; }
Ignite Configuration private IgniteConfiguration getIgniteConfiguration() {
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); String[] hosts = {"127.0.0.1:47500..47509"}; ipFinder.setAddresses(Arrays.asList(hosts));
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDiscoverySpi(discoSpi); cfg.setIgniteInstanceName("springDataNode"); cfg.setPeerClassLoadingEnabled(false); cfg.setRebalanceThreadPoolSize(4); DataStorageConfiguration storageCfg = new DataStorageConfiguration(); DataRegionConfiguration regionConfiguration = new DataRegionConfiguration(); regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMetricsEnabled(true);
storageCfg.setDefaultDataRegionConfiguration(regionConfiguration); storageCfg.setStoragePath("c:/ignite-storage/storage"); storageCfg.setWalPath("c:/ignite-storage/storage/wal"); storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive"); storageCfg.setMetricsEnabled(true); cfg.setDataStorageConfiguration(storageCfg); cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED); cfg.setCacheConfiguration(getCacheConfigurations()); return cfg; }
Thanks, Akash
|
|
Thank you Ilya. On Fri, Nov 8, 2019 at 12:43 AM Ilya Kasnacheev < [hidden email]> wrote: Hello!
Event listener is invoked synchronously from internal threads. If partition reset has to happen from the same thread, then obviously there will be a deadlock.
Cache listeners have same property, i.e., you should avoid doing cache operations from them.
This is tradeoff between performance and usability which was resolved in favor of former.
Regards,
Do you mean to say, spawn a different thread from event listener and reset the lost partition in that thread?
I tried this and it works.
But wanted to understand the reason, why this call get blocked in event listener?
Thanks, Prasad Hello!
It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.
Regards,
Hi, I am trying to handle lost partition scenario. I have written event listener listening to EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event. I want to reset lost partition state of cache after cache loading is done. Issue: ignite.resetLostPartitions(caheName) is getting blocked and not completing.
Please find the code for Event Listener. Someone can help on this. Why this
resetLostPartitions getting blocked. public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);
private final Ignite ignite;
public IgniteEventListner(Ignite ignite) { this.ignite = ignite; }
@Override public boolean apply(CacheRebalancingEvent evt) {
IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name()); Collection<Integer> lostPartitions = cache.lostPartitions(); reloadCache(lostPartitions); //perform partition based cache loading
ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name())); //Reset partitions
System.out.println("Check-1, Partition lost event processed");
return true; } } Cache Configuration private CacheConfiguration assetGroupCacheCfg() { CacheConfiguration assetGroupCacheCfg = new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name()); assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); assetGroupCacheCfg.setWriteThrough(false); assetGroupCacheCfg.setReadThrough(false); assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); assetGroupCacheCfg.setBackups(0); assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED); assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class); assetGroupCacheCfg.setSqlIndexMaxInlineSize(100); RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(); assetGroupCacheCfg.setAffinity(affinityFunction); assetGroupCacheCfg.setStatisticsEnabled(true); assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE); return assetGroupCacheCfg; }
Ignite Configuration private IgniteConfiguration getIgniteConfiguration() {
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); String[] hosts = {"127.0.0.1:47500..47509"}; ipFinder.setAddresses(Arrays.asList(hosts));
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDiscoverySpi(discoSpi); cfg.setIgniteInstanceName("springDataNode"); cfg.setPeerClassLoadingEnabled(false); cfg.setRebalanceThreadPoolSize(4); DataStorageConfiguration storageCfg = new DataStorageConfiguration(); DataRegionConfiguration regionConfiguration = new DataRegionConfiguration(); regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024); regionConfiguration.setMetricsEnabled(true);
storageCfg.setDefaultDataRegionConfiguration(regionConfiguration); storageCfg.setStoragePath("c:/ignite-storage/storage"); storageCfg.setWalPath("c:/ignite-storage/storage/wal"); storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive"); storageCfg.setMetricsEnabled(true); cfg.setDataStorageConfiguration(storageCfg); cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED); cfg.setCacheConfiguration(getCacheConfigurations()); return cfg; }
Thanks, Akash
|
|