Ignite Spark Integration

Chang
Ignite Spark Integration

I am trying to use Ignite in Apache Spark Streaming. I would like to update the Ignite cache while processing the DStream via the reduceByKeyAndWindow function.

Here is my pseudo code:
============================
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.ignite.scalar.scalar
import org.apache.ignite.scalar.scalar._

val ssc = new StreamingContext(sc, Seconds(1)) // batch interval is required
. . .
scalar(CONFIG) { // the Scalar entry point is `scalar`, not `scala`
   . . .
   //spark code
   . . .
   ssc.start()
   ssc.awaitTermination()
}
============================
Outcome:

I am getting this runtime error:
============================
5/05/22 10:12:30 ERROR OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
============================
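This NotSerializableException is the generic symptom of a closure capturing a driver-side object. A minimal stand-alone sketch (no Spark needed; `FakeStreamingContext` is a hypothetical stand-in for any non-serializable context) reproduces it:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable driver object
// such as StreamingContext. Note: it does NOT extend Serializable.
class FakeStreamingContext

object ClosureCaptureDemo {
  // Returns true when serializing a closure that captures `ctx` fails.
  def capturingClosureFails: Boolean = {
    val ctx = new FakeStreamingContext
    // Referencing `ctx` inside the lambda makes the compiler capture it,
    // so serializing the closure tries to serialize `ctx` too.
    val closure: String => String = s => s + ctx.hashCode
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(closure)
      false
    } catch {
      case _: NotSerializableException => true
    }
  }

  def main(args: Array[String]): Unit =
    println(s"capturing closure fails to serialize: $capturingClosureFails")
}
```

Spark does the same serialization step when it ships a closure to executors, which is where the stack trace above comes from.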

Is the above the correct way to use Ignite in Spark? What am I missing?

Thanks.
yakov

Re: Ignite Spark Integration

It seems you are using an anonymous closure that is sent over to another Ignite node, and this closure pulls in the StreamingContext. Can you move the closure to a static class (or a top-level object) to make sure it does not hold a reference to the context?

Please let me know if it works.
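As a stand-alone sketch of that advice (hypothetical names, no Spark required): a function defined on a top-level object closes over nothing from the enclosing scope, so Java serialization ships it cleanly:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Reducer defined on a top-level object: it references nothing from any
// driver-side scope, so there is nothing extra to drag into serialization.
object SafeReducers {
  val concat: (String, String) => String = (a, b) => a + b
}

object NoCaptureDemo {
  // Returns true when the object-level reducer serializes cleanly.
  def serializes: Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(SafeReducers.concat)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(s"object-level reducer serializes cleanly: $serializes")
}
```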
Chang

Re: Ignite Spark Integration

I am not quite sure how to avoid pulling the StreamingContext into the closure. I would appreciate suggestions on how to do this.

For now, I've moved the scalar(CONFIG) calls into functions as follows:

============================
import org.apache.ignite.IgniteCache
import org.apache.ignite.scalar.scalar
import org.apache.ignite.scalar.scalar._

val CONFIG = "somepath/examples/config/example-cache.xml"
val NAME = "partitioned"
def mkCache[K, V]: IgniteCache[K, V] = cache$[K, V](NAME).get
def initIgnite(): Unit =
   scalar(CONFIG) {
      igniteCache = mkCache[String, String]
   }
. . .
val ssc = new StreamingContext(sc)
//spark code
. . .
initIgnite
val reducer = (a: A, b: B) => {
   val newRecord = new SomeRecord(b)
   scalar(CONFIG) {
      . . .
      val c = ignite$.jcache[String, String](NAME)
      c.iterator() foreach println
      c += (newRecord.SESSIONID -> newRecord.toString)
   }
   newRecord
}
//spark code
. . .
ssc.start()
ssc.awaitTermination()
}
============================
Result:
Now, each time an Ignite session is established, the cache println prints no records.
Question:
What is wrong with the above code? It is not giving me the shared in-memory cache content.
alexey.goncharuk

Re: Ignite Spark Integration

My first suspicion is that the Ignite nodes do not discover each other. Are you running your application on several hosts? If so, you need to change the discovery configuration in example-cache.xml so that the nodes can discover each other.

If that does not work, please also include minimal, complete code for your application that reproduces the issue, and describe the configuration of your cluster.
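For reference, static IP discovery can also be configured programmatically instead of editing the discovery section of example-cache.xml. A sketch (the host addresses below are placeholders you would replace with your own):

```scala
import java.util.Arrays

import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object StaticDiscovery {
  def main(args: Array[String]): Unit = {
    // Static IP finder: list every host (optionally with a port range)
    // that runs an Ignite node, so nodes can find each other.
    val ipFinder = new TcpDiscoveryVmIpFinder()
    ipFinder.setAddresses(Arrays.asList(
      "10.0.0.1:47500..47509",  // placeholder host
      "10.0.0.2:47500..47509")) // placeholder host

    val spi = new TcpDiscoverySpi()
    spi.setIpFinder(ipFinder)

    val cfg = new IgniteConfiguration()
    cfg.setDiscoverySpi(spi)

    val ignite = Ignition.start(cfg)
  }
}
```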
Chang

Re: Ignite Spark Integration

The code is running locally on one node/machine. I will attempt to package up a minimal running example that reproduces my issue.

Thanks,
Chang
Chang

Re: Ignite Spark Integration

I was able to figure out that including the Spark HiveContext value was causing the StreamingContext serialization error. I am still puzzled that SparkContext and StreamingContext values are fine, but HiveContext has issues with Ignite.

The following is now working for me:
=====================
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.ignite.scalar.scalar
import org.apache.ignite.scalar.scalar._

val ssc = new StreamingContext(sc, Seconds(1))
. . .
scalar(CONFIG) {
   . . .
   //spark code
   . . .
   ssc.start()
   ssc.awaitTermination()
}
====================
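A common Spark workaround for this class of problem is marking driver-side contexts `@transient`, so they are skipped when the enclosing object is serialized. A stand-alone sketch (hypothetical `FakeHiveContext` stand-in, no Spark required):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a heavyweight, non-serializable driver
// object such as HiveContext. Note: it does NOT extend Serializable.
class FakeHiveContext

class Job extends Serializable {
  // @transient: the field is skipped during serialization, so shipping
  // a Job to an executor never tries to serialize the context. `lazy`
  // lets each deserialized copy re-create it on first use.
  @transient lazy val hive = new FakeHiveContext
}

object TransientDemo {
  // Returns true when a Job holding a @transient context serializes cleanly.
  def serializes: Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(new Job)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(s"Job with @transient context serializes: $serializes")
}
```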

sandish

Re: Ignite Spark Integration

Hi Guys,

I'm trying to run a simple example with the Spark Ignite integration, but I'm getting the error below. Can anyone help?

Error:
----------------------------------------------------------------------------------------------------------------------
>>>
>>> ver. 1.1.4-SNAPSHOT#19700101-sha1:DEV
>>> 2015 Copyright(C) Apache Software Foundation

15/07/02 22:25:57 INFO IgniteKernal: Config URL: n/a
15/07/02 22:25:57 INFO IgniteKernal: Daemon mode: off
15/07/02 22:25:57 INFO IgniteKernal: OS: Linux 3.13.0-54-generic i386
15/07/02 22:25:57 INFO IgniteKernal: OS user: sany
15/07/02 22:25:57 INFO IgniteKernal: Language runtime: Scala ver. 2.11.2
15/07/02 22:25:57 INFO IgniteKernal: VM information: Java(TM) SE Runtime Environment 1.8.0_45-b14 Oracle Corporation Java HotSpot(TM) Server VM 25.45-b02
15/07/02 22:25:57 INFO IgniteKernal: VM total memory: 0.89GB
15/07/02 22:25:57 INFO IgniteKernal: Remote Management [restart: off, REST: on, JMX (remote: off)]
15/07/02 22:25:57 INFO IgniteKernal: IGNITE_HOME=/mnt/softwares/incubator-ignite
15/07/02 22:25:57 INFO IgniteKernal: VM arguments: [-Didea.launcher.port=7539, -Didea.launcher.bin.path=/home/sany/Desktop/idea-IU-141.1532.4/bin, -Dfile.encoding=UTF-8]
15/07/02 22:25:57 INFO IgniteKernal: Configured caches ['ignite-marshaller-sys-cache', 'ignite-sys-cache', 'ignite-atomics-sys-cache', 'default']
15/07/02 22:25:57 INFO IgniteKernal: 3-rd party licenses can be found at: /mnt/softwares/incubator-ignite/libs/licenses
15/07/02 22:25:57 WARN GridDiagnostic: Initial heap size is 64MB (should be no less than 512MB, use -Xms512m -Xmx512m).
15/07/02 22:25:57 INFO IgniteKernal: Non-loopback local IPs: 192.168.1.5, fe80:0:0:0:2e1:40ff:fe33:1151%wlan1, fe80:0:0:0:5635:30ff:fe83:4961%wlan0
15/07/02 22:25:57 INFO IgniteKernal: Enabled local MACs: 00E140331151, 543530834961
15/07/02 22:25:57 INFO IgnitePluginProcessor: Configured plugins:
15/07/02 22:25:57 INFO IgnitePluginProcessor:   ^-- None
15/07/02 22:25:57 INFO IgnitePluginProcessor:
15/07/02 22:25:57 WARN TcpCommunicationSpi: Failed to load 'igniteshmem' library from classpath. Will try to load it from IGNITE_HOME.
15/07/02 22:25:58 WARN TcpCommunicationSpi: Failed to start shared memory communication server.
15/07/02 22:25:58 INFO TcpCommunicationSpi: Successfully bound to TCP port [port=47101, locHost=/127.0.0.1]
15/07/02 22:25:58 WARN NoopCheckpointSpi: Checkpoints are disabled (to enable configure any GridCheckpointSpi implementation)
15/07/02 22:25:58 WARN GridCollisionManager: Collision resolution is disabled (all jobs will be activated upon arrival).
15/07/02 22:25:58 WARN NoopSwapSpaceSpi: Swap space is disabled. To enable use FileSwapSpaceSpi.
15/07/02 22:25:58 INFO IgniteKernal: Security status [authentication=off]
15/07/02 22:25:58 INFO GridTcpRestProtocol: Command protocol successfully started [name=TCP binary, host=/127.0.0.1, port=11212]
15/07/02 22:25:58 WARN TcpDiscoveryMulticastIpFinder: Failed to deserialize multicast response.
15/07/02 22:25:58 WARN TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology in 2000ms): [/127.0.0.1:47505, /127.0.0.1:47503, /127.0.0.1:47502, /127.0.0.1:47507, /127.0.0.1:47509, /127.0.0.1:47506, /127.0.0.1:47501, /127.0.0.1:47500, /127.0.0.1:47508, /127.0.0.1:47504]
15/07/02 22:26:00 WARN TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology in 2000ms): [/127.0.0.1:47507, /127.0.0.1:47508, /127.0.0.1:47506, /127.0.0.1:47505, /127.0.0.1:47503, /127.0.0.1:47501, /127.0.0.1:47500, /127.0.0.1:47504, /127.0.0.1:47509, /127.0.0.1:47502]
15/07/02 22:26:02 WARN TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology in 2000ms): [/127.0.0.1:47505, /127.0.0.1:47507, /127.0.0.1:47504, /127.0.0.1:47500, /127.0.0.1:47503, /127.0.0.1:47506, /127.0.0.1:47502, /127.0.0.1:47509, /127.0.0.1:47501, /127.0.0.1:47508]
15/07/02 22:26:04 WARN TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology in 2000ms): [/127.0.0.1:47501, /127.0.0.1:47506, /127.0.0.1:47504, /127.0.0.1:47502, /127.0.0.1:47503, /127.0.0.1:47507, /127.0.0.1:47505, /127.0.0.1:47509, /127.0.0.1:47508, /127.0.0.1:47500]
----------------------------------------------------------------------------------------------------------------------


code:

import org.apache.ignite.spark.IgniteContext
import org.apache.spark.SparkContext

object ScalarCacheExample {
  private val CONFIG = "/mnt/github/incubator-ignite/examples/config/example-cache.xml"

  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "BenchmarkTest")
    // IgniteContext starts an embedded Ignite node using the given config file
    val igniteContext = new IgniteContext[Integer, Integer](sc, CONFIG)
    val cache = igniteContext.fromCache("partitioned")
    cache.filter(_._2 < 10).collect()
    sc.stop()
  }
}
------------------------------------------------------------------------------------------------------------------------------