Need help with Ignite KafkaStreamer

Alexey

Need help with Ignite KafkaStreamer

Hi All,

I am trying to start KafkaStreamer and could really use some help.

I used the example at http://apacheignite.gridgain.org/docs/getting-started as a template and added only the necessary properties. I suspect that incorrect properties are the main problem, but I could not find any information about which values are correct. When I start KafkaStreamer and send a message, the Ignite log shows warnings that the properties are not valid and a warning that my message was ignored.

Could you please take a look at the log output and my function below and advise how I can fix it?

[15:58:44,569][INFO ][ignite-#57%null%][VerifiableProperties] Verifying properties
[15:58:44,615][WARN ][ignite-#57%null%][VerifiableProperties] Property bootstrap.servers is not valid
[15:58:44,615][INFO ][ignite-#57%null%][VerifiableProperties] Property group.id is overridden to test
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property key.deserializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property key.serializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property value.deserializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property value.serializer is not valid

[15:59:45,077][WARN ][pool-4-thread-3][root] Message is ignored due to an error [msg=MessageAndMetadata(test-topic,0,Message(magic = 0, attributes = 0, crc = 1312744161, key = java.nio.HeapByteBuffer[pos=0 lim=4 cap=13], payload = java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]),94,kafka.serializer.StringDecoder@6079d2fa,kafka.serializer.StringDecoder@6ee18190)]

@Override
    public void execute(ServiceContext ctx) throws Exception {
        KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(stmCache.getName())) {
            // Allow data updates.
            stmr.allowOverwrite(true);

            kafkaStreamer.setIgnite(ignite);
            kafkaStreamer.setStreamer(stmr);

            // set the topic
            kafkaStreamer.setTopic("test-topic");

            // set the number of threads to process Kafka streams
            kafkaStreamer.setThreads(4);

            // set Kafka consumer configurations
            Properties props = new Properties();
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "test");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            ConsumerConfig config = new ConsumerConfig(props);
            kafkaStreamer.setConsumerConfig(config);

            // set decoders
            StringDecoder keyDecoder = new StringDecoder(null);
            StringDecoder valueDecoder = new StringDecoder(null);
            kafkaStreamer.setKeyDecoder(keyDecoder);
            kafkaStreamer.setValueDecoder(valueDecoder);

            kafkaStreamer.start();
            System.out.println("Kafka streamer started!");
        }
    }
vkulichenko

Re: Need help with Ignite KafkaStreamer

Hi,

The logging is incorrect there, and the exception that causes this is lost. I just fixed this in master. Can you try building from master and rerunning the test? You should see stack traces in the log.

-Val
Alexey

Re: Need help with Ignite KafkaStreamer

Hi, thank you for your answer!
I built the jar from the latest master and reran the test. Now I see the following error and stack trace, but I still have no idea what is wrong with my example. Could you please advise how to fix this error?


[18:14:01,391][ERROR][pool-4-thread-1][root] Message is ignored due to an error [msg=MessageAndMetadata(test-topic,0,Message(magic = 0, attributes = 0, crc = 3139897862, key = java.nio.HeapByteBuffer[pos=0 lim=4 cap=14], payload = java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]),126,kafka.serializer.StringDecoder@5e36b9b9,kafka.serializer.StringDecoder@5e36b9b9)]
java.lang.IllegalStateException: Data streamer has been closed.
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:355)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:550)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:604)
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
vkulichenko

Re: Need help with Ignite KafkaStreamer

You should not use a try-with-resources block, because it implicitly closes the streamer at the end of the block, but you want to keep it open. Simply remove the 'try (...) {}' from the code and it should work.
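
To illustrate why the streamer is already closed when the first message arrives, here is a minimal sketch in plain Java. FakeStreamer is a hypothetical stand-in, not the Ignite API; it only mimics the "closed" check seen in the stack trace. The latch is an assumption that simulates a Kafka message arriving after execute() has returned.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ClosedStreamerDemo {
    /** Hypothetical stand-in for a data streamer; not the Ignite class. */
    static class FakeStreamer implements AutoCloseable {
        private volatile boolean closed;

        void addData(String key, String val) {
            if (closed)
                throw new IllegalStateException("Data streamer has been closed.");
        }

        @Override public void close() {
            closed = true;
        }
    }

    /** Background consumer: waits for a "message", then writes to the streamer. */
    static Callable<String> consumer(FakeStreamer stmr, CountDownLatch msgArrived) {
        return () -> {
            msgArrived.await();
            try {
                stmr.addData("key1", "test-msg");
                return "stored";
            }
            catch (IllegalStateException e) {
                return "ignored: " + e.getMessage();
            }
        };
    }

    /** Starts the consumer inside try-with-resources, like the original execute(). */
    static String withTryWithResources() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch msgArrived = new CountDownLatch(1);
        try {
            Future<String> res;
            try (FakeStreamer stmr = new FakeStreamer()) {
                res = pool.submit(consumer(stmr, msgArrived));
            } // close() runs here, before any message has been processed.
            msgArrived.countDown();
            return res.get();
        }
        finally {
            pool.shutdown();
        }
    }

    /** Keeps the streamer open until the consumer has finished its work. */
    static String withExplicitLifetime() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch msgArrived = new CountDownLatch(1);
        FakeStreamer stmr = new FakeStreamer();
        try {
            Future<String> res = pool.submit(consumer(stmr, msgArrived));
            msgArrived.countDown();
            return res.get();
        }
        finally {
            stmr.close(); // Close only once streaming is really over.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("try-with-resources: " + withTryWithResources());
        System.out.println("explicit lifetime:  " + withExplicitLifetime());
    }
}
```

In a real service, the equivalent of the explicit close would presumably go wherever the service is stopped, so that the streamer lives as long as the consumer threads do.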

-Val
Alexey

Re: Need help with Ignite KafkaStreamer

I removed the 'try (...) {}' block and now I no longer see any errors.
I added some logging to KafkaStreamer, and according to the log, adding data to the streamer finishes without any exceptions or errors. Unfortunately, the cache used by the streamer is still empty.

[15:12:44,450][WARN ][pool-4-thread-1][root] Cache name KafkaCache
[15:12:44,450][WARN ][pool-4-thread-1][root] Msg.key key1
[15:12:44,451][WARN ][pool-4-thread-1][root] Msg.message test-msg2
[15:12:44,451][WARN ][pool-4-thread-1][root] Success!!!

I tried adding a receiver (or visitor) to the streamer and checking the size of the cache after sending a Kafka message, but it seems the streamer does not receive any messages.
How can I check that KafkaStreamer works properly?
vkulichenko

Re: Need help with Ignite KafkaStreamer

How many entries did you load? IgniteDataStreamer buffers the data, and most likely it is just waiting for the buffers to fill up. You can try setting the time interval after which the buffers are flushed even if they are not full:

stmr.autoFlushFrequency(1000); // Set to 1 sec.

This way all your data will eventually be propagated into the cache.
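
The buffering behaviour can be sketched with a toy model in plain Java. ToyStreamer, its buffer size of 512, and its internal timer are assumptions made for illustration and do not reflect how Ignite implements this. Only the method names flush() and autoFlushFrequency(long) mirror real IgniteDataStreamer methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BufferedStreamerDemo {
    /** Toy model of a buffering streamer; a stand-in, not the Ignite implementation. */
    static class ToyStreamer implements AutoCloseable {
        /** Plays the role of the target cache. */
        final Map<String, String> cache = new ConcurrentHashMap<>();

        private final List<String[]> buf = new ArrayList<>();
        private final int bufSize;
        private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

        ToyStreamer(int bufSize) {
            this.bufSize = bufSize;
        }

        /** Buffers the entry; it reaches the cache only when the buffer fills up. */
        synchronized void addData(String key, String val) {
            buf.add(new String[] {key, val});
            if (buf.size() >= bufSize)
                flush();
        }

        /** Pushes everything buffered so far into the cache. */
        synchronized void flush() {
            for (String[] e : buf)
                cache.put(e[0], e[1]);
            buf.clear();
        }

        /** Flushes periodically, even if the buffer is not full. */
        void autoFlushFrequency(long millis) {
            timer.scheduleAtFixedRate(this::flush, millis, millis, TimeUnit.MILLISECONDS);
        }

        @Override public void close() {
            timer.shutdownNow();
            flush();
        }
    }

    public static void main(String[] args) throws Exception {
        ToyStreamer stmr = new ToyStreamer(512); // Hypothetical buffer size.
        try {
            stmr.addData("key1", "test-msg2");
            // One entry is far below the buffer size, so the cache is still empty.
            System.out.println("before flush: " + stmr.cache.size());

            stmr.autoFlushFrequency(100);
            Thread.sleep(500); // Give the timer a few chances to run.
            System.out.println("after timed flush: " + stmr.cache.size());
        }
        finally {
            stmr.close();
        }
    }
}
```

For a quick manual check with only a handful of test messages, calling flush() on the real streamer after adding data should have a similar effect to waiting for the automatic flush.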

-Val