Confusion with Events

classic Classic list List threaded Threaded
7 messages Options
KJQ KJQ
Reply | Threaded
Open this post in threaded view
|

Confusion with Events

I have some questions regarding Cache Listeners/Events.

We have a system that used a lot of "Caffeine" based caches spread across
multiple services (in K8S).  Basically "near-caches" (without a backing
store).  We are now trying to fit Ignite behind those usages.

*What we are trying to do is when Ignite /expires/ an entry receive the
event on all the nodes and evict it in from Caffeine*.

Are one of these approaches below correct? And/or how can I accomplish this?
Is there a better/easier way?

1) I tried registering a CacheListener with each cache configuration but
that seemed to only fire where the cache event was fired:

config.addCacheEntryListenerConfiguration(new
IgniteExpiredListener<>(cacheManagerProvider));

2) I am experimenting with cache events as well like this below.

ig.events(
            ig.cluster().forServers())
            .remoteListen(
                new IgniteBiPredicate<UUID, CacheEvent>()
                {
                    @Override
                    public boolean apply(UUID uuid, CacheEvent evt)
                    {
                        log.debug("Received local event "
                                  + evt.name()
                                  + ", key="
                                  + evt.key()
                                  + ", at="
                                  + evt.node().consistentId().toString()
                                  + ", "
                                  +
evt.eventNode().consistentId().toString() );
                        cm.getCache(evt.cacheName()).evict(evt.key());
                        return true; // Continue listening.
                    }
                },
                new IgnitePredicate<CacheEvent>()
                {
                    @Override
                    public boolean apply(final CacheEvent evt)
                    {
                        log.debug("Received remote event "
                                  + evt.name()
                                  + ", key="
                                  + evt.key()
                                  + ", at="
                                  + evt.node().consistentId().toString()
                                  + ", "
                                  +
evt.eventNode().consistentId().toString() );
                        return true;
                    }
                },
                EVTS_CACHE);




-----
KJQ
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
KJQ
Ivan Pavlukhin Ivan Pavlukhin
Reply | Threaded
Open this post in threaded view
|

Re: Confusion with Events

Hi KJQ,

A following comes to my mind:
1. Use IgniteEvents#remoteListen(org.apache.ignite.lang.IgniteBiPredicate<java.util.UUID,T>,
org.apache.ignite.lang.IgnitePredicate<T>, int...) with null first
predicate (locLsnr) (because notifying caller seems not needed).
2. Use IgniteCompute#broadcastAsync(org.apache.ignite.lang.IgniteCallable<R>)
in remote event handler (rmtFilter) to notify all nodes about events.

But here can be following caveats:
1. Such broadcasts can lead to poor performance, events buffering
before broadcast might help.
2. Event listeners can be also triggered for backup partitions
updates, perhaps backup notifications should be filtered to avoid
duplicate broadcasts.

Also more details about your use case can help to develop a good
solution. Currently use case is not fully clear for me.

вт, 1 окт. 2019 г. в 03:38, KJQ <[hidden email]>:

>
> I have some questions regarding Cache Listeners/Events.
>
> We have a system that used a lot of "Caffeine" based caches spread across
> multiple services (in K8S).  Basically "near-caches" (without a backing
> store).  We are now trying to fit Ignite behind those usages.
>
> *What we are trying to do is when Ignite /expires/ an entry receive the
> event on all the nodes and evict it in from Caffeine*.
>
> Are one of these approaches below correct? And/or how can I accomplish this?
> Is there a better/easier way?
>
> 1) I tried registering a CacheListener with each cache configuration but
> that seemed to only fire where the cache event was fired:
>
> config.addCacheEntryListenerConfiguration(new
> IgniteExpiredListener<>(cacheManagerProvider));
>
> 2) I am experimenting with cache events as well like this below.
>
> ig.events(
>             ig.cluster().forServers())
>             .remoteListen(
>                 new IgniteBiPredicate<UUID, CacheEvent>()
>                 {
>                     @Override
>                     public boolean apply(UUID uuid, CacheEvent evt)
>                     {
>                         log.debug("Received local event "
>                                   + evt.name()
>                                   + ", key="
>                                   + evt.key()
>                                   + ", at="
>                                   + evt.node().consistentId().toString()
>                                   + ", "
>                                   +
> evt.eventNode().consistentId().toString() );
>                         cm.getCache(evt.cacheName()).evict(evt.key());
>                         return true; // Continue listening.
>                     }
>                 },
>                 new IgnitePredicate<CacheEvent>()
>                 {
>                     @Override
>                     public boolean apply(final CacheEvent evt)
>                     {
>                         log.debug("Received remote event "
>                                   + evt.name()
>                                   + ", key="
>                                   + evt.key()
>                                   + ", at="
>                                   + evt.node().consistentId().toString()
>                                   + ", "
>                                   +
> evt.eventNode().consistentId().toString() );
>                         return true;
>                     }
>                 },
>                 EVTS_CACHE);
>
>
>
>
> -----
> KJQ
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/



--
Best regards,
Ivan Pavlukhin
KJQ KJQ
Reply | Threaded
Open this post in threaded view
|

Re: Confusion with Events

In reply to this post by KJQ
Thank you for the quick response - it is much appreciated.  We are still
working through our Ignite integration, so far it has been nice, but it is
definitely a learning curve.

So, we have an environment that is deployed as services in a K8S cluster.
Each service uses Caffeine as the in-memory cache (i know Ignite has a
near-cache but cannot make that change now).  Caffeine is also being tested
inside of Reactive Flux/Mono calls making it harder to change quickly.  Each
service, a deployed pod, is also an Ignite "server" node as well.  We use
Ignite, partitioned, as the primary cache (and some distributed compute with
Drools).

Because all of the nodes use Caffeine, and becoming just a near-cache when
Ignite is included, we would like Ignite to raise an "expired" event to all
the nodes and evict that item from Caffeine (before Ignite, Caffeine was
used as the only in-memory cache per service) - we want to cleanup the local
caches on all the nodes.  Each Ignite cache configuration has an expiration
policy setup.

1) I tried using the `addCacheEntryListenerConfiguration` with each Ignite
cache thinking this was a better choice because (i thought) it was backed by
the continuous query and would not require me to explicitly use events.
But, it looked like that only fired on the node where the operation happened
(i.e. locally).  Maybe I could broadcast the event from within this listener
to the other nodes?

2) My next attempt is using the "remoteListen()."  If I understand you
correctly, I do not need a "local listener" but the "remote listener" should
broadcast a message when it is triggered?

*Couple of things I noticed in my test below:*
- If i take out the PUT it seems I never see any callback notifications
- Without the local event listener I do not see any expiration messages
(possibly because of where the data is being cached in the test - local vs.
remote node)

Basically, I would like to do this:
1) R/W to Ignite cache with an "expiration" policy
2) When Ignite decides to "expire" an item raise an event to all Ignite
nodes
3) From the event, evict the cache item locally.

This is what I have right now for testing:

ig.events(
    ig.cluster().forServers())
    .remoteListen(
          new IgniteBiPredicate<UUID, CacheEvent>()
          {
                    @Override
                    public boolean apply(UUID uuid, CacheEvent evt)
                    {
                        log.debug("Received local event {} {} {}  // {} //
{} ",
                                  evt.cacheName(),
                                  evt.name(),
                                  evt.key(),
                                  evt.node().id(),
                                  evt.eventNode().id());
                        return true; // Continue listening.
          },
        new IgnitePredicate<CacheEvent>()
        {
            @Override
            public boolean apply(final CacheEvent evt)
            {
                log.debug("Received remote event {} {} {}  / {} {} ",
                          evt.cacheName(),
                          evt.name(),
                          evt.key(),
                          evt.node().id(),
                          evt.eventNode().id());

                //Broadcast the callback
                ig.compute().broadcastAsync(new IgniteCallable()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        log.debug("Received callback {} {} {}  / {} {} ",
                                 evt.cacheName(),
                                 evt.name(),
                                 evt.key(),
                                 evt.node().id(),
                                 evt.eventNode().id());
                        return null;
                    }
                });

                return true;
            }
        },
        EVT_CACHE_OBJECT_PUT,
        EVT_CACHE_OBJECT_EXPIRED)






-----
KJQ
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
KJQ
Ivan Pavlukhin Ivan Pavlukhin
Reply | Threaded
Open this post in threaded view
|

Re: Confusion with Events

Hi KJQ,

Thank you for sharing details. Now things are more clear for me.

I suppose you should enable needed event types. You can call
setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT,
EventType.EVT_CACHE_OBJECT_EXPIRED) on your IgniteConfiguration. Also
I noticed that you use ig.compute() on Ignite instance (ig) local to
your calling code. But actually you need to inject actual Ignite
instance where event listener is called, it can be done by using
@IgniteInstanceResource. You can find such injection in
ComputeFibonacciContinuationExample in Ignite sources.

вт, 1 окт. 2019 г. в 18:24, KJQ <[hidden email]>:

>
> Thank you for the quick response - it is much appreciated.  We are still
> working through our Ignite integration, so far it has been nice, but it is
> definitely a learning curve.
>
> So, we have an environment that is deployed as services in a K8S cluster.
> Each service uses Caffeine as the in-memory cache (i know Ignite has a
> near-cache but cannot make that change now).  Caffeine is also being tested
> inside of Reactive Flux/Mono calls making it harder to change quickly.  Each
> service, a deployed pod, is also an Ignite "server" node as well.  We use
> Ignite, partitioned, as the primary cache (and some distributed compute with
> Drools).
>
> Because all of the nodes use Caffeine, and becoming just a near-cache when
> Ignite is included, we would like Ignite to raise an "expired" event to all
> the nodes and evict that item from Caffeine (before Ignite, Caffeine was
> used as the only in-memory cache per service) - we want to cleanup the local
> caches on all the nodes.  Each Ignite cache configuration has an expiration
> policy setup.
>
> 1) I tried using the `addCacheEntryListenerConfiguration` with each Ignite
> cache thinking this was a better choice because (i thought) it was backed by
> the continuous query and would not require me to explicitly use events.
> But, it looked like that only fired on the node where the operation happened
> (i.e. locally).  Maybe I could broadcast the event from within this listener
> to the other nodes?
>
> 2) My next attempt is using the "remoteListen()."  If I understand you
> correctly, I do not need a "local listener" but the "remote listener" should
> broadcast a message when it is triggered?
>
> *Couple of things I noticed in my test below:*
> - If i take out the PUT it seems I never see any callback notifications
> - Without the local event listener I do not see any expiration messages
> (possibly because of where the data is being cached in the test - local vs.
> remote node)
>
> Basically, I would like to do this:
> 1) R/W to Ignite cache with an "expiration" policy
> 2) When Ignite decides to "expire" an item raise an event to all Ignite
> nodes
> 3) From the event, evict the cache item locally.
>
> This is what I have right now for testing:
>
> ig.events(
>     ig.cluster().forServers())
>     .remoteListen(
>           new IgniteBiPredicate<UUID, CacheEvent>()
>           {
>                     @Override
>                     public boolean apply(UUID uuid, CacheEvent evt)
>                     {
>                         log.debug("Received local event {} {} {}  // {} //
> {} ",
>                                   evt.cacheName(),
>                                   evt.name(),
>                                   evt.key(),
>                                   evt.node().id(),
>                                   evt.eventNode().id());
>                         return true; // Continue listening.
>           },
>         new IgnitePredicate<CacheEvent>()
>         {
>             @Override
>             public boolean apply(final CacheEvent evt)
>             {
>                 log.debug("Received remote event {} {} {}  / {} {} ",
>                           evt.cacheName(),
>                           evt.name(),
>                           evt.key(),
>                           evt.node().id(),
>                           evt.eventNode().id());
>
>                 //Broadcast the callback
>                 ig.compute().broadcastAsync(new IgniteCallable()
>                 {
>                     @Override
>                     public Object call() throws Exception
>                     {
>                         log.debug("Received callback {} {} {}  / {} {} ",
>                                  evt.cacheName(),
>                                  evt.name(),
>                                  evt.key(),
>                                  evt.node().id(),
>                                  evt.eventNode().id());
>                         return null;
>                     }
>                 });
>
>                 return true;
>             }
>         },
>         EVT_CACHE_OBJECT_PUT,
>         EVT_CACHE_OBJECT_EXPIRED)
>
>
>
>
>
>
> -----
> KJQ
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/



--
Best regards,
Ivan Pavlukhin
KJQ KJQ
Reply | Threaded
Open this post in threaded view
|

Re: Confusion with Events

In reply to this post by KJQ
Thank you for your help!  I "think" this is working.  

*One thing I did notice in my test environment is that the events can be
raised in drastically different times?  *

It is not a huge deal since getting the item (providing the near cache has
expired it) will trigger the expiration on Ignite which will in turn raise
the events but I was just curious why such the delay especially in a local
test environment...

What is really odd to me is that no matter what I do, what order I put items
in, what order the cache gets initially created, anything in "cache1" always
seems to raise the event in a timely manner and both caches have the exact
same configuration.  My test case, which uses sleep() to simulate the
expiration, seems to show more consistent results with the events being
raised more timely.

*For example:*

1) Spin up a single Ignite node (embedded / server / partitioned /
EVT_CACHE_OBJECT_EXPIRED )
      a) cache1 defined via configuration (eager TTL = true)
      b) cache2 defined via configuration (eager TTL = true)

I assign a node attribute to each cache (cache = true) and then use a
predicate to filter only those specific nodes in the listener.

2) Using a dummy REST resource (in same instance) put entries into the
near-cache (caffeine) which writes through to the far-cache (ignite).
Caffeine expires in 30s / Ignite expires in 60s

On the first call to caffeine (which would be per Ignite node as well), if I
have not initialized the ignite side, attach the listener (so, this happens
just once on the node).  Idea is to listen on any "expired" events and raise
the action to the local near-cache.

Then...

      a) put k1/aaa into cache1
      b) put k2/bbb into cache2
      c) put k3/ccc into cache2

/Output looks something like (9:35):/

[DEBUG] CaffeineConfigurableCacheManager : Created cache cache1
[ WARN] CaffeineConfigurableCacheManager : Initialized....
[DEBUG] IgniteCacheBuilder  : Writing aaa to cache1
[DEBUG] CaffeineConfigurableCacheManager : Created cache cache2
[DEBUG] IgniteCacheBuilder  : Writing bbb to cache2
[DEBUG] IgniteCacheBuilder  : Writing ccc to cache2

3) Sit and wait (just looking at the events only, not if I get the record
which forces the expiry on expired items)...

/Within the ~"60s" expiration policy on Ignite I see (9:36):/

[DEBUG] IgniteCacheBuilder  : Received remote event cache1
CACHE_OBJECT_EXPIRED aaa
[DEBUG] IgniteCacheBuilder  : Eviction callback cache1 CACHE_OBJECT_EXPIRED
aaa

/Much later I see (9:52):/

[DEBUG] IgniteCacheBuilder  : Received remote event cache2
CACHE_OBJECT_EXPIRED bbb
[DEBUG] IgniteCacheBuilder  : Eviction callback cache2 CACHE_OBJECT_EXPIRED
bbb
[DEBUG] IgniteCacheBuilder  : Received remote event cache2
CACHE_OBJECT_EXPIRED ccc
[DEBUG] IgniteCacheBuilder  : Eviction callback cache2 CACHE_OBJECT_EXPIRED
ccc

*My current code looks something like this:*

ignite.events(ignite.cluster().forPredicate(new
AttributeNodeFilter<>(CACHE_NODE_ATTRIBUTE_KEY, true)))
.remoteListen(
null,
new IgnitePredicate<CacheEvent>()
{
    @IgniteInstanceResource
    private transient Ignite ignite;

    @SpringResource(resourceClass = CacheManager.class)
    private transient CacheManager cacheManager;

    @Override
    public boolean apply(final CacheEvent event)
    {
        log.debug("Received remote event {} {} {}  / {} {} ",
                  event.cacheName(),
                  event.name(),
                  event.key(),
                  event.node().id(),
                  event.eventNode().id());

        //Broadcase the callback
        ignite.compute().broadcastAsync((IgniteCallable)() -> {
            try
            {
                log.debug("Eviction callback {} {} {}  / {} {} ",
                          event.cacheName(),
                          event.name(),
                          event.key(),
                          event.node().id(),
                          event.eventNode().id());
                //Evict item from cache
                cacheManager
                    .getCache(event.cacheName())
                    .evict(event.key());
            }
            catch (Exception x)
            {
                log.error("Callback error: {}", x.getMessage(), x);
            }

            //Return the event
            return true;
        });

        return true;
    }
},
EVT_CACHE_ENTRY_EVICTED,
EVT_CACHE_OBJECT_EXPIRED);



-----
KJQ
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
KJQ
Ivan Pavlukhin Ivan Pavlukhin
Reply | Threaded
Open this post in threaded view
|

Re: Confusion with Events

Hi KJQ,

> *One thing I did notice in my test environment is that the events can be raised in drastically different times?  *
Hard to say what is it. First idea is that there is something
preventing cache2 entries from expiration, e.g. someone reads cache2
entries. Would be great to have an isolated reproducer in order to
debug the thing if the problem reproduces.

чт, 3 окт. 2019 г. в 16:56, KJQ <[hidden email]>:

>
> Thank you for your help!  I "think" this is working.
>
> *One thing I did notice in my test environment is that the events can be
> raised in drastically different times?  *
>
> It is not a huge deal since getting the item (providing the near cache has
> expired it) will trigger the expiration on Ignite which will in turn raise
> the events but I was just curious why such the delay especially in a local
> test environment...
>
> What is really odd to me is that no matter what I do, what order I put items
> in, what order the cache gets initially created, anything in "cache1" always
> seems to raise the event in a timely manner and both caches have the exact
> same configuration.  My test case, which uses sleep() to simulate the
> expiration, seems to show more consistent results with the events being
> raised more timely.
>
> *For example:*
>
> 1) Spin up a single Ignite node (embedded / server / partitioned /
> EVT_CACHE_OBJECT_EXPIRED )
>       a) cache1 defined via configuration (eager TTL = true)
>       b) cache2 defined via configuration (eager TTL = true)
>
> I assign a node attribute to each cache (cache = true) and then use a
> predicate to filter only those specific nodes in the listener.
>
> 2) Using a dummy REST resource (in same instance) put entries into the
> near-cache (caffeine) which writes through to the far-cache (ignite).
> Caffeine expires in 30s / Ignite expires in 60s
>
> On the first call to caffeine (which would be per Ignite node as well), if I
> have not initialized the ignite side, attach the listener (so, this happens
> just once on the node).  Idea is to listen on any "expired" events and raise
> the action to the local near-cache.
>
> Then...
>
>       a) put k1/aaa into cache1
>       b) put k2/bbb into cache2
>       c) put k3/ccc into cache2
>
> /Output looks something like (9:35):/
>
> [DEBUG] CaffeineConfigurableCacheManager : Created cache cache1
> [ WARN] CaffeineConfigurableCacheManager : Initialized....
> [DEBUG] IgniteCacheBuilder  : Writing aaa to cache1
> [DEBUG] CaffeineConfigurableCacheManager : Created cache cache2
> [DEBUG] IgniteCacheBuilder  : Writing bbb to cache2
> [DEBUG] IgniteCacheBuilder  : Writing ccc to cache2
>
> 3) Sit and wait (just looking at the events only, not if I get the record
> which forces the expiry on expired items)...
>
> /Within the ~"60s" expiration policy on Ignite I see (9:36):/
>
> [DEBUG] IgniteCacheBuilder  : Received remote event cache1
> CACHE_OBJECT_EXPIRED aaa
> [DEBUG] IgniteCacheBuilder  : Eviction callback cache1 CACHE_OBJECT_EXPIRED
> aaa
>
> /Much later I see (9:52):/
>
> [DEBUG] IgniteCacheBuilder  : Received remote event cache2
> CACHE_OBJECT_EXPIRED bbb
> [DEBUG] IgniteCacheBuilder  : Eviction callback cache2 CACHE_OBJECT_EXPIRED
> bbb
> [DEBUG] IgniteCacheBuilder  : Received remote event cache2
> CACHE_OBJECT_EXPIRED ccc
> [DEBUG] IgniteCacheBuilder  : Eviction callback cache2 CACHE_OBJECT_EXPIRED
> ccc
>
> *My current code looks something like this:*
>
> ignite.events(ignite.cluster().forPredicate(new
> AttributeNodeFilter<>(CACHE_NODE_ATTRIBUTE_KEY, true)))
> .remoteListen(
> null,
> new IgnitePredicate<CacheEvent>()
> {
>     @IgniteInstanceResource
>     private transient Ignite ignite;
>
>     @SpringResource(resourceClass = CacheManager.class)
>     private transient CacheManager cacheManager;
>
>     @Override
>     public boolean apply(final CacheEvent event)
>     {
>         log.debug("Received remote event {} {} {}  / {} {} ",
>                   event.cacheName(),
>                   event.name(),
>                   event.key(),
>                   event.node().id(),
>                   event.eventNode().id());
>
>         //Broadcase the callback
>         ignite.compute().broadcastAsync((IgniteCallable)() -> {
>             try
>             {
>                 log.debug("Eviction callback {} {} {}  / {} {} ",
>                           event.cacheName(),
>                           event.name(),
>                           event.key(),
>                           event.node().id(),
>                           event.eventNode().id());
>                 //Evict item from cache
>                 cacheManager
>                     .getCache(event.cacheName())
>                     .evict(event.key());
>             }
>             catch (Exception x)
>             {
>                 log.error("Callback error: {}", x.getMessage(), x);
>             }
>
>             //Return the event
>             return true;
>         });
>
>         return true;
>     }
> },
> EVT_CACHE_ENTRY_EVICTED,
> EVT_CACHE_OBJECT_EXPIRED);
>
>
>
> -----
> KJQ
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/



--
Best regards,
Ivan Pavlukhin
KJQ KJQ
Reply | Threaded
Open this post in threaded view
|

Re: Confusion with Events

Ivan, thanks for all of the help!  I think we have it working based on your
advice.

I cannot seem to really reproduce it without following a very specific
scenario outside of a test case.  The test case works correctly - one change
that I made was to change the predicate from looking for nodes with a
specific attribute to all server nodes.

What has been frustrating, but nothing to do with Ignite, is the way working
in Windows WSL vs. DOS.  Oddly, WSL does not seem to respect Windows ports
and it led us down a red herring thinking our configuration was messed up
somewhere.



-----
KJQ
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
KJQ