ContinuousQuery Batch updates

ssansoy

Hi,

We have an app that writes N records to the cluster (REPLICATED) - e.g.
10,000 records, in one transaction.

We also have an app that issues a continuous query against the cluster,
listening for updates to this cache.
We'd like the app to receive all 10,000 records in one call into the
localListener.

We are observing that the continuous query only receives records one at a
time.
I have tried playing around with setPageSize and setTimeInterval, e.g.
pageSize=12,000 timeInterval=10,000

That is, the query waits either 10 seconds for updates to take place, or until
12,000 updates have occurred. This does seem to be an improvement: rather than
10,000 calls to the localListener, we now get 3 calls, e.g. with 2000, 4500
and 3500 entries. The quantities for each callback are consistently the same
upon retries.
Are we observing this behaviour because our cache keys are designated as
"primary" on different nodes of the cluster? Are we effectively getting 1
localListener callback per node, containing the entries that are marked as
"primary" on that node?

This is very problematic for us unfortunately, as we are migrating our apps
to Ignite, and a large part of the app processing expects all updates to
arrive in one go so that there is a consistent view of the write that has
occurred. Is there anything we can do to get this behaviour, ideally
without even having to set a timeout and introduce extra delay?
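To make the setPageSize/setTimeInterval observation above concrete, here is a toy model in plain Java. It is not Ignite code: `PageBuffer`, `PageBufferDemo` and the clock arguments are hypothetical names, and the model simply assumes the hypothesis in the question, namely that each node buffers only the entries it is primary for and flushes when the page is full or the interval has elapsed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a "flush on page size or time interval" buffer. Not Ignite code. */
final class PageBuffer<T> {
    private final int pageSize;
    private final long intervalMillis;
    private final Consumer<List<T>> listener;
    private final List<T> pending = new ArrayList<>();
    private long lastFlushMillis;

    PageBuffer(int pageSize, long intervalMillis, long nowMillis, Consumer<List<T>> listener) {
        this.pageSize = pageSize;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
        this.listener = listener;
    }

    /** Buffers one event and flushes as soon as a full page has accumulated. */
    void add(T event, long nowMillis) {
        pending.add(event);
        if (pending.size() >= pageSize) {
            flush(nowMillis);
        }
    }

    /** Timer hook: flushes a partial page once the interval has elapsed. */
    void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        listener.accept(new ArrayList<>(pending));
        pending.clear();
        lastFlushMillis = nowMillis;
    }
}

final class PageBufferDemo {
    /** Three hypothetical nodes, each buffering only the entries it is primary for. */
    static List<Integer> callbackSizes() {
        List<Integer> sizes = new ArrayList<>();
        int[] primaryCounts = {2000, 4500, 3500}; // split reported in the post
        List<PageBuffer<Integer>> nodes = new ArrayList<>();
        for (int i = 0; i < primaryCounts.length; i++) {
            nodes.add(new PageBuffer<Integer>(12_000, 10_000L, 0L,
                batch -> sizes.add(batch.size())));
        }
        for (int n = 0; n < primaryCounts.length; n++) {
            for (int e = 0; e < primaryCounts[n]; e++) {
                nodes.get(n).add(e, 0L); // all writes arrive at time 0
            }
        }
        for (PageBuffer<Integer> node : nodes) {
            node.tick(10_000L); // interval elapsed, partial pages are flushed
        }
        return sizes;
    }
}
```

`PageBufferDemo.callbackSizes()` returns `[2000, 4500, 3500]`: no single buffer reaches the 12,000 page size, so each one flushes once when the 10 second interval expires, giving one callback per node.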

Thanks.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
ilya.kasnacheev

Hello!

In this case you could use an affinity function which will put all these entries on the same node, but it will mean that you no longer use any distribution benefits. 

I don't think it is a good design to expect the local listener to receive a transaction's worth of entries at once. Ideally, a listener should consider entries in isolation.

Regards,
-- 
Ilya Kasnacheev


In reply to ssansoy <[hidden email]>, Thu, 8 Oct 2020 at 19:06.


ssansoy

Thanks, this is what I have ended up doing. However, it looks like
AffinityKeyMapper is deprecated?
I am adding an implementation of this (which returns the binary typename of
the key BinaryObject) - and this does seem to have the desired effect (e.g.
all keys with the same typename are marked as primary on a single node). I
set this implementing class on the cache configuration.

I don't think I can use the suggested @AffinityKeyMapped annotation because
we don't have a type representing the key that we can add this to. Our
caches are created via table creation DDL with:

WITH "TEMPLATE=MY_TEMPLATE,value_type=SOME_TABLE_TYPE,key_type=SOME_TABLE_KEY"

The value and keys we operate on are all BinaryObjects.

In terms of design, we have to have some sort of expectation of
transactional safety. E.g. a user can update 2 records in a cache (e.g. some
calculation inputs) which both need to be seen at the same time in order
to execute logic as they are updated.

Could you please advise on the following:

1. Is there an alternative to the deprecated AffinityKeyMapper that we should
be using instead?
2. What side effects might there be to having all keys marked as primary on
a single node (even though the caches are marked as REPLICATED)?
3. Is there any other, more robust way of achieving this?
4. How can we tune the local listener to be single threaded for a
particular query? We want to eliminate any chance of the local listener being
invoked in parallel for updates to the same cache (ideally without
implementing our own synchronization). This seemed to hold in practice for
single updates happening concurrently on the cluster, but after setting
pageSize and timeInterval, the 3 updates from the 3 nodes seemed to come in
concurrently.
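For point 4, one application-side option is to hand every batch to a single worker thread, so that the processing logic never runs in parallel even if the callbacks themselves do. The following is a minimal sketch in plain Java under that assumption; `SerialDispatcher` and `SerialDispatcherDemo` are hypothetical names, and nothing here changes how Ignite itself invokes the listener.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Funnels batches from any number of callback threads onto one worker thread. */
final class SerialDispatcher<T> {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<List<T>> handler;

    SerialDispatcher(Consumer<List<T>> handler) {
        this.handler = handler;
    }

    /** Safe to call from several threads; copies the batch and returns immediately. */
    void onBatch(Iterable<? extends T> events) {
        List<T> copy = new ArrayList<>();
        for (T event : events) {
            copy.add(event);
        }
        worker.execute(() -> handler.accept(copy));
    }

    /** Stops accepting batches and waits for the queued ones to finish. */
    void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class SerialDispatcherDemo {
    /** Simulates 3 nodes delivering concurrently; returns the peak handler concurrency. */
    static int maxConcurrentHandlers() {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        SerialDispatcher<String> dispatcher = new SerialDispatcher<>(batch -> {
            max.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(5); // pretend to do some work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
        });
        Thread[] nodes = new Thread[3];
        for (int i = 0; i < nodes.length; i++) {
            String name = "node-" + i;
            nodes[i] = new Thread(() -> dispatcher.onBatch(Arrays.asList(name + "-a", name + "-b")));
            nodes[i].start();
        }
        try {
            for (Thread node : nodes) {
                node.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.shutdown();
        return max.get();
    }
}
```

`SerialDispatcherDemo.maxConcurrentHandlers()` returns 1, because the single-threaded executor runs the handler for one batch at a time. The trade-off is that the callback returns before the batch has been processed, so any error handling has to live in the handler.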





ilya.kasnacheev

Hello!

1. I don't think that AffinityKeyMapped is deprecated, but there are cases when it is ignored :(
You can use the affinity_key clause in CREATE TABLE ... WITH.
2. If it's the same node for all keys, all processing will happen on that node.
3. It depends on what you are trying to do.
4. I don't think you can since you're not supposed to.

Regards,
--
Ilya Kasnacheev


In reply to ssansoy <[hidden email]>, Mon, 12 Oct 2020 at 14:04.





ssansoy

Hi, thanks for the reply again!

1. @AffinityKeyMapped is not deprecated as you mentioned, but
AffinityKeyMapper is (it seems the AffinityKeyMapper is used in places where
the annotation cannot be, e.g. our case). If we use the AFFINITY_KEY clause
on the table definition, we don't want to select a field of the table as the
key. Instead we want to use the cache name. Can this be a string literal
here, e.g. AFFINITY_KEY='MY_CACHE', so that the same affinity key is generated
for every entry in the table?

2. "If it's the same node for all keys, all processing will happen on that
node" - This may be ok in our case. Are there any issues that may affect
"correctness" of the data, as opposed to performance of the processing?

3. "It depends on what you are trying to do." - We just want to be able to
write e.g. 2 records in a transaction via some writing process, and be able
to read them somewhere else as soon as they are written to the cluster so
they can be used.
We can probably write some custom logic to wait for all entries to arrive at
the client, and then batch them up - possibly by versioning them or
maintaining some other state about the transaction in a separate cache on
the cluster - but we were hoping there would be some way of doing this out of
the box with a distributed cache solution, e.g. 2 records are written in a
transaction, and the client is updated with those 2 records in one callback.
The docs for ContinuousQueryWithTransformer.EventListener imply this kind of
thing should be possible (e.g. "called after one or more entries have been
updated"), and the onUpdated method receives an iterable:

    public interface EventListener<T> {
        /**
         * Called after one or more entries have been updated.
         *
         * @param events The entries just updated that transformed with
         *     remote transformer of {@link ContinuousQueryWithTransformer}.
         */
        void onUpdated(Iterable<? extends T> events);
    }
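The custom batching logic mentioned in point 3 could look roughly like the sketch below. It is plain Java and rests on an assumption the thread does not confirm: that the writer stamps every entry with a transaction id and the number of entries in that transaction. `TaggedUpdate`, `TxBatcher` and their fields are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical update record: the writer is assumed to add txId and txSize. */
final class TaggedUpdate {
    final String txId;
    final int txSize;
    final String key;

    TaggedUpdate(String txId, int txSize, String key) {
        this.txId = txId;
        this.txSize = txSize;
        this.key = key;
    }
}

/** Collects updates per transaction and releases each transaction as one batch. */
final class TxBatcher {
    private final Map<String, List<TaggedUpdate>> pending = new HashMap<>();
    private final Consumer<List<TaggedUpdate>> onComplete;

    TxBatcher(Consumer<List<TaggedUpdate>> onComplete) {
        this.onComplete = onComplete;
    }

    /** May be called from several callback threads, hence synchronized. */
    synchronized void onUpdated(Iterable<? extends TaggedUpdate> events) {
        for (TaggedUpdate update : events) {
            List<TaggedUpdate> batch =
                pending.computeIfAbsent(update.txId, id -> new ArrayList<>());
            batch.add(update);
            if (batch.size() == update.txSize) {
                // All entries of this transaction have arrived: deliver them together.
                pending.remove(update.txId);
                onComplete.accept(batch);
            }
        }
    }
}

final class TxBatcherDemo {
    /** Feeds two interleaved transactions in three callbacks; returns delivered batch sizes. */
    static List<Integer> deliveredBatchSizes() {
        List<Integer> sizes = new ArrayList<>();
        TxBatcher batcher = new TxBatcher(batch -> sizes.add(batch.size()));
        batcher.onUpdated(Arrays.asList(
            new TaggedUpdate("t1", 4, "a"), new TaggedUpdate("t1", 4, "b")));
        batcher.onUpdated(Arrays.asList(
            new TaggedUpdate("t2", 1, "x"), new TaggedUpdate("t1", 4, "c")));
        batcher.onUpdated(Arrays.asList(new TaggedUpdate("t1", 4, "d")));
        return sizes;
    }
}
```

`TxBatcherDemo.deliveredBatchSizes()` returns `[1, 4]`: transaction t2 completes first with its single entry, and t1 is delivered only once all four of its entries have arrived, regardless of how they were split across callbacks. The sketch does not handle lost or duplicate events, or a key updated twice in one transaction.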






ilya.kasnacheev

Hello!

I think you may need to write a custom affinity function for your use case, which will confine every cache to a single primary node.

Regards,
--
Ilya Kasnacheev


In reply to ssansoy <[hidden email]>, Tue, 13 Oct 2020 at 11:18.






ssansoy

Hi,
RE: the custom affinity function, this is what we have:

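import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CacheLevelAffinityKeyMapper implements AffinityKeyMapper {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(CacheLevelAffinityKeyMapper.class);

    // Use the binary type name as the affinity key, so that all keys of the
    // same type share one affinity key.
    @Override
    public Object affinityKey(Object key) {
        if (key instanceof BinaryObject) {
            BinaryObject binaryObjectKey = (BinaryObject) key;
            BinaryType binaryType = binaryObjectKey.type();
            LOGGER.trace("Key is {}, binary type is {}", key, binaryType.typeName());
            return binaryType.typeName();
        }
        else {
            LOGGER.trace("Key is {}, type is {}", key, key.getClass());
            return key;
        }
    }

    // AffinityKeyMapper also declares reset(); there is no state to reset here.
    @Override
    public void reset() {
        // No-op.
    }
}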

The issue was that the interface AffinityKeyMapper is deprecated in Ignite
2.8.1. Is this the way you would recommend supplying such a custom function?
We can't use the @AffinityKeyMapped annotation because there is no Java type
to annotate as such (we use BinaryObjects only).



ilya.kasnacheev

Hello!

Then you need to implement your own AffinityFunction by subclassing RendezvousAffinityFunction.
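The thread does not show what such a subclass would look like. As a rough illustration of the idea only, the plain-Java toy below compares a hash-based key-to-partition mapping with one that sends every key to a single partition. `ToyPartitioner`, `HashPartitioner` and `SinglePartitioner` are hypothetical stand-ins, not Ignite types. In Ignite, the equivalent step would presumably be overriding `partition(Object key)` in the subclass, which is an assumption about how the advice would be applied.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy stand-in for a key-to-partition mapping. Not the Ignite AffinityFunction interface. */
interface ToyPartitioner {
    int partition(Object key);
}

/** Spreads keys over a fixed number of partitions by hash code. */
final class HashPartitioner implements ToyPartitioner {
    private final int partitions;

    HashPartitioner(int partitions) {
        this.partitions = partitions;
    }

    @Override
    public int partition(Object key) {
        return Math.floorMod(key.hashCode(), partitions);
    }
}

/** Sends every key to partition 0, so all keys share whichever node owns that partition. */
final class SinglePartitioner implements ToyPartitioner {
    @Override
    public int partition(Object key) {
        return 0;
    }
}

final class PartitionDemo {
    /** Counts how many distinct partitions the integer keys 0..keyCount-1 map to. */
    static int distinctPartitions(ToyPartitioner partitioner, int keyCount) {
        Set<Integer> seen = new HashSet<>();
        for (int key = 0; key < keyCount; key++) {
            seen.add(partitioner.partition(key));
        }
        return seen.size();
    }
}
```

With 10,000 integer keys, `PartitionDemo.distinctPartitions(new HashPartitioner(1024), 10000)` returns 1024, while the same call with `new SinglePartitioner()` returns 1. Confining a cache to one partition gives up the distribution benefits mentioned earlier in the thread, since a single node does all the primary processing for that cache.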

Regards,
--
Ilya Kasnacheev


In reply to ssansoy <[hidden email]>, Tue, 13 Oct 2020 at 13:15.


