Questions on IgniteDataStreamer

vbm
Questions on IgniteDataStreamer

Hi

As part of our POC we want to compare ingestion into Ignite using
Kafka Connect and IgniteDataStreamer.

We have many Kafka topics and want to use IgniteDataStreamer to pull data into Ignite caches; the data is such that each Kafka topic corresponds to one cache in Ignite.

From what I understand, we need multiple KafkaStreamer instances, each with a one-to-one mapping to an IgniteDataStreamer that eventually writes to a cache. Correct me if I am wrong here.

If there are multiple Kafka topics, what is the best approach to load data into the caches?
For comparison, what are the metrics we can monitor to measure
performance?



Regards,
Vishwas



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
vbm

Re: Questions on IgniteDataStreamer

Hi,

Can anyone provide some info on this?

Regards,
Vishwas



ezhuravlev

Re: Metrics for IgniteDataStreamer

In reply to this post by vbm
Hi,

What kind of metrics would help you? Why wouldn't just checking the cache size be enough?

Regards,
Evgenii

2018-07-03 19:53 GMT+03:00 vbm <[hidden email]>:

vbm

Re: Metrics for IgniteDataStreamer

Hi Evgenii,

To compare the two ingestion methods (IgniteDataStreamer and Kafka Connect), we
wanted to know the key parameters that need to be monitored. For
example: how fast data is being put into the cache.


Regards,
Vishwas



ezhuravlev

Re: Metrics for IgniteDataStreamer

Well, then you can just check the cache size after a certain period for each method and compare. I'm not sure a dedicated ingestion-rate metric would make sense: if there are any pauses in ingestion, that metric won't be informative at all, while the same information can be taken from the cache size.
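A simple way to act on that suggestion is to sample the cache size twice and derive entries per second from the delta. A minimal, self-contained sketch; the helper class, method names, and sampled numbers are illustrative, not part of Ignite (in a real run the two samples would come from cache.size() before and after the ingestion window):

```java
// Hypothetical helper: derive ingestion throughput from two cache-size samples.
public class ThroughputSample {

    /** Entries ingested per second between two size samples taken millisApart ms apart. */
    static double entriesPerSecond(long sizeBefore, long sizeAfter, long millisApart) {
        if (millisApart <= 0)
            throw new IllegalArgumentException("interval must be positive");
        return (sizeAfter - sizeBefore) * 1000.0 / millisApart;
    }

    public static void main(String[] args) {
        // E.g. cache.size() returned 0, then 500_000 entries 10 seconds later:
        System.out.println(entriesPerSecond(0, 500_000, 10_000) + " entries/sec");
    }
}
```

Taking the samples the same way for both methods (IgniteDataStreamer vs. Kafka Connect), against the same topic and data set, makes the resulting rates directly comparable.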

Evgenii

2018-07-05 12:06 GMT+03:00 vbm <[hidden email]>:

vbm

Re: Metrics for IgniteDataStreamer

Hi Evgenii,

Thanks for the reply. I have some more questions regarding IgniteDataStreamer.

Below is our scenario:
We have many Kafka topics and want to use IgniteDataStreamer to pull data into
Ignite caches; the data is such that each Kafka topic corresponds to one cache
in Ignite.

From what I understand, we need multiple KafkaStreamer instances, each with a
one-to-one mapping to an IgniteDataStreamer that eventually writes to a cache.
Correct me if I am wrong here.

If there are multiple Kafka topics, what is the best approach to load data
into the caches?


Regards,
Vishwas



ezhuravlev

Re: Metrics for IgniteDataStreamer

Hi,

It's possible to set only one DataStreamer per KafkaStreamer, so I'd recommend using one KafkaStreamer per cache.
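A minimal sketch of that layout, one KafkaStreamer plus IgniteDataStreamer pair per topic/cache pair. The names "usersTopic"/"usersCache", the class name, and the kafkaProps contents are placeholders; the setters come from the ignite-kafka module, but exact signatures can vary slightly between Ignite versions:

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Properties;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class KafkaToIgniteWiring {
    public static void main(String[] args) {
        Ignite ignite = Ignition.start();

        Properties kafkaProps = new Properties(); // standard Kafka consumer settings go here

        // One data streamer per cache...
        IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("usersCache");
        dataStreamer.allowOverwrite(true); // only needed if existing keys may be re-sent

        // ...fed by one KafkaStreamer per topic.
        KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(dataStreamer);
        kafkaStreamer.setTopic(Collections.singletonList("usersTopic"));
        kafkaStreamer.setThreads(4);
        kafkaStreamer.setConsumerConfig(kafkaProps);
        // Map each Kafka record to a cache entry:
        kafkaStreamer.setSingleTupleExtractor(rec ->
            new AbstractMap.SimpleEntry<>(rec.key().toString(), rec.value().toString()));
        kafkaStreamer.start();

        // Repeat the same wiring for every other topic/cache pair.
    }
}
```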

Evgenii

2018-07-05 20:03 GMT+03:00 vbm <[hidden email]>:

Om Thacker

Re: Questions on IgniteDataStreamer

In reply to this post by vbm
Hello vbm,

I am working on the exact same problem. Did you find a solution?
I am using the following code in my client application, which listens to
Kafka Connect (Confluent).

I have a one-to-one mapping between Kafka topic and Ignite cache. When there is
an insert into the DB, the Kafka listener picks it up; using the Gson library I
convert the JSON to an object, and stmr.addData() works fine. But when updating
a value in the DB, I get a marshaller error. I tried to use the cache.put()
method, but it gives me a CacheWriterException.


@KafkaListener(topics = { "kafka-Users" })
public void listenUsers(String message) {
    logger.error(message);
    ObjectMapper mapper = new ObjectMapper();
    try {
        JsonNode rootNode = mapper.readTree(message);
        IgniteDataStreamer<Long, Users> stmr =
                ignite.dataStreamer(IgniteProperties.USERS_CACHE.getName());
        // stmr.allowOverwrite(true);

        // Also tried a StreamTransformer receiver (commented out):
        // stmr.receiver(StreamTransformer.from((e, arg) -> {
        //     Users val = e.getValue();
        //     System.out.println(val + " user from receiver");
        //     return null;
        // }));

        Gson gson = new GsonBuilder()
                .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
                .create();
        Users user = gson.fromJson(rootNode.get("payload").toString(), Users.class);
        stmr.addData(rootNode.get("payload").get("UsersKey").asLong(), user);
        stmr.flush();
    } catch (Exception e) {
        e.printStackTrace();
    }
}




Could you please share your solution?
Thanks,
Om Thacker



ilya.kasnacheev

Re: Questions on IgniteDataStreamer

Hello!

Can you please at least share the exceptions you are getting?
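One thing worth checking in the meantime: by default IgniteDataStreamer does not overwrite existing entries, so addData() for a key that is already in the cache is silently skipped unless allowOverwrite(true) is set before adding data. A sketch against the Users cache from your snippet (this is a guess until we see the actual exception):

```java
// allowOverwrite(true) must be set before adding data; without it,
// addData() for an existing key is a no-op, so updates never land.
try (IgniteDataStreamer<Long, Users> stmr =
         ignite.dataStreamer(IgniteProperties.USERS_CACHE.getName())) {
    stmr.allowOverwrite(true);
    stmr.addData(key, user);
}   // try-with-resources closes the streamer, which also flushes pending data
```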

Regards,
--
Ilya Kasnacheev


Sat, 1 Jun 2019 at 13:55, Om Thacker <[hidden email]>: