KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

facundo.maldonado
KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Hi all, I'm having some problems dealing with the KafkaStreamer.

I have a deployment with a streamer (client node) that consumes records from
a Kafka topic, and a data node (cache storage).

If, for any reason, the cache node crashes or simply restarts, the client
node gets disconnected, but the KafkaStreamer keeps pulling records and
trying to push them to the cache.

Is there a recommended way to stop the KafkaStreamer on client disconnection
and resume it once the connection is established again?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
akorensh

Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Hi,
  You can listen for the disconnect events (or catch the disconnect
exception) and then call KafkaStreamer.stop.

  see:
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events

  https://ignite.apache.org/docs/latest/clustering/connect-client-nodes
  Here, look for: "While a client is in a disconnected state and an attempt to
reconnect is in progress, the Ignite API throws an
IgniteClientDisconnectedException."

  KafkaStreamer stop method:
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/stream/kafka/KafkaStreamer.html#stop--
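A minimal sketch of the exception-based variant, assuming `cache`, `kafkaStreamer`, `key` and `value` already exist elsewhere (on a disconnected client, cache operations typically surface the disconnect wrapped in a CacheException):

```java
// Illustrative fragment, not a complete program. Assumes:
//   IgniteCache<String, String> cache;
//   KafkaStreamer<String, String> kafkaStreamer;
// imports: javax.cache.CacheException, org.apache.ignite.IgniteClientDisconnectedException
try {
    cache.put(key, value);
}
catch (CacheException e) {
    if (e.getCause() instanceof IgniteClientDisconnectedException) {
        kafkaStreamer.stop(); // stop pulling from Kafka while disconnected
        ((IgniteClientDisconnectedException) e.getCause())
            .reconnectFuture().get(); // block until the client reconnects
        kafkaStreamer.start(); // resume consumption
    }
}
```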
 
Thanks, Alex



facundo.maldonado

Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

I forgot to mention, I'm starting the KafkaStreamer in a cluster service.
Pretty similar to all the examples that are around.

I saw the exception in the documentation; my concern is where I should
catch it, given that I initialize and set up the streamer in the init() method
and start it in execute(). Should I create a custom implementation of a
StreamReceiver (holding a reference to the KafkaStreamer) that actually calls
the cache.put() method, catches the exception, and stops the streamer?

I didn't take the event approach into account; the solution may be on that
path. I think it is valid to add a listener in the service init() method,
right?

Thanks Alex for your answer.



akorensh

Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Hi,
  I think listening to events would be a good solution for you.

There are two discovery events that are triggered on the client node when it
is disconnected from or reconnected to the cluster:

EVT_CLIENT_NODE_DISCONNECTED

EVT_CLIENT_NODE_RECONNECTED

see:
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events
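Registering the listener in the service's init(), as asked above, could look roughly like this hypothetical sketch (KafkaStreamer configuration is elided; all names are illustrative, and the event types may need to be enabled via IgniteConfiguration.setIncludeEventTypes()):

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class KafkaStreamerService implements Service {
    @IgniteInstanceResource
    private Ignite ignite; // injected by Ignite on service deployment

    private KafkaStreamer<String, String> kafkaStreamer;

    @Override public void init(ServiceContext ctx) {
        kafkaStreamer = new KafkaStreamer<>();
        // ... setIgnite(), setStreamer(), setTopic(), setConsumerConfig() ...

        // Stop consuming on disconnect, resume on reconnect.
        ignite.events().localListen((IgnitePredicate<Event>) evt -> {
            if (evt.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED)
                kafkaStreamer.stop();
            else if (evt.type() == EventType.EVT_CLIENT_NODE_RECONNECTED)
                kafkaStreamer.start();
            return true; // keep the listener registered
        }, EventType.EVT_CLIENT_NODE_DISCONNECTED, EventType.EVT_CLIENT_NODE_RECONNECTED);
    }

    @Override public void execute(ServiceContext ctx) {
        kafkaStreamer.start();
    }

    @Override public void cancel(ServiceContext ctx) {
        kafkaStreamer.stop();
    }
}
```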


As for StreamReceiver: Keep in mind that the logic implemented in a stream
receiver is executed on the node where data is to be stored.  If the server
where the data resides crashes, your code might not execute.
https://ignite.apache.org/docs/latest/data-streaming#stream-visitor

Thanks, Alex





facundo.maldonado

Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Thanks for the answers.
I resolved the reconnection problem using events. It worked very well.
What I found is the following...
The KafkaStreamer consumes records and sends them to the IgniteDataStreamer.
It doesn't handle the IgniteFuture returned.
If the connection with the server is interrupted (a server restart, for example), the KafkaStreamer is stopped and the Kafka consumers are stopped, but the records that were already sent to the streamer and (I believe) are sitting in its buffer are still trying to be saved in the cache.
There is no way to recover them as far as I know.
Am I right?
Should I implement a custom KafkaStreamer that, in that situation, handles the IgniteFuture and, let's say, retries the insertion into the cache?
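For reference, the per-record handling described here could be sketched as the following illustrative fragment, assuming an IgniteDataStreamer<String, String> named `streamer` and variables `key`/`value` exist elsewhere:

```java
// Illustrative fragment, not a complete program.
// imports: org.apache.ignite.lang.IgniteFuture
IgniteFuture<?> fut = streamer.addData(key, value);
fut.listen(f -> {
    try {
        f.get(); // rethrows the failure, if any, for this batch
    }
    catch (RuntimeException e) {
        // e.g. IgniteException / CacheException on disconnect:
        // remember (key, value) here and retry after reconnection
    }
});
```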

Another question: I'm using a grid service to start the streamer. What is the benefit of this vs. a simple Spring service if I'm using Kubernetes for deployment?




On Fri, Nov 20, 2020 at 5:01 PM akorensh <[hidden email]> wrote:


--
Facundo Maldonado
akorensh

Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

flush() guarantees completion of all futures returned by addData():
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteDataStreamer.html#flush--

flush() will send the batch, but it is still possible for the server to
crash before the message reaches it.

If you need to verify whether a particular put actually made it to the
server, appropriate events are available: https://ignite.apache.org/docs/latest/events/events#cache-events

You can add retry logic for the case where the client has disconnected.

One possibility is to catch the CacheException thrown per the contract of the
flush() method; you will get something like this:

javax.cache.CacheException: class
org.apache.ignite.IgniteClientDisconnectedException: Data streamer has been
closed, client node disconnected.
        at
org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1275)
        at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.doFlush(DataStreamerImpl.java:1204)
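A sketch of that flush-and-retry approach, assuming an IgniteDataStreamer named `streamer` exists elsewhere (names are illustrative):

```java
// Illustrative fragment, not a complete program.
// imports: javax.cache.CacheException, org.apache.ignite.IgniteClientDisconnectedException
try {
    streamer.flush(); // blocks until all addData() futures are completed
}
catch (CacheException e) {
    if (e.getCause() instanceof IgniteClientDisconnectedException) {
        IgniteClientDisconnectedException cause =
            (IgniteClientDisconnectedException) e.getCause();
        cause.reconnectFuture().get(); // wait until the client reconnects
        // then re-create the data streamer and re-send the unacknowledged batch
    }
}
```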




