Map Reduce over cache items, where values are sequences

stas

Hello everyone,

I would like to use MapReduce over cache items representing the events that happened in a process, in order to calculate certain statistics. Could you please help me figure out how to do that with Apache Ignite?

I have tens of millions of processes that happened in the past. Each process is a sequence of events [event1, event2, event3, ... eventN], where the number of events per process varies (50-100). Every event has a set of attributes such as a timestamp, an event type and a set of metrics. I put this data into a cache as process_id => [e1, e2, e3, e4, ...].

What I would like is a histogram of how often events of a certain type occur, either across all processes or across the processes that satisfy a certain condition. What I have managed so far is to broadcast a callable that lands on the Ignite nodes, accesses the local cache entries, counts what I want and returns the result to the caller in K chunks, which I then have to aggregate on the client.
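The data model and the goal described above can be pinned down with a small sequential sketch. Event, its fields and EventTypeHistogram are hypothetical names (not from the post or from Ignite); an ordinary Map stands in for the cache, and the "certain condition" is assumed to be a java.util.function.Predicate over a process's events.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical event: only the attributes needed here; metrics are omitted.
final class Event {
    final long timestamp;
    final String type;

    Event(long timestamp, String type) {
        this.timestamp = timestamp;
        this.type = type;
    }
}

final class EventTypeHistogram {
    // Counts how often each event type occurs across all processes matching `condition`.
    // `processes` stands in for the cache contents: process_id => [e1, e2, ...].
    static Map<String, Long> build(Map<String, List<Event>> processes,
                                   Predicate<List<Event>> condition) {
        Map<String, Long> counts = new TreeMap<>(); // sorted by event type
        for (List<Event> events : processes.values()) {
            if (!condition.test(events)) {
                continue; // skip processes that do not satisfy the condition
            }
            for (Event e : events) {
                counts.merge(e.type, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

Passing `evs -> true` as the condition gives the histogram over all processes.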

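import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CachePeekMode;

// Body of the callable broadcast to every server node.
Ignite localIgnite = Ignition.localIgnite();
IgniteCache<String, MyProcess> localCache = localIgnite.cache("processes");
MyHistogram hist = new MyHistogram();
// PRIMARY only, so backup copies (if backups are configured) are not counted twice.
for (Cache.Entry<String, MyProcess> e : localCache.localEntries(CachePeekMode.PRIMARY)) {
    hist.process(e.getValue());
}
return hist;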
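The snippet relies on MyProcess and MyHistogram, which the post does not define. A minimal sketch of what they might look like, assuming a process only needs its list of event types: the key property is a merge step, so that the K partial results returned to the caller can be combined in any order. Both classes here are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical MyProcess: only the event types are kept, in order of occurrence.
final class MyProcess implements Serializable {
    final List<String> eventTypes;

    MyProcess(List<String> eventTypes) {
        this.eventTypes = new ArrayList<>(eventTypes);
    }
}

// Hypothetical MyHistogram: a count per event type.
// Serializable because partial results travel from server nodes back to the caller.
final class MyHistogram implements Serializable {
    final Map<String, Long> counts = new HashMap<>();

    // Map step: add every event of one process to this partial histogram.
    void process(MyProcess p) {
        for (String type : p.eventTypes) {
            counts.merge(type, 1L, Long::sum);
        }
    }

    // Reduce step: fold another partial histogram into this one and return this.
    MyHistogram merge(MyHistogram other) {
        other.counts.forEach((type, n) -> counts.merge(type, n, Long::sum));
        return this;
    }
}
```

Because the merge is a per-key sum, it is associative and commutative, so the client can fold the returned chunks in whatever order they arrive.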

The problem with the broadcast approach is that it uses only a single core on each Ignite node, while I have 64. How could I do something similar in a more efficient manner?

Thank you in advance.
ezhuravlev
Re: Map Reduce over cache items, where values are sequences

Hi,

To parallelise everything properly, I would recommend starting one affinityCallable per partition (there are 1024 partitions by default). Inside this compute job, you can collect information for that partition only, using a ScanQuery (or an SqlQuery).
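Why one job per partition helps: the jobs are independent, so a node can run many of them at once on its compute thread pool instead of walking all local entries in a single loop. The sketch below models this in plain Java so it runs without a cluster: a partition is a list of processes (each process a list of event types), an ExecutorService stands in for the node's compute pool, and each submitted task plays the role of one affinityCallable. In Ignite itself the task would be submitted with something like `ignite.compute().affinityCall(Collections.singletonList("processes"), part, job)` and would scan its partition with a partition-restricted ScanQuery; PerPartitionHistogram and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerPartitionHistogram {
    // Work done by one job: count event types within a single partition.
    static Map<String, Long> countPartition(List<List<String>> partition) {
        Map<String, Long> counts = new HashMap<>();
        for (List<String> process : partition) {
            for (String type : process) {
                counts.merge(type, 1L, Long::sum);
            }
        }
        return counts;
    }

    static Map<String, Long> run(List<List<List<String>>> partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Map: one independent task per partition, executed concurrently.
            List<Future<Map<String, Long>>> futures = new ArrayList<>();
            for (List<List<String>> partition : partitions) {
                futures.add(pool.submit(() -> countPartition(partition)));
            }
            // Reduce: merge the per-partition histograms into one result.
            Map<String, Long> total = new HashMap<>();
            for (Future<Map<String, Long>> f : futures) {
                f.get().forEach((type, n) -> total.merge(type, n, Long::sum));
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With 1024 partitions spread over the cluster, each node receives many small jobs, which keeps its cores busy; the per-partition results are merged the same way as the broadcast chunks.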


In reply to Stas Girkin <[hidden email]>, Fri, 27 Sep 2019, 18:09.
ezhuravlev

For example, a scan query restricted to a single partition looks like this:
// part is the partition number handled by the current job
QueryCursor<Cache.Entry<Integer, Integer>> qry =
    cache.query(new ScanQuery<Integer, Integer>().setPartition(part));

In reply to Evgenii Zhuravlev <[hidden email]>, Tue, 1 Oct 2019, 15:25.