KafkaStream marshaller error

mvolkomorov

I have one data node and one service node that runs a streaming service. The service is deployed with a node filter:

<property name="nodeFilter">
    <bean class="common.filters.KafkaStreamerServiceFilter"/>
</property>

public boolean apply(ClusterNode node) {
    // The attribute is null on nodes that do not declare it.
    Boolean dataNode = node.attribute("kafkastreamer.service.node");

    return dataNode != null && dataNode;
}


When I declare KafkaStreamer as an instance field of my service, I get a marshalling error, java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer:

private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();

The service only starts if I make the field static:

private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();

Is this because Ignite tries to transfer the KafkaStreamer instance between nodes?

Log:

[2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed to marshal service with configured marshaller [name=KafkaStreamerService, srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, marsh=JdkMarshaller [clsFilter=null]]
class org.apache.ignite.IgniteCheckedException: Failed to serialize object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
at org.apache.ignite.Ignition.start(Ignition.java:346)
at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
Caused by: java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
... 25 more
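
The "Caused by" frames in the trace are plain JDK serialization (ObjectOutputStream.defaultWriteFields) rejecting a field, so the difference between the two declarations can be reproduced without Ignite. The following is a minimal sketch; FakeStreamer, InstanceFieldService and StaticFieldService are hypothetical stand-ins, not Ignite classes. It relies on a documented property of Java serialization: every non-static, non-transient field is written, while static fields are skipped.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    /** Hypothetical stand-in for a heavy, non-serializable object such as a streamer. */
    static class FakeStreamer { }

    /** Instance field: part of the serialized state, so serialization fails. */
    static class InstanceFieldService implements Serializable {
        private static final long serialVersionUID = 1L;
        private FakeStreamer streamer = new FakeStreamer();
    }

    /** Static field: belongs to the class and is skipped by serialization. */
    static class StaticFieldService implements Serializable {
        private static final long serialVersionUID = 1L;
        private static FakeStreamer streamer = new FakeStreamer();
    }

    /** Returns true if JDK serialization accepts the object, false on NotSerializableException. */
    static boolean canSerialize(Object obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("instance field serializable: " + canSerialize(new InstanceFieldService()));
        System.out.println("static field serializable:   " + canSerialize(new StaticFieldService()));
    }
}
```

Note that the static variant passes only because the field is left out of the serialized form. An instance deserialized in another JVM would see whatever that JVM's static initializer created, not a streamer configured on the sending side.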
ilya.kasnacheev

Hello!

You should probably create and start KafkaStreamer on the remote node when the service is initialized (in init()), instead of creating it in, e.g., the constructor and trying to send it to the remote node.

Avoid putting fat instances in the fields of service/compute/predicate classes.

Regards,
--
Ilya Kasnacheev
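
One way to picture the "no fat instances in fields" advice is a service object that carries only lightweight configuration across the wire. The following is a minimal sketch in plain Java, not Ignite code; FakeStreamer and LightService are hypothetical names. It marks the heavy field transient, serializes the object, and reads it back, which is roughly what a transfer between JVMs does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    /** Hypothetical stand-in for a heavy, non-serializable resource. */
    static class FakeStreamer { }

    /** Carries only lightweight configuration in its serialized form. */
    static class LightService implements Serializable {
        private static final long serialVersionUID = 1L;

        final String cacheName;            // small and serializable: travels with the object
        transient FakeStreamer streamer;   // excluded from the serialized form

        LightService(String cacheName) {
            this.cacheName = cacheName;
        }
    }

    /** Serializes and deserializes the service, as a transfer between JVMs would. */
    static LightService roundTrip(LightService svc) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(svc);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LightService) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LightService original = new LightService("kafkaCache");
        original.streamer = new FakeStreamer();   // set locally, but never sent

        LightService copy = roundTrip(original);
        System.out.println("cacheName after transfer: " + copy.cacheName);
        System.out.println("streamer is null after transfer: " + (copy.streamer == null));
    }
}
```

The configuration survives the round trip while the transient field comes back as null, so the receiving side has to build the heavy object itself.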


In reply to Maxim Volkomorov <[hidden email]>, Thu, 2 Jul 2020 at 10:42
mvolkomorov

I have now disabled node filtering. Do you mean that I should start KafkaStreamer in init()?
At the moment I start KafkaStreamer in execute():

@Override
    public void execute(ServiceContext ctx) throws Exception {
        log.info("KafkaStreamerService starting ...");
        kafkaStreamer.start();
        log.info("KafkaStreamerService started OK");

In init() I only configure the KafkaStreamer parameters:

@Override
    public void init(ServiceContext ctx) throws Exception {
        log = ignite.log();

        IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("kafkaCache");
        stmr.allowOverwrite(true);
        stmr.autoFlushFrequency(1000);

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);
        kafkaStreamer.setThreads(4);
...


If I have to avoid fat instances, how can I share the kafkaStreamer instance between init(), execute() and cancel()?


In reply to Ilya Kasnacheev <[hidden email]>, Thu, 2 Jul 2020 at 16:53
ilya.kasnacheev

Hello!

Where do you assign the kafkaStreamer field?

Regards,
--
Ilya Kasnacheev


In reply to Maxim Volkomorov <[hidden email]>, Thu, 2 Jul 2020 at 18:46
mvolkomorov

public class KafkaStreamerService implements Service {

    public static final String SERVICE_NAME = "KafkaStreamerService";
    private static final long serialVersionUID = 1L;

    @IgniteInstanceResource
    private Ignite ignite;
    private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
    private IgniteLogger log;

In reply to Ilya Kasnacheev <[hidden email]>, Thu, 2 Jul 2020 at 18:51
ilya.kasnacheev

Hello!

You should do the assignment in the init() method.

Don't create the KafkaStreamer before the service has been sent over and is ready for initialization.

Regards,
--
Ilya Kasnacheev
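
The advice above can be modelled in plain Java. This is a minimal sketch, not Ignite code: FakeStreamer and StreamingService are hypothetical stand-ins, and it assumes the container calls init(), execute() and cancel() on the same deserialized service instance, so an instance field assigned in init() is visible to the other two methods. Only setThreads(), start() and the three lifecycle method names are taken from the thread; stop() on the stand-in is an assumption for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {
    /** Hypothetical stand-in for the streamer: records calls instead of reading from Kafka. */
    static class FakeStreamer {
        final List<String> calls = new ArrayList<>();

        void setThreads(int threads) { calls.add("setThreads(" + threads + ")"); }
        void start() { calls.add("start"); }
        void stop() { calls.add("stop"); }
    }

    /** Models a service whose heavy field is created only after deployment. */
    static class StreamingService implements Serializable {
        private static final long serialVersionUID = 1L;

        // No initializer: the field is null while the service object is serialized and sent.
        private transient FakeStreamer streamer;

        /** Called first on the target node: create and configure the streamer here. */
        void init() {
            streamer = new FakeStreamer();
            streamer.setThreads(4);
        }

        /** Called after init() on the same instance, so the field is already assigned. */
        void execute() {
            streamer.start();
        }

        /** Stops the streamer; the null check covers a cancel that arrives before init(). */
        void cancel() {
            if (streamer != null)
                streamer.stop();
        }

        FakeStreamer streamer() {
            return streamer;
        }
    }

    public static void main(String[] args) {
        StreamingService svc = new StreamingService();
        svc.init();
        svc.execute();
        svc.cancel();
        System.out.println(svc.streamer().calls);
    }
}
```

The field stays an ordinary instance field; the only change from the original declaration is that it has no initializer, so nothing heavy exists at the moment the service is marshalled.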


In reply to Maxim Volkomorov <[hidden email]>, Thu, 2 Jul 2020 at 18:54
mvolkomorov mvolkomorov

Re: KafkaStream marshaller error

OK, if I move the assignment into init(), how do I start the streamer in execute() and stop it in cancel()? This example was taken from http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html


ilya.kasnacheev ilya.kasnacheev

Re: KafkaStream marshaller error

Hello!

init() is called before execute() and cancel(), so a field assigned in init() is already set when those methods run.

To be extra sure, you can check the field for null before using it.
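
A minimal sketch of this pattern in plain Java, with no Ignite dependency, may make it easier to see. All class names here (`FakeStreamer`, `EagerService`, `LazyService`, `LazyStreamerDemo`) are hypothetical. `FakeStreamer` stands in for `KafkaStreamer`, and the `init()`/`execute()`/`cancel()` methods only imitate the shape of the service callbacks, without the `ServiceContext` argument. The sketch assumes plain JDK serialization, which is what the `JdkMarshaller` in the log suggests is being used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a heavy, non-serializable resource such as KafkaStreamer.
class FakeStreamer {
    boolean running;
    void start() { running = true; }
    void stop() { running = false; }
}

// Mirrors the failing service: the field initializer runs when the service object
// is constructed, so the resource is part of the object graph being serialized.
class EagerService implements Serializable {
    private static final long serialVersionUID = 1L;
    private FakeStreamer streamer = new FakeStreamer();
}

// Mirrors the suggested fix: the field stays null until init() is called.
class LazyService implements Serializable {
    private static final long serialVersionUID = 1L;
    private FakeStreamer streamer; // still null while the service is serialized

    void init() { streamer = new FakeStreamer(); }

    void execute() {
        if (streamer == null)
            throw new IllegalStateException("init() was not called");
        streamer.start();
    }

    void cancel() {
        // The null check keeps cancel() safe even if init() never ran.
        if (streamer != null)
            streamer.stop();
    }

    boolean isRunning() { return streamer != null && streamer.running; }
}

class LazyStreamerDemo {
    // Serializes and deserializes an object, imitating a transfer between nodes.
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    // Returns true if JDK serialization rejects the object with NotSerializableException.
    static boolean failsToSerialize(Serializable obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return false;
        } catch (NotSerializableException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("eager service fails to serialize: " + failsToSerialize(new EagerService()));

        LazyService remote = roundTrip(new LazyService()); // serializes without error
        remote.init();
        remote.execute();
        System.out.println("running after execute(): " + remote.isRunning());
        remote.cancel();
        System.out.println("running after cancel(): " + remote.isRunning());
    }
}
```

Applied to the real service, the same idea would be to declare the `kafkaStreamer` field without an initializer, assign `new KafkaStreamer<>()` inside `init()`, and guard the `stop()` call in `cancel()` with a null check.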

Regards,
--
Ilya Kasnacheev


mvolkomorov mvolkomorov

Re: KafkaStream marshaller error

Oh, sorry, I didn't immediately understand what you meant. Thank you!
