Map-reduce processing

dmreshet

Map-reduce processing

Hello!
I want to implement the following SQL query in terms of MapReduce with ComputeTaskSplitAdapter:

select * from Person where salary > ?

What is the best practice for doing this?

At the moment I am using cache.localEntries() to get all cache values at the map stage, and it looks like this is not correct, because there is no guarantee that each job will be executed on a different node of the Ignite data grid.

Here is the split method of my ComputeTaskSplitAdapter class:


    @Override
    protected Collection<? extends ComputeJob> split(int gridSize, Integer salary) throws IgniteException {
        // Create gridSize jobs; nothing here pins a job to a particular node.
        List<ComputeJob> jobs = new ArrayList<>(gridSize);

        for (int i = 0; i < gridSize; i++) {
            jobs.add(new ComputeJobAdapter() {
                @Override
                public Object execute() {
                    IgniteCache<Long, Person> cache = Ignition.ignite().cache(Executor.PERSON_CACHE);
                    List<Person> list = new ArrayList<>();

                    // Map stage: filter the entries held by the node running this job.
                    Iterable<Cache.Entry<Long, Person>> entries = cache.localEntries();
                    entries.forEach(entry -> {
                        if (entry.getValue().getSalary() > salary) {
                            list.add(entry.getValue());
                        }
                    });

                    return list;
                }
            });
        }

        return jobs;
    }


Vladimir Ozerov

Re: Map-reduce processing

Hi,

There is no need to implement SQL queries using map-reduce. Ignite already has its own query engine. Please refer to the org.apache.ignite.cache.query.SqlQuery class and the IgniteCache.query() method.

Alternatively, you can use scan queries in some cases. See org.apache.ignite.cache.query.ScanQuery.

Vladimir.

On Wed, Apr 20, 2016 at 10:41 AM, dmreshet <[hidden email]> wrote:

dmreshet

Re: Map-reduce processing

Yes, I know.
I want to compare the performance of SQL, SQL with indexes, and a MapReduce job.
I have found that I can use broadcast to guarantee that my MapReduce job will be executed on each node exactly once.
So now my job uses this code:
Collection<List<Person>> result = ignite.compute(ignite.cluster()).broadcast((IgniteCallable<List<Person>>) () -> {...});

And then I will reduce the result.
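The reduce step over the per-node lists could look roughly like the sketch below. It is plain Java with no Ignite dependency: the nested Person class is only a stand-in for the cache value type, and ReduceSketch, reduce and the sample salaries are hypothetical names and data chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ReduceSketch {
    /** Stand-in for the Person cache value; only the salary is modeled (assumption). */
    static final class Person {
        private final int salary;

        Person(int salary) {
            this.salary = salary;
        }

        int getSalary() {
            return salary;
        }
    }

    /** Reduce step: concatenates the lists returned by the individual nodes. */
    static List<Person> reduce(Collection<List<Person>> perNodeResults) {
        List<Person> merged = new ArrayList<>();
        for (List<Person> nodeResult : perNodeResults) {
            merged.addAll(nodeResult);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical results, as if three nodes had answered the broadcast.
        Collection<List<Person>> perNode = Arrays.asList(
            Arrays.asList(new Person(1200), new Person(3000)),
            new ArrayList<Person>(),
            Arrays.asList(new Person(2500)));

        System.out.println(reduce(perNode).size());
    }
}
```

A node that holds no matching entries simply contributes an empty list, so the merge needs no special case for it.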

Is that the best practice for implementing a MapReduce job when I need to process data from a cache?
Vladimir Ozerov

Re: Map-reduce processing

Hi,

If you broadcast the job and want to iterate over the cache inside it, then please make sure that you iterate only over local entries (e.g. IgniteCache.localEntries(), ScanQuery.setLocal(true), etc.). Otherwise every job will process the whole data set, so the jobs will duplicate work and performance will suffer.
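To make the duplication concrete, here is a small plain-Java model rather than Ignite code. The nested lists stand in for the entries held by each node, and LocalIterationSketch, mapReduce and the localOnly flag are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LocalIterationSketch {
    /**
     * Runs one filtering job per partition and concatenates the job results.
     * With localOnly = true each job sees only its own partition;
     * with localOnly = false each job sees the whole data set.
     */
    static List<Integer> mapReduce(List<List<Integer>> partitions, int minSalary, boolean localOnly) {
        // The whole data set, as a non-local iteration would see it.
        List<Integer> all = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            all.addAll(partition);
        }

        List<Integer> reduced = new ArrayList<>();
        for (List<Integer> partition : partitions) {   // one job per node
            List<Integer> visible = localOnly ? partition : all;
            for (int salary : visible) {
                if (salary > minSalary) {              // map: filter by salary
                    reduced.add(salary);               // reduce: concatenate
                }
            }
        }
        return reduced;
    }

    public static void main(String[] args) {
        // Hypothetical salaries spread over three nodes.
        List<List<Integer>> partitions = Arrays.asList(
            Arrays.asList(1000, 4000), Arrays.asList(2500), Arrays.asList(800, 3200));

        System.out.println(mapReduce(partitions, 2000, true));          // [4000, 2500, 3200]
        System.out.println(mapReduce(partitions, 2000, false).size());  // 9
    }
}
```

In the non-local case every one of the three jobs scans all five values, and each matching salary shows up three times in the reduced result.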

Also, please note that the returned result set might be incomplete if one of the nodes fails during job processing. If you care about this, you should either implement some failover yourself or use Ignite's built-in queries (ScanQuery, SqlQuery), which already take care of it.

Anyway, I strongly recommend that you focus on SqlQuery first. You can configure indexes on the cache, and they could give you a great boost, because instead of iterating over the whole cache, Ignite will use indexes for fast data lookup.
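As a rough analogy for why a sorted index helps with a predicate like salary > ?, the plain-Java sketch below uses a TreeMap keyed by salary. This is not how Ignite implements its indexes; SalaryIndexSketch, its methods and the sample data are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class SalaryIndexSketch {
    // Sorted index: salary -> names of the people earning that salary.
    private final TreeMap<Integer, List<String>> bySalary = new TreeMap<>();

    void put(String name, int salary) {
        bySalary.computeIfAbsent(salary, s -> new ArrayList<>()).add(name);
    }

    /** Returns the names with salary strictly greater than minSalary, in ascending salary order. */
    List<String> salaryGreaterThan(int minSalary) {
        List<String> result = new ArrayList<>();
        // tailMap(key, false) is a view of the keys strictly above minSalary,
        // so entries below the threshold are never visited.
        for (List<String> names : bySalary.tailMap(minSalary, false).values()) {
            result.addAll(names);
        }
        return result;
    }

    public static void main(String[] args) {
        SalaryIndexSketch index = new SalaryIndexSketch();
        index.put("Ann", 1000);
        index.put("Bob", 3000);
        index.put("Eve", 2000);

        System.out.println(index.salaryGreaterThan(1500)); // [Eve, Bob]
    }
}
```

The lookup cost depends on the number of matching entries plus a logarithmic seek, whereas a full scan touches every entry regardless of how selective the threshold is.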

Vladimir.

On Wed, Apr 20, 2016 at 12:31 PM, dmreshet <[hidden email]> wrote:

dmreshet

Re: Map-reduce processing

Thank you very much for your answer. It helped me a lot.