Near Cache versus Continuous Query

tschauenberg

Near Cache versus Continuous Query

Hi,

I have a use case where I want a full copy of a replicated cache on a
subset of my thick clients.  In this example, I have an ETL thick client
that creates and updates a replicated cache in my server grid.  I then have
a series of webserver thick clients on which I always want a fully up-to-date
copy of that cache.

NearCache Attempt:
=============
I tried using a NearCache on the webserver thick clients, but this had two
undesirable problems:
* it created a near cache on each server node, which I have no use for since
this is already a replicated cache
* the near cache never received new entries from the replicated cache; it
was only updated for entries it had already stored.

Is there a way I can resolve either of these two undesirable problems of
the NearCache for my situation?


Continuous Query Attempt:
=================
This led me to instead consider Continuous Queries (CQ), where each
webserver maintains its own Map of the server cache data: on startup it uses
the CQ initial query to get the current server state, and from then on it
uses the CQ local listener to receive updates.

Trying to get the CQ working, I followed the examples in
https://ignite.apache.org/docs/latest/configuring-caches/near-cache
but I can only see the local listener updates if I run the query in my own
thread that I never let finish.

What am I doing wrong in the code below?

Continuous Query code:
---------------------------

// Imports needed by this snippet; myCache is assumed to be an
// IgniteCache<Int, String> obtained elsewhere.
import java.util.concurrent.Executors
import org.apache.ignite.cache.query.ContinuousQuery
import org.apache.ignite.cache.query.ScanQuery

// Create new continuous query.
val qry = ContinuousQuery<Int, String>()

// Have it return all data in its initial query.
// Setting an optional initial query.
qry.setInitialQuery(ScanQuery<Int, String>())

// Don't set a remote filter as we want all data returned.
// qry.setRemoteFilterFactory()

// Callback that is called locally when update notifications are received.
qry.setLocalListener { events ->
    println("Update notifications starting... [thread=${Thread.currentThread()}]")
    for (e in events) {
        println("Listener event: [thread=${Thread.currentThread()}, key=${e.key}, val=${e.value}]")
    }
    println("Update notifications finished [thread=${Thread.currentThread()}]")
}

val executor = Executors.newSingleThreadExecutor()
executor.submit {
    myCache.query(qry).use { cur ->

        // Iterate over the initial query results.
        println("Initial query cursor starting... [thread=${Thread.currentThread()}]")
        for (e in cur) {
            println("Cursor value: [thread=${Thread.currentThread()}, key=${e.key}, val=${e.value}]")
        }
        println("Initial query cursor finished [thread=${Thread.currentThread()}]")

        println("Holding continuous query cursor open so we can get callback data... [thread=${Thread.currentThread()}]")
        val shouldBeRunning = true
        while (shouldBeRunning) {
            // Hold this thread open forever so the local listener callback
            // can keep processing events.
            try {
                Thread.sleep(5000)
            } catch (e: InterruptedException) {
                println("Continuous query sleep interrupted [thread=${Thread.currentThread()}]")
                throw e
            }
        }
        println("No longer holding continuous query cursor open because we got a shutdown signal [thread=${Thread.currentThread()}]")
    }
}





ptupitsyn

Re: Near Cache versus Continuous Query

Hi,

> it created a near cache on each server node

There are two Near Cache modes: 

* Server Node Near Cache - configured once before cache start for all server nodes
* Client Node Near Cache - configured dynamically on per-node basis

Please make sure to use the second mode, as explained in the docs [1]:
ignite.getOrCreateNearCache("myCache", nearCfg);
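
For reference, a minimal Kotlin sketch of the client-node variant (the cache
name "myCache" and an already running server grid holding the replicated
cache are assumptions carried over from the original post):

import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.configuration.NearCacheConfiguration

fun main() {
    // Join the grid as a thick client; assumes discovery can reach the servers.
    val cfg = IgniteConfiguration().setClientMode(true)

    Ignition.start(cfg).use { ignite ->
        // Dynamic near cache created on this client only; nothing extra is
        // configured on the server nodes.
        val nearCache = ignite.getOrCreateNearCache<Int, String>(
            "myCache",
            NearCacheConfiguration()
        )

        println(nearCache.get(1))
    }
}

Note that, as far as I know, a near cache only holds entries this client has
actually read; it does not pull in entries it has never requested, which
matches the second behavior you observed.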


ilya.kasnacheev

Re: Near Cache versus Continuous Query

In reply to this post by tschauenberg
Hello!

"I always want a fully up to date copy of that cache" sounds like you need a cache store actually.

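In case it helps, here is a minimal Kotlin sketch of what a write-through
cache store could look like. The class name, the Int/String types, and the
in-memory map standing in for the external system are placeholders for
illustration, not actual code from this thread:

import java.util.concurrent.ConcurrentHashMap
import javax.cache.Cache
import javax.cache.configuration.FactoryBuilder
import org.apache.ignite.cache.store.CacheStoreAdapter
import org.apache.ignite.configuration.CacheConfiguration

// Hypothetical store: every cache write is pushed through to an "external"
// destination (here just a map, standing in for whatever system the
// webservers would read from).
class MyCacheStore : CacheStoreAdapter<Int, String>() {
    private val external = ConcurrentHashMap<Int, String>()

    override fun load(key: Int): String? = external[key]

    override fun write(entry: Cache.Entry<out Int, out String>) {
        external[entry.key] = entry.value
    }

    override fun delete(key: Any) {
        external.remove(key as Int)
    }
}

// Wire the store into the cache configuration with write-through enabled.
val cacheCfg = CacheConfiguration<Int, String>("myCache")
    .setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStore::class.java))
    .setWriteThrough(true)
    .setReadThrough(true)
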
Regards,
--
Ilya Kasnacheev


bhlewka

Re: Near Cache versus Continuous Query

I have 3 questions:

1. Why do the server nodes use on-heap storage when a client initializes a dynamic near cache? We have never noticed any on-heap storage usage before using a near cache on one of our clients.

    Observe no caches initialized on the server:

        visor> cache
        [WARN ] No caches found.
    Initialize a normal cache and load 10 entries using client A, then disconnect client A:

        myCache = igniteClient.getOrCreateCache(
            CacheConfiguration<Int, MyDataType>("myCache")
                .setCacheMode(CacheMode.REPLICATED)
                .setStatisticsEnabled(true)
                .setEncryptionEnabled(true)
                .setAffinity(rendezvousAffinityFunction)
        )
    Observe loaded data, with 0 on-heap entries and no clients connected:

        visor> cache
        Time of the snapshot: 2021-02-22 22:05:03
        +====================================================================================================================================================================+
        |        Name(@)        |    Mode     | Nodes | Total entries (Heap / Off-heap) | Primary entries (Heap / Off-heap) |   Hits    |  Misses   |   Reads   |   Writes   |
        +====================================================================================================================================================================+
        | myCache         (@c0) | REPLICATED  | 1     | 10 (0 / 10)                     | min: 10 (0 / 10)                  | min: 0    | min: 0    | min: 0    | min: 10    |
        |                       |             |       |                                 | avg: 10.00 (0.00 / 10.00)         | avg: 0.00 | avg: 0.00 | avg: 0.00 | avg: 10.00 |
        |                       |             |       |                                 | max: 10 (0 / 10)                  | max: 0    | max: 0    | max: 0    | max: 10    |
        +-----------------------+-------------+-------+---------------------------------+-----------------------------------+-----------+-----------+-----------+------------+
    Connect client B with a near cache and get 2 records, populating the near cache with 2 records:

        myCache = igniteClient.getOrCreateNearCache(
            "myCache",
            NearCacheConfiguration()
        )
        myCache.get(1)
        myCache.get(2)
    Observe 4 on-heap entries while Client B is connected. Observe Client B has 2 records in its near cache:

        Cache 'myCache(@c0)':
        +---------------------------------------------------------+
        | Name(@)                         | myCache(@c0) |
        | Total entries (Heap / Off-heap) | 14 (4 / 10)           |
        | Nodes                           | 2                     |
        | Total size Min/Avg/Max          | 2 / 7.00 / 12         |
        |   Heap size Min/Avg/Max         | 2 / 2.00 / 2          |
        |   Off-heap size Min/Avg/Max     | 0 / 5.00 / 10         |
        +---------------------------------------------------------+

        Nodes for: myCache(@c0)
        +====================================================================================================================+
        |       Node ID8(@), IP       | CPUs | Heap Used | CPU Load  |   Up Time    | Size (Primary / Backup)  | Hi/Mi/Rd/Wr |
        +====================================================================================================================+
        | Client B(@n1), 10.130.8.195 | 12   | 4.53 %    | -100.00 % | 00:01:24.677 | Total: 2 (2 / 0)         | Hi: 0       |
        |                             |      |           |           |              |   Heap: 2 (2 / <n/a>)    | Mi: 4       |
        |                             |      |           |           |              |   Off-Heap: 0 (0 / 0)    | Rd: 4       |
        |                             |      |           |           |              |   Off-Heap Memory: 0     | Wr: 0       |
        +-----------------------------+------+-----------+-----------+--------------+--------------------------+-------------+
        | Server  (@n0), 172.17.0.2   | 2    | 27.84 %   | 0.67 %    | 00:27:30.191 | Total: 12 (12 / 0)       | Hi: 4       |
        |                             |      |           |           |              |   Heap: 2 (2 / <n/a>)    | Mi: 0       |
        |                             |      |           |           |              |   Off-Heap: 10 (10 / 0)  | Rd: 4       |
        |                             |      |           |           |              |   Off-Heap Memory: <n/a> | Wr: 10      |
        +--------------------------------------------------------------------------------------------------------------------+
    Disconnect client B; observe 2 on-heap entries and no clients connected:

        visor> cache
        Time of the snapshot: 2021-02-22 22:10:44
        +====================================================================================================================================================================+
        |        Name(@)        |    Mode     | Nodes | Total entries (Heap / Off-heap) | Primary entries (Heap / Off-heap) |   Hits    |  Misses   |   Reads   |   Writes   |
        +====================================================================================================================================================================+
        | myCache         (@c0) | REPLICATED  | 1     | 12 (2 / 10)                     | min: 12 (2 / 10)                  | min: 2    | min: 0    | min: 2    | min: 10    |
        |                       |             |       |                                 | avg: 12.00 (2.00 / 10.00)         | avg: 2.00 | avg: 0.00 | avg: 2.00 | avg: 10.00 |
        |                       |             |       |                                 | max: 12 (2 / 10)                  | max: 2    | max: 0    | max: 2    | max: 10    |
        +-----------------------+-------------+-------+---------------------------------+-----------------------------------+-----------+-----------+-----------+------------+
    Why do we have some on-heap entries?

2. What is the best way to ensure the near cache is kept completely in sync with the server cache? Would it be sufficient to use a continuous query that makes a cache.get() call whenever the continuous query gets a CREATED event? The cache will hold a small enough amount of data that we have no need for an eviction policy.

3. What is the best way to keep a continuous query open indefinitely?

ilya.kasnacheev

Re: Near Cache versus Continuous Query

Hello!

1. I'm not sure, but perhaps this is due to the need to track nodes with a near cache so the near cache can be kept in sync.

2. I'm not sure there is a single best way. A near cache is a cache, and "cache" usually means "hot subset". The only way to be sure it is completely in sync is to have a server node hosting the replicated cache, with FULL_SYNC.

3. You may open it in a singleton service's init(), for example. Still, there is no guarantee that it is always up, only that it is never down for long. If you really need to catch every write, consider a cache store.
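
As an illustration of point 2, the replicated cache could be configured roughly
like this (the cache name and key/value types are carried over from earlier
posts in this thread, not from this reply):

import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cache.CacheWriteSynchronizationMode
import org.apache.ignite.configuration.CacheConfiguration

// With FULL_SYNC, a put() returns only after all participating nodes
// have acknowledged the update.
val cacheCfg = CacheConfiguration<Int, String>("myCache")
    .setCacheMode(CacheMode.REPLICATED)
    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)

And for point 3, a minimal sketch of a service that owns the continuous query
for its lifetime: the cursor is opened in init() and closed in cancel(). The
class name, cache name, and listener body are placeholders for illustration:

import org.apache.ignite.Ignite
import org.apache.ignite.cache.query.ContinuousQuery
import org.apache.ignite.cache.query.QueryCursor
import org.apache.ignite.cache.query.ScanQuery
import org.apache.ignite.resources.IgniteInstanceResource
import org.apache.ignite.services.Service
import org.apache.ignite.services.ServiceContext

class CacheMirrorService : Service {

    // Injected by Ignite on the node where the service is deployed.
    @field:IgniteInstanceResource
    private lateinit var ignite: Ignite

    private var cursor: QueryCursor<*>? = null

    override fun init(ctx: ServiceContext) {
        val qry = ContinuousQuery<Int, String>()
        qry.setInitialQuery(ScanQuery<Int, String>())
        qry.setLocalListener { events ->
            for (e in events) println("Update: key=${e.key}, val=${e.value}")
        }

        // Keep the cursor as a field so the query stays active until cancel().
        val cur = ignite.cache<Int, String>("myCache").query(qry)
        for (e in cur) println("Initial: key=${e.key}, val=${e.value}")
        cursor = cur
    }

    override fun execute(ctx: ServiceContext) {
        // Nothing to do here; updates arrive through the local listener.
    }

    override fun cancel(ctx: ServiceContext) {
        cursor?.close()
    }
}

// Deployment, e.g. one instance per node of a chosen cluster group (or
// deployClusterSingleton for a single instance in the whole cluster):
// ignite.services().deployNodeSingleton("cacheMirrorService", CacheMirrorService())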

Regards,
-- 
Ilya Kasnacheev

