EntryProcessor for cache

Anil

Hi,

I have two caches, Person and PersonDetail. I have to update one property of PersonDetail, its status, based on Person and PersonDetail properties and the current date.

Person {
    String personId;
    String equivalentId;
    String name;
    Long dateOfBirth;
    ...
}

PersonDetail {
    String detailedId;
    String equivalentId;
    Long startDate;
    Long endDate;
    String status;
}

Person cache key -> AffinityKey<String, String> -> AffinityKey<PersonId, equivalentId>
PersonDetail cache key -> AffinityKey<String, String> -> AffinityKey<DetailId, equivalentId>


The status of a PersonDetail is determined from Person#dateOfBirth, PersonDetail#startDate, PersonDetail#endDate, and the current date.
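The thread never spells out the actual status rules, so purely as an illustration of the shape of such a computation (the class name, status values, and rules below are invented placeholders), the determination could be a small pure helper that an entry processor or compute job calls:

```java
// Illustration only: the real rules are not given in the thread.
// dateOfBirth is accepted because the poster says status depends on it,
// but these placeholder rules only use the start/end date range.
public class StatusRules {
    public static String computeStatus(long dateOfBirth, long startDate,
                                       long endDate, long now) {
        if (now < startDate)
            return "PENDING";  // detail period has not started yet
        if (now > endDate)
            return "EXPIRED";  // detail period is over
        return "ACTIVE";       // now falls within [startDate, endDate]
    }
}
```

Keeping this logic in a pure helper makes it trivial to unit-test independently of the cache plumbing.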


I see that an entry processor can only be applied to a given set of cache keys. Is there a way to update each PersonDetail's status efficiently?

Can we use map-reduce? If yes, could you please share an example?

Thanks for your help.
Anil

Re: EntryProcessor for cache


Has anyone had a chance to look into this? Thanks.


vkulichenko

Re: EntryProcessor for cache

Yes, you can create a map-reduce task, execute a local query within each job (use Query.setLocal(true)), and update the queried entries accordingly.

Also note that each SQL table in Ignite has a predefined _key field that returns the key object, so you can return a set of keys from the query.
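A sketch of what such a job body could look like (the where-clause, and the use of cache.invoke as the update mechanism, are illustrative assumptions, not from the thread; `ignite` is an injected Ignite instance):

```java
// Runs inside one compute job; setLocal(true) restricts the query to
// data owned by this node.
IgniteCache<AffinityKey<String>, PersonDetail> cache =
    ignite.cache("DETAIL_CACHE");

SqlFieldsQuery qry = new SqlFieldsQuery(
        "select _key from PersonDetail where status is null")  // filter is an example
    .setLocal(true);

try (QueryCursor<List<?>> cur = cache.query(qry)) {
    for (List<?> row : cur) {
        @SuppressWarnings("unchecked")
        AffinityKey<String> key = (AffinityKey<String>) row.get(0);
        // Update the entry here, e.g. cache.invoke(key, entryProcessor).
    }
}
```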

-Val
Anil

Re: EntryProcessor for cache

Hi Val, 

Could you please point me to a map-reduce example that runs on a cache?

Thanks.


vkulichenko

Re: EntryProcessor for cache

Anil,

I don't think there is such an example in particular. Just implement ComputeTask, access the cache(s) in the compute jobs, and work with local data. Then combine the partial results in the reduce() method to get the final result. Other details depend on your use case.
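A minimal skeleton following that outline might look like this (a sketch, not a tested implementation; class names and the Integer result type are invented):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.resources.IgniteInstanceResource;

public class UpdateStatusTask extends ComputeTaskAdapter<Void, Integer> {
    @Override public Map<? extends ComputeJob, ClusterNode> map(
            List<ClusterNode> subgrid, Void arg) {
        // One job per node, so each job touches only that node's local data.
        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
        for (ClusterNode node : subgrid)
            jobs.put(new UpdateStatusJob(), node);
        return jobs;
    }

    @Override public Integer reduce(List<ComputeJobResult> results) {
        // Combine the per-node partial results into the final value.
        int total = 0;
        for (ComputeJobResult res : results)
            total += res.<Integer>getData();
        return total;
    }

    static class UpdateStatusJob extends ComputeJobAdapter {
        @IgniteInstanceResource
        private Ignite ignite;

        @Override public Object execute() {
            // Run local queries here (Query.setLocal(true)) and update entries;
            // return this node's partial result, e.g. an updated-entry count.
            return 0;
        }
    }
}
```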

-Val
Anil

Re: EntryProcessor for cache

Hi Val,

I tried to write a compute task, but had no luck.

Can someone share pseudocode for updating a cache with a compute task?

Thanks.


vkulichenko

Re: EntryProcessor for cache

Anil,

What exactly did you try and what didn't work? Can you show your code?

-Val
Anil

Re: EntryProcessor for cache

Hi Val,

I have attached the code. Please let me know if you see any issues with the approach.

Thanks.



Attachment: TestComputeTask.java (11K)
Anil

Re: EntryProcessor for cache

And does a compute task execute in parallel across the partitions on each node (like an entry processor)? Thanks.

vkulichenko

Re: EntryProcessor for cache

Looks OK, except that the first query should probably also be local. Also note that you used the split adapter, so you didn't actually map the jobs to nodes, leaving this to Ignite. This means there is a chance that some nodes get more than one job and some get none. Round-robin balancing is used by default, so this should not happen, at least on a stable topology, but theoretically there is no guarantee. Use the map() method instead to manually map jobs to nodes, or just use the broadcast() method.

Jobs are executed in parallel in the public thread pool.
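The broadcast() alternative could be sketched roughly like this (the cache name is from the thread; the anonymous callable and Integer result are illustrative, and `ignite` is an existing Ignite instance):

```java
// Broadcast one closure to every server node; each closure works
// only with that node's local data, then the results come back as
// a collection of per-node values.
Collection<Integer> updatedPerNode = ignite.compute(ignite.cluster().forServers())
    .broadcast(new IgniteCallable<Integer>() {
        @IgniteInstanceResource
        private Ignite local;

        @Override public Integer call() {
            IgniteCache<?, ?> cache = local.cache("PERSON_CACHE");
            // Local query + updates go here; return the updated-entry count.
            return 0;
        }
    });
```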

-Val
Anil

Re: EntryProcessor for cache

Hi Val,

I have created a ComputeTask that scans the local cache and propagates its information to the child records in another cache. Both caches are collocated, so parent and child records fall on the same node and partition.

1. I see the following warning in the logs while the compute task is running:

 GridCachePartitionExchangeManager:480 - Failed to wait for partition map exchange [topVer=AffinityTopologyVersion [topVer=6, minorTopVer=0], node=c7a3957b-a3d0-4923-8e5d-e95430c7e66e]. Dumping pending objects that might be the cause:

Should I worry about this warning? What could cause it?

2. 

QueryCursor<Entry<String, Person>> cursor = cache.query(
    new SqlQuery<String, Person>(Person.class, "select * from Person")
        .setLocal(true));

for (Entry<String, Person> row : cursor) {
    String eqId = row.getValue().getEqId();

    QueryCursor<Entry<AffinityKey<String>, PersonDetail>> dCursor =
        detailsCache.query(
            new SqlQuery<AffinityKey<String>, PersonDetail>(PersonDetail.class,
                "select * from DETAIL_CACHE.PersonDetail where eqId = ?")
                .setLocal(true)
                .setArgs(eqId));

    for (Entry<AffinityKey<String>, PersonDetail> d : dCursor) {
        // Add person info to the person detail and pass it to the
        // person detail data streamer.
    }
}


I see in the logs that the query is taking a long time:


Query execution is too long [time=23309 ms, sql='SELECT "PERSON_CACHE".Person._key, "PERSON_CACHE".PERSON._val from Person', plan=
SELECT
    PERSON_CACHE.PERSON._KEY,
    PERSON_CACHE.PERSON._VAL
FROM PERSON_CACHE.PERSON
    /* PERSON_CACHE.PERSON.__SCAN_ */
, parameters=[]]


Any issues with the above approach?

Thanks.


Andrew Mashenkov

Re: EntryProcessor for cache

Hi Anil,

1. It seems some node entered the topology but cannot finish the partition map exchange, likely because a long-running transaction or something else holds a lock on a partition.

2. /* PERSON_CACHE.PERSON.__SCAN_ */ says that no indexes are used for this query, and a full scan will be performed. Do you have an index on the PersonDetail.eqId field?
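For reference, a queryable field and its index are usually declared via the @QuerySqlField annotation; a sketch against the PersonDetail class from this thread (field names taken from the earlier post, with eqId as the indexed lookup field):

```java
import org.apache.ignite.cache.query.annotations.QuerySqlField;

public class PersonDetail {
    @QuerySqlField
    private String detailedId;

    // index = true makes Ignite build a sorted index, so
    // "where eqId = ?" can avoid a full table scan.
    @QuerySqlField(index = true)
    private String eqId;

    @QuerySqlField
    private Long startDate;

    @QuerySqlField
    private Long endDate;

    @QuerySqlField
    private String status;
}
```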

--
Best regards,
Andrey V. Mashenkov
Anil

Re: EntryProcessor for cache

Hi Andrey,

Yes, an index exists on eqId of the PersonDetail object.

The query plan indicates a scan of the Person cache, not the PersonDetail cache.

And I think the above ComputeTask is executed by only one thread, not by multiple threads across the partitions. Can parallelism be achieved here?

Thanks.




Andrew Mashenkov

Re: EntryProcessor for cache

Hi Anil,

Most likely your query takes a long time because an SQL query runs in a single thread. The only workaround for now is to add more nodes.

However, the query is quite simple, so you can run a ScanQuery per partition, in parallel, to iterate over PERSON_CACHE.
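A sketch of a local, single-partition ScanQuery over PERSON_CACHE (one such query per partition, each in its own thread, gives the parallelism a single SQL query lacks; `ignite` is an existing Ignite instance and the key/value types follow the thread's cache definitions):

```java
int part = 0;  // partition id; real code would loop over the partitions

ScanQuery<AffinityKey<String>, Person> qry =
    new ScanQuery<AffinityKey<String>, Person>()
        .setPartition(part)  // scan only this partition
        .setLocal(true);     // only data held by this node

try (QueryCursor<Cache.Entry<AffinityKey<String>, Person>> cur =
         ignite.<AffinityKey<String>, Person>cache("PERSON_CACHE").query(qry)) {
    for (Cache.Entry<AffinityKey<String>, Person> e : cur) {
        // Look up the collocated PersonDetail rows for e and update status.
    }
}
```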


Anil

Re: EntryProcessor for cache

Hi Andrey,

Thanks. This looks promising, and I will try it.

Is the only way to get the partitions ignite.affinity("PERSON_CACHE").partitions()? Does that hold for a non-affinity cache?

Thanks


Andrew Mashenkov

Re: EntryProcessor for cache

Hi Anil,

ignite.affinity("PERSON_CACHE").partitions() returns the total number of partitions for the given cache.
You may want to iterate over the local primary partitions instead: ignite.affinity("PERSON_CACHE").primaryPartitions(ignite.cluster().localNode())
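Putting the two pieces together, a sketch of scanning the local primary partitions in parallel (thread-pool sizing and the processing body are illustrative; `ignite` is an existing Ignite instance):

```java
// One scan task per local primary partition of PERSON_CACHE.
int[] localParts = ignite.affinity("PERSON_CACHE")
    .primaryPartitions(ignite.cluster().localNode());

ExecutorService pool =
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (int part : localParts) {
    final int p = part;
    pool.submit(() -> {
        ScanQuery<AffinityKey<String>, Person> qry =
            new ScanQuery<AffinityKey<String>, Person>()
                .setPartition(p)
                .setLocal(true);

        try (QueryCursor<Cache.Entry<AffinityKey<String>, Person>> cur =
                 ignite.<AffinityKey<String>, Person>cache("PERSON_CACHE").query(qry)) {
            for (Cache.Entry<AffinityKey<String>, Person> e : cur) {
                // Update the collocated PersonDetail records for e here.
            }
        }
    });
}
pool.shutdown();
```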
