How to solve the 22-parameter limit of case classes under Scala 2.10?

F7753:
Description:
    I have two tables to join in a Spark/Ignite environment: one has 20 fields, the other has 40. Under Scala 2.10 I cannot create a case class with more than 22 parameters (this was fixed in Scala 2.11, but for some reason I cannot use 2.11 in my environment).
    Ignite needs the annotation '@ScalarCacheQuerySqlField' to index the data fields. That works well with a case class, but not with spark.sql.types.StructField, even though StructField easily sidesteps the 22-parameter limit.
Here is my question:
    Is there another way, or some tip, to resolve this conflict? How can I use '@ScalarCacheQuerySqlField' with StructField?
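
For reference, a minimal sketch of why StructType sidesteps the limit (assuming standard Spark 1.x imports): a StructType is just a sequence of StructField values, so no constructor-arity restriction ever applies.
---------------------------------------------------------------------------------------------------------------------
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Forty columns, well past the case-class ceiling; field names are illustrative.
val wideSchema = StructType(
  (0 until 40).map(i => StructField(s"field$i", if (i % 2 == 0) StringType else LongType))
)
---------------------------------------------------------------------------------------------------------------------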
F7753:

Here are some details:
  I created case class A, which has fewer than 22 parameters, and case class B, which has more than 22. I want Ignite to create indexes on the fields, so the case classes look like:
---------------------------------------------------------------------------------------------------------------------
  case class A (
      @ScalarCacheQuerySqlField field0: String,
      @ScalarCacheQuerySqlField field1: BigInt,
      @ScalarCacheQuerySqlField field2: String,
      ....
      @ScalarCacheQuerySqlField field15: String
  )

  case class B (
      @ScalarCacheQuerySqlField field0: String,
      @ScalarCacheQuerySqlField field1: BigInt,
      @ScalarCacheQuerySqlField field2: String,
      ....
      @ScalarCacheQuerySqlField field50: String
  )
-------------------------------------------------------------------------------------------------------------------
Then I used Maven to compile the project, and it reported the error:

 error: Implementation restriction: case classes cannot have more than 22 parameters.

I found that in Scala 2.10 a case class can have no more than 22 parameters, so I tried StructType to see whether it could get around this; the code looks like:
--------------------------------------------------------------------------------------------------------------------
object Schema {
    val A = StructType(Array(
               StructField("field0", StringType),
               StructField("field1", LongType),
               .....
               StructField("field15", StringType)
    ))

    val B = StructType(Array(
               StructField("field0", StringType),
               StructField("field1", LongType),
               .....
               StructField("field50", StringType)
    ))
}
--------------------------------------------------------------------------------------------------------------------

How can I add the annotation '@ScalarCacheQuerySqlField' here?
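
For reference, a well-known Scala 2.10 workaround independent of Ignite is to split the wide record into nested case classes, each under the 22-parameter cap (whether Ignite's annotation-based indexing can follow nested fields is a separate question). A minimal sketch with hypothetical field names:
--------------------------------------------------------------------------------------------------------------------
// Each part stays under the 22-parameter limit; the outer case class nests them.
case class PartA(field0: String, field1: BigInt, field2: String)  // ... up to 21 more fields
case class PartB(field22: Int, field23: Int)                      // ... the remaining fields
case class WideRecord(a: PartA, b: PartB)

object NestedDemo extends App {
  val rec = WideRecord(PartA("x", BigInt(1), "y"), PartB(2, 3))
  println(rec.a.field0 + " / " + rec.b.field23)  // fields reachable through the nesting
}
--------------------------------------------------------------------------------------------------------------------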
Alexey Kuznetsov:

Why do you need case classes?
Is it possible to use regular classes?

Here is my code that works:

import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}

import scala.annotation.meta.field

/**
 * Scala wrappers for java SQL annotations.
 */
object SqlAnnotations {
    type ScalaCacheQueryTextField = QueryTextField @field
    type ScalaCacheQuerySqlField = QuerySqlField @field
    type ScalaCacheQuerySqlFieldGroup = QuerySqlField.Group @field
}

import SqlAnnotations._

class Card(
    @ScalaCacheQuerySqlField(index = true) val cardId: Long = 0L,
    @ScalaCacheQuerySqlField(index = true) val cardNumber: String = null
)

Could you try this approach?
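
To make this concrete, here is a minimal sketch of a plain class carrying many annotated fields and being registered for SQL indexing (the cache name, key type and field names are illustrative):

import SqlAnnotations._
import org.apache.ignite.configuration.CacheConfiguration

// A plain class has no 22-field ceiling, so every column can carry the annotation.
class WideRow(
    @ScalaCacheQuerySqlField(index = true) val id: Long = 0L,
    @ScalaCacheQuerySqlField val field1: String = null,
    // ... as many further annotated fields as the table needs ...
    @ScalaCacheQuerySqlField val field40: String = null
) extends Serializable

object WideRowRegistration {
  // Register key and value classes so Ignite builds the SQL metadata and indexes.
  def cacheCfg(): CacheConfiguration[java.lang.Long, WideRow] = {
    val cfg = new CacheConfiguration[java.lang.Long, WideRow]("wideCache")
    cfg.setIndexedTypes(classOf[java.lang.Long], classOf[WideRow])
    cfg
  }
}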


F7753:
Thank you for your advice. I hope it will work; at least the compilation did not throw any exceptions.
But I got another error when I submitted the jar. The cluster itself does not seem to be at fault, since I have tested it with the official guide (https://apacheignite-fs.readme.io/docs/ignitecontext-igniterdd#section-running-sql-queries-against-ignite-cache) and that simple use case works well. The exception tells me I should call 'Ignition.start()' before other code, yet the guide's use case never required that, so something must be wrong. Here is the full stack trace:
-------------------------------------------------------------------------------------------------------------------
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, nobida145): class org.apache.ignite.IgniteIllegalStateException: Ignite instance with provided name doesn't exist. Did you call Ignition.start(..) to start an Ignite instance? [name=null]
        at org.apache.ignite.internal.IgnitionEx.grid(IgnitionEx.java:1235)
        at org.apache.ignite.Ignition.ignite(Ignition.java:516)
        at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:150)
        at org.apache.ignite.spark.IgniteContext$$anonfun$1.apply$mcVI$sp(IgniteContext.scala:58)
        at org.apache.ignite.spark.IgniteContext$$anonfun$1.apply(IgniteContext.scala:58)
        at org.apache.ignite.spark.IgniteContext$$anonfun$1.apply(IgniteContext.scala:58)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at org.apache.ignite.spark.IgniteContext.<init>(IgniteContext.scala:58)
        at main.scala.StreamingJoin$.main(StreamingJoin.scala:180)
        at main.scala.StreamingJoin.main(StreamingJoin.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: class org.apache.ignite.IgniteIllegalStateException: Ignite instance with provided name doesn't exist. Did you call Ignition.start(..) to start an Ignite instance? [name=null]
        at org.apache.ignite.internal.IgnitionEx.grid(IgnitionEx.java:1235)
        at org.apache.ignite.Ignition.ignite(Ignition.java:516)
        at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:150)
        at org.apache.ignite.spark.IgniteContext$$anonfun$1.apply$mcVI$sp(IgniteContext.scala:58)
        at org.apache.ignite.spark.IgniteContext$$anonfun$1.apply(IgniteContext.scala:58)
        at org.apache.ignite.spark.IgniteContext$$anonfun$1.apply(IgniteContext.scala:58)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
./igparkStr.sh: line 16: --repositories: command not found

-----------------------------------------------------------------------------------------------------------------
igsparkStr.sh is a shell script I wrote to submit the Spark app; its content is listed below:
-----------------------------------------------------------------------------------------------------------------
/opt/spark-1.6.1/bin/spark-submit --class main.scala.StreamingJoin --properties-file conf/spark.conf ./myapp-1.0-SNAPSHOT-jar-with-dependencies.jar --packages org.apache.ignite:ignite-spark:1.5.0.final
  --repositories http://www.gridgainsystems.com/nexus/content/repositories/external
-----------------------------------------------------------------------------------------------------------------
Yakov:

Hi!

Can you please provide the code you are trying to execute and the full execution logs? It seems the IgniteContext was not initialized properly.

F7753:
I use two IgniteContext instances in my Spark streaming job; one IgniteContext carries the CacheConfiguration for the DataFrame to join.
Here is the code:
-----------------------------------------------------------------------------------------------------

package main.scala

/**
  * Created by F7753 on 2016/3/30.
  */
import kafka.serializer.StringDecoder

import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cache.query.annotations.QuerySqlField

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

import org.apache.spark.storage.StorageLevel

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

import org.apache.log4j._

import org.apache.ignite.Ignition
import org.apache.ignite.configuration._
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.configuration.CacheConfiguration

import scala.annotation.meta.field


object Schema {
  val small_schema = StructType( Array(
    StructField("field0", StringType),
    StructField("field2", LongType),
    StructField("field3", LongType),
    StructField("field4", LongType),
    StructField("field5", LongType),
    StructField("field6", LongType),
    StructField("field7", StringType),
    StructField("field8", StringType),
    StructField("field9", StringType),
    StructField("field10", StringType),
    StructField("field11", StringType),
    StructField("field12", StringType),
    StructField("field13", StringType),
    StructField("field14", StringType),
    StructField("field15", StringType),
    StructField("field16", StringType),
    StructField("field17", StringType),
    StructField("field18", StringType),
    StructField("field19", StringType))
  )

  val source_schema = StructType( Array(
    StructField("field0", LongType),
    StructField("field1", StringType),
    StructField("field2", StringType),
    StructField("field3", StringType),
    StructField("field4", IntegerType),
    StructField("field5", IntegerType),
    StructField("field6", IntegerType),
    StructField("field7", IntegerType),
    StructField("field8", IntegerType),
    StructField("field9", StringType),
    StructField("field10", StringType),
    StructField("field11", IntegerType),
    StructField("field12", StringType),
    StructField("field13", IntegerType),
    StructField("field14", StringType),
    StructField("field15",StringType),
    StructField("field16", IntegerType),
    StructField("field17", StringType),
    StructField("field18", IntegerType),
    StructField("field19", StringType),
    StructField("field20", StringType),
    StructField("field21", StringType),
    StructField("field22", IntegerType),
    StructField("field23", IntegerType),
    StructField("field24", StringType),
    StructField("field25", IntegerType),
    StructField("field26", IntegerType),
    StructField("field27", IntegerType),
    StructField("field28", IntegerType),
    StructField("field29", IntegerType),
    StructField("field30", LongType),
    StructField("field31", StringType),
    StructField("field32", LongType),
    StructField("field33", StringType),
    StructField("field34", LongType),
    StructField("field35", StringType),
    StructField("field36", LongType),
    StructField("field37", StringType),
    StructField("field38", IntegerType),
    StructField("field39", IntegerType),
    StructField("field40", IntegerType),
    StructField("field41", LongType))
  )
}

object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

object StreamingJoin {

  private final val SMALL = StreamingJoin.getClass.getSimpleName + "SmallTable"
  private final val SOURCE = StreamingJoin.getClass.getSimpleName + "SourceTable"

  def checkArgs(args: Array[String]): Unit = {
    args(0) match {
      case "Socket" =>
        if(args.length < 4) {
          System.err.println("Usage: StreamingTest Socket <ip> <port> <receiverNums>")
          System.exit(1)
        }
      case _ =>
        System.err.println("Unsurpported source...")
        System.exit(1)
    }
  }

  def main(args: Array[String]): Unit = {

    /** Type alias for `QuerySqlField`. */
    type ScalarCacheQuerySqlField = QuerySqlField @field

    checkArgs(args)

    val scf = new SparkConf().setAppName("StreamingTest")
    val ssc = new StreamingContext(scf, Seconds(10))
    val sqlCtx:SQLContext = SQLContextSingleton.getInstance(ssc.sparkContext)

    PropertyConfigurator.configure("./conf/log.conf")

    val smallTableData = SQLContextSingleton.getInstance(ssc.sparkContext).read
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .schema(Schema.small_schema)
      .load("hdfs://host:9000/fileName.csv")

    val smallTableDF = sqlCtx.createDataFrame(smallTableData.rdd, classOf[smallTableCache])

    // create ignite context (embedded mode)
    val smallTableContext = new IgniteContext[String, small](ssc.sparkContext, "/home/test/SparkIgniteStreaming/config/example-cache.xml")

    // small table cache config
    val smallTableCacheCfg = new CacheConfiguration[String, small](SMALL)
    smallTableCacheCfg.setIndexedTypes(classOf[BigInt], classOf[small])
    smallTableCacheCfg.setCacheMode(CacheMode.REPLICATED)

    // ignite small table cache
    val smallTableCache = smallTableContext.fromCache(smallTableCacheCfg)

    // smallTableCols rdd save to cache
    val smallTableCols_rdd = smallTableDF.rdd.map(
      r => (
        r.getAs[String](0),
        new small(
          r.getAs[BigInt](0),
          r.getAs[String](1),
          r.getAs[String](2),
          r.getAs[String](3),
          r.getAs[Int](4),
          r.getAs[Int](5),
          r.getAs[Int](6),
          r.getAs[Int](7),
          r.getAs[Int](8),
          r.getAs[String](9),
          r.getAs[String](10),
          r.getAs[Int](11),
          r.getAs[String](12),
          r.getAs[String](13),
          r.getAs[String](14),
          r.getAs[Int](15),
          r.getAs[String](16),
          r.getAs[Int](17),
          r.getAs[String](18),
          r.getAs[String](19),
          r.getAs[String](20),
          r.getAs[Int](21),
          r.getAs[Int](22),
          r.getAs[String](23),
          r.getAs[Int](24),
          r.getAs[Int](25),
          r.getAs[Int](26),
          r.getAs[Int](27),
          r.getAs[Int](28),
          r.getAs[BigInt](29),
          r.getAs[String](30),
          r.getAs[BigInt](31),
          r.getAs[String](32),
          r.getAs[BigInt](33),
          r.getAs[String](34),
          r.getAs[BigInt](35),
          r.getAs[String](36),
          r.getAs[Int](37),
          r.getAs[Int](38),
          r.getAs[Int](39),
          r.getAs[BigInt](40)
        )))

    smallTableCache.savePairs(smallTableCols_rdd)

    def createSourceDStream: DStream[String] = {
      args(0) match {
        case "Socket" =>
          ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.OFF_HEAP)
      }
    }

    val linesN = (1 to args(args.length-1).toInt).map(i => createSourceDStream.flatMap(_.split("\n")))
    val lines  = ssc.union(linesN)

    lines.foreachRDD( (rdd: RDD[String], time: Time) => {
      val sqlCtx = SQLContextSingleton.getInstance(rdd.sparkContext)
      val rowRdd = rdd.map(_.split(",")).map(row => Row(row(0), row(1).toLong, row(2).toLong, row(3).toLong, row(4).toLong, row(5).toLong,
        row(6), row(7), row(8), row(9), row(10), row(11), row(12), row(13), row(14), row(15), row(16), row(17), row(18)))

      val sourceTableColsDF = sqlCtx.createDataFrame(rowRdd, classOf[source])

      val sourceTableCols_rdd = sourceTableColsDF.rdd.map(
        r => (
          r.getAs[String](0),
          new source(
            r.getAs[String](0),
            r.getAs[BigInt](1),
            r.getAs[BigInt](2),
            r.getAs[BigInt](3),
            r.getAs[BigInt](4),
            r.getAs[String](5),
            r.getAs[String](6),
            r.getAs[String](7),
            r.getAs[String](8),
            r.getAs[String](9),
            r.getAs[String](10),
            r.getAs[String](11),
            r.getAs[String](12),
            r.getAs[String](13),
            r.getAs[String](14),
            r.getAs[String](15),
            r.getAs[String](16),
            r.getAs[String](17)
          )
          )
      )

      // source table cache config
      val sourceTableCacheCfg = new CacheConfiguration[String, source](SOURCE)
      sourceTableCacheCfg.setIndexedTypes(classOf[BigInt], classOf[source])
      sourceTableCacheCfg.setCacheMode(CacheMode.PARTITIONED)

      val streamTableContext = new IgniteContext[String, source](ssc.sparkContext, "/home/test/config/default-config.xml")

      // ignite source table cache
      val sourceTableCache = streamTableContext.fromCache(sourceTableCacheCfg)

      // sourceTableCols rdd save to cache
      sourceTableCache.savePairs(sourceTableCols_rdd)

      val query =
        s"""
           |select s.fl, s.xz, count(*)
           | from
           |   SourceTable as e, "$SOURCE".SmallTable as s
           | where
           |   e.pz_id = s.pz_id
           | group by
           |   s.fl, s.xz
         """.stripMargin

      val res = sourceTableCache.sql(query)

      println("-----------------------------")
      println("Time: " + time)
      println("-----------------------------")

      res.show(10)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

-----------------------------------------------------------------------------------------------------
The Schema.scala file:
-----------------------------------------------------------------------------------------------------
package main.scala

import org.apache.ignite.scalar.scalar._

/**
  * Created by INFI on 2016/3/31.
  */
class small(@ScalarCacheQuerySqlField field0: String,
            @ScalarCacheQuerySqlField field1: BigInt,
            @ScalarCacheQuerySqlField field2: BigInt,
            @ScalarCacheQuerySqlField field3: BigInt,
            @ScalarCacheQuerySqlField field4: BigInt,
            @ScalarCacheQuerySqlField field5: String,
            @ScalarCacheQuerySqlField field6: String,
            @ScalarCacheQuerySqlField field7: String,
            @ScalarCacheQuerySqlField field8: String,
            @ScalarCacheQuerySqlField field9: String,
            @ScalarCacheQuerySqlField field10: String,
            @ScalarCacheQuerySqlField field11: String,
            @ScalarCacheQuerySqlField field12: String,
            @ScalarCacheQuerySqlField field13: String,
            @ScalarCacheQuerySqlField field14: String,
            @ScalarCacheQuerySqlField field15: String,
            @ScalarCacheQuerySqlField field16: String,
            @ScalarCacheQuerySqlField field17: String) extends Serializable

class source(@ScalarCacheQuerySqlField field0: BigInt,
             @ScalarCacheQuerySqlField field1: String,
             @ScalarCacheQuerySqlField field2: String,
             @ScalarCacheQuerySqlField field3: String,
             @ScalarCacheQuerySqlField field4: Int,
             @ScalarCacheQuerySqlField field5: Int,
             @ScalarCacheQuerySqlField field6: Int,
             @ScalarCacheQuerySqlField field7: Int,
             @ScalarCacheQuerySqlField field8: Int,
             @ScalarCacheQuerySqlField field9: String,
             @ScalarCacheQuerySqlField field10: String,
             @ScalarCacheQuerySqlField field11: Int,
             @ScalarCacheQuerySqlField field12: String,
             @ScalarCacheQuerySqlField field13: String,
             @ScalarCacheQuerySqlField field14: String,
             @ScalarCacheQuerySqlField field15: Int,
             @ScalarCacheQuerySqlField field16: String,
             @ScalarCacheQuerySqlField field17: Int,
             @ScalarCacheQuerySqlField field18: String,
             @ScalarCacheQuerySqlField field19: String,
             @ScalarCacheQuerySqlField field20: String,
             @ScalarCacheQuerySqlField field21: Int,
             @ScalarCacheQuerySqlField field22: Int,
             @ScalarCacheQuerySqlField field23: String,
             @ScalarCacheQuerySqlField field24: Int,
             @ScalarCacheQuerySqlField field25: Int,
             @ScalarCacheQuerySqlField field26: Int,
             @ScalarCacheQuerySqlField field27: Int,
             @ScalarCacheQuerySqlField field28: Int,
             @ScalarCacheQuerySqlField field29: BigInt,
             @ScalarCacheQuerySqlField field30: String,
             @ScalarCacheQuerySqlField field31: BigInt,
             @ScalarCacheQuerySqlField field32: String,
             @ScalarCacheQuerySqlField field33: BigInt,
             @ScalarCacheQuerySqlField field34: String,
             @ScalarCacheQuerySqlField field35: BigInt,
             @ScalarCacheQuerySqlField field36: String,
             @ScalarCacheQuerySqlField field37: Int,
             @ScalarCacheQuerySqlField field38: Int,
             @ScalarCacheQuerySqlField field39: Int,
             @ScalarCacheQuerySqlField field40: BigInt) extends Serializable
-----------------------------------------------------------------------------------------------------
and the XML file is:
-----------------------------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>




<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="cacheConfiguration">
            <list>
               
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                </bean>
            </list>
        </property>

       
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                   
                   
                   
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                               
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
-----------------------------------------------------------------------------------------------------

the log:
-----------------------------------------------------------------------------------------------------
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, nobida144): class org.apache.ignite.IgniteCheckedException: Spring XML configuration path is invalid: /home/test/SparkIgniteStreaming/config/example-cache.xml. Note that this path should be either absolute or a relative local file system path, relative to META-INF in classpath or valid URL to IGNITE_HOME.
        at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3523)
        at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643)
        at org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184)
        at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.MalformedURLException: no protocol: /home/test/SparkIgniteStreaming/config/example-cache.xml
        at java.net.URL.<init>(URL.java:589)
        at java.net.URL.<init>(URL.java:486)
        at java.net.URL.<init>(URL.java:435)
        at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3514)
        ... 18 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:170)
        at main.scala.StreamingJoin$.main(StreamingJoin.scala:242)
        at main.scala.StreamingJoin.main(StreamingJoin.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: class org.apache.ignite.IgniteCheckedException: Spring XML configuration path is invalid: /home/test/SparkIgniteStreaming/config/example-cache.xml. Note that this path should be either absolute or a relative local file system path, relative to META-INF in classpath or valid URL to IGNITE_HOME.
        at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3523)
        at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643)
        at org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184)
        at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.MalformedURLException: no protocol: /home/test/SparkIgniteStreaming/config/example-cache.xml
        at java.net.URL.<init>(URL.java:589)
        at java.net.URL.<init>(URL.java:486)
        at java.net.URL.<init>(URL.java:435)
        at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3514)
        ... 18 more
^C[18:09:32] Ignite node stopped OK [uptime=00:00:16:733]

-----------------------------------------------------------------------------------------------------

I modified the schema field names for some reason; the rest of the code is the same as what I executed.
Yakov:

Thanks for sharing the code. Alex Goncharuk, can you please take a look?

Alexey Goncharuk:

From the error message "Spring XML configuration path is invalid: /home/test/SparkIgniteStreaming/config/example-cache.xml", my guess is that the configuration file is absent on the Spark executor node.


F7753:

I copied the config file to all the other nodes; then it throws:
---------------------------------------------------------------------------------------------
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 11, nobida148): class org.apache.ignite.IgniteCheckedException: Failed to instantiate Spring XML application context [springUrl=file:/home/CQ/PlatformTest/IgniteRDDTest/SparkIgniteStreaming/config/example-cache.xml, err=Line 1 in XML document from URL [file:/home/CQ/PlatformTest/IgniteRDDTest/SparkIgniteStreaming/config/example-cache.xml] is invalid; nested exception is org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".]
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.applicationContext(IgniteSpringHelperImpl.java:391)
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.loadConfigurations(IgniteSpringHelperImpl.java:104)
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.loadConfigurations(IgniteSpringHelperImpl.java:98)
        at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:604)
        at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643)
        at org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184)
        at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 1 in XML document from URL [file:/home/CQ/PlatformTest/IgniteRDDTest/SparkIgniteStreaming/config/example-cache.xml] is invalid; nested exception is org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:398)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:335)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:303)
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.applicationContext(IgniteSpringHelperImpl.java:379)
        ... 21 more
Caused by: org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".
        at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
        at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.scanAttributeValue(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanAttribute(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanStartElement(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.impl.xs.opti.SchemaParsingConfig.parse(Unknown Source)
        at org.apache.xerces.impl.xs.opti.SchemaParsingConfig.parse(Unknown Source)
        at org.apache.xerces.impl.xs.opti.SchemaDOMParser.parse(Unknown Source)
        at org.apache.xerces.impl.xs.traversers.XSDHandler.getSchemaDocument(Unknown Source)
        at org.apache.xerces.impl.xs.traversers.XSDHandler.parseSchema(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaLoader.loadSchema(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaValidator.findSchemaGrammar(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaValidator.handleStartElement(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaValidator.startElement(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanStartElement(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl$NSContentDispatcher.scanRootElementHook(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
        at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
        at org.springframework.beans.factory.xml.DefaultDocumentLoader.loadDocument(DefaultDocumentLoader.java:76)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadDocument(XmlBeanDefinitionReader.java:428)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:390)
        ... 24 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:170)
        at main.scala.StreamingJoin$.main(StreamingJoin.scala:242)
        at main.scala.StreamingJoin.main(StreamingJoin.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to instantiate Spring XML application context [springUrl=file:/home/CQ/PlatformTest/IgniteRDDTest/SparkIgniteStreaming/config/example-cache.xml, err=Line 1 in XML document from URL [file:/home/CQ/PlatformTest/IgniteRDDTest/SparkIgniteStreaming/config/example-cache.xml] is invalid; nested exception is org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".]
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.applicationContext(IgniteSpringHelperImpl.java:391)
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.loadConfigurations(IgniteSpringHelperImpl.java:104)
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.loadConfigurations(IgniteSpringHelperImpl.java:98)
        at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:604)
        at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643)
        at org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
        at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184)
        at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171)
        at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 1 in XML document from URL [file:/home/CQ/PlatformTest/IgniteRDDTest/SparkIgniteStreaming/config/example-cache.xml] is invalid; nested exception is org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:398)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:335)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:303)
        at org.apache.ignite.internal.util.spring.IgniteSpringHelperImpl.applicationContext(IgniteSpringHelperImpl.java:379)
        ... 21 more
Caused by: org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".
        at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
        at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.scanAttributeValue(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanAttribute(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanStartElement(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.impl.xs.opti.SchemaParsingConfig.parse(Unknown Source)
        at org.apache.xerces.impl.xs.opti.SchemaParsingConfig.parse(Unknown Source)
        at org.apache.xerces.impl.xs.opti.SchemaDOMParser.parse(Unknown Source)
        at org.apache.xerces.impl.xs.traversers.XSDHandler.getSchemaDocument(Unknown Source)
        at org.apache.xerces.impl.xs.traversers.XSDHandler.parseSchema(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaLoader.loadSchema(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaValidator.findSchemaGrammar(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaValidator.handleStartElement(Unknown Source)
        at org.apache.xerces.impl.xs.XMLSchemaValidator.startElement(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanStartElement(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl$NSContentDispatcher.scanRootElementHook(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
        at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
        at org.springframework.beans.factory.xml.DefaultDocumentLoader.loadDocument(DefaultDocumentLoader.java:76)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadDocument(XmlBeanDefinitionReader.java:428)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:390)
        ... 24 more
---------------------------------------------------------------------------------------------
alexey.goncharuk alexey.goncharuk
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

It looks like the XML file you are using is not valid. Can you share the config with us?
F7753 F7753
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

In fact, I do not use an XML file to set the cache configuration. It seems that I have to be connected to the Internet every time I launch the application; there is no local cache of the XML schema. (I remember that Spring fetches a local copy of the XSD if one is available; is the usage of Spring in Ignite different in that respect? I do not have much knowledge about Spring.)
So when I connected to the Internet (by some settings in my network environment), the same exception did not come out again.
I wonder whether there is some approach to make the embedded Spring framework fetch the XSD from the local file system rather than from the Internet.

Here is the XML file, the same as in the official docs:
---------------------------------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                </bean>
            </list>
        </property>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
---------------------------------------------------------------------------------------------------------
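A note on the exception itself: the message "Open quote is expected for attribute "language" associated with an element type "script"" means the parser did not receive an XSD at all. It got an HTML page containing a <script language=...> tag, which is typical of a proxy or captive-portal response; that would explain why it only fails while the machine is offline. Spring normally resolves spring-beans.xsd with no network access at all, via the META-INF/spring.schemas mapping bundled in the spring-beans jar; a common way to lose that mapping is packaging an uber jar in which one jar's copy of the file overwrites the others instead of being merged. As a hedged sketch (assuming the fat jar is built with the maven-shade-plugin, which this thread does not confirm), the transformers that preserve the mappings would look like this:
---------------------------------------------------------------------------------------------------------
<!-- Merge Spring's schema/handler mappings from all jars instead of keeping only one copy. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/spring.schemas</resource>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/spring.handlers</resource>
            </transformer>
        </transformers>
    </configuration>
</plugin>
---------------------------------------------------------------------------------------------------------
With an intact spring.schemas on the classpath, the XSD named in xsi:schemaLocation is resolved locally and the config above can start offline.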
F7753 F7753
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

I use the constructor like "val smallTableContext = new IgniteContext[BigInt, zb_test](ssc.sparkContext, () => new IgniteConfiguration())" instead of passing a Spring XML file, since there is no need to put all the parameters in an XML file.
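For completeness, a minimal sketch of building the same discovery setup programmatically, assuming the zb_test value class and the ssc StreamingContext from the earlier posts; the multicast address mirrors the XML config above:
---------------------------------------------------------------------------------------------------------
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder

// Build the IgniteConfiguration in code, so no Spring XML (and no XSD download) is involved.
def igniteCfg(): IgniteConfiguration = {
  val ipFinder = new TcpDiscoveryMulticastIpFinder()
  ipFinder.setAddresses(java.util.Collections.singletonList("127.0.0.1:47500..47509"))

  val discoSpi = new TcpDiscoverySpi()
  discoSpi.setIpFinder(ipFinder)

  val cfg = new IgniteConfiguration()
  cfg.setDiscoverySpi(discoSpi)
  cfg
}

// The configuration closure is serialized to the executors and evaluated there.
val smallTableContext = new IgniteContext[BigInt, zb_test](ssc.sparkContext, () => igniteCfg())
---------------------------------------------------------------------------------------------------------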
vkulichenko vkulichenko
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

Hi,

Can you show the code that creates the IgniteContext?

-Val
F7753 F7753
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

There is nothing different from the code I showed in the 6th comment in this topic except the initialization of the IgniteContext; I also listed the way I create it in the comments above:
----------------------------------------------------------------------------------------------------------
val smallTableContext = new IgniteContext[BigInt, zb_test](ssc.sparkContext, () => new IgniteConfiguration())
----------------------------------------------------------------------------------------------------------
It seems that whenever Ignite loads an IgniteContext from a Spring XML file, it cannot avoid going to the Internet to download the XSD used to validate the config XML file, and I do not want the nodes to access the Internet in my environment. So I use the above code to create the IgniteContext instance; then there is no need for the node to go online.
vkulichenko vkulichenko
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

According to the trace you provided earlier, a different constructor was used to create the IgniteContext (either the one that takes a URL to an XML file or the one that takes only a SparkContext). Can you please double-check your code? Perhaps you do this somewhere else?

If you have a small example that reproduces the issue, please share it with us and we will take a look.

-Val
F7753 F7753
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

Sorry vkulichenko, which one do you want me to reproduce? I have made so many mistakes in this topic.
vkulichenko vkulichenko
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

Which issues still exist? As far as I understand, you get an XML parsing exception when starting Ignite. Is this correct?

In the code you provided I see lines like this:

val streamTableContext = new IgniteContext[String, source](ssc.sparkContext, "/home/test/config/default-config.xml")

This XML will be used to start a node, and it looks like it's corrupted somehow (the XML is not valid). If you are using a closure that creates an IgniteConfiguration object, there is no way to get this exception, since there are no XML files in play.
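To make the contrast concrete, a minimal sketch of the two variants (reusing the source value class, ssc, and the path from the line above):
---------------------------------------------------------------------------------------------------------
// XML-based constructor: Spring parses the file, so XSD resolution comes into play.
val fromXml = new IgniteContext[String, source](ssc.sparkContext,
  "/home/test/config/default-config.xml")

// Closure-based constructor: no XML, no Spring, nothing to download.
val fromClosure = new IgniteContext[String, source](ssc.sparkContext,
  () => new IgniteConfiguration())
---------------------------------------------------------------------------------------------------------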

Please clarify what exactly is still not working for you, and if possible provide an example that reproduces the issues you have.

-Val
F7753 F7753
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

This post was updated on .
Hi Val
This issue happens whenever I use the XML file to initialize an IgniteContext while the node cannot reach the Internet; the two ways to avoid the issue on my cluster are to use an IgniteConfiguration instead of the XML file, or to make sure the node is already connected to the Internet.
I'm sure that the file is not corrupted, since it runs well when the node's network is OK. BTW, the file is copied from the example folder and I haven't changed anything in the file's header content (only changed the beans).
vkulichenko vkulichenko
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

This sounds weird, and I can't reproduce it regardless of the Internet connection. Does everything work for you if you don't use Spring configs?

-Val
F7753 F7753
Reply | Threaded
Open this post in threaded view
|

Re: How to solve the 22 parameters' limit under scala 2.10 in the case class?

Hi Val,
Yes, it is very weird, and I am very sure that the exception
----------------------------------------------------------------------------------------------------------
" nested exception is org.xml.sax.SAXParseException; systemId: http://www.springframework.org/schema/beans/spring-beans.xsd; lineNumber: 1; columnNumber: 166; Open quote is expected for attribute "language" associated with an  element type  "script".] "
----------------------------------------------------------------------------------------------------------
will be thrown when the connection to the Internet is shut down, so I use the IgniteConfiguration to avoid this.
Now there is something wrong with my query: it seems that the schema of the cache does not work well in my use case, though I don't think this is a side effect of using the IgniteConfiguration.
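If the cache schema is what misbehaves now, one thing worth checking, as a hedged sketch rather than a confirmed diagnosis: with a programmatic IgniteConfiguration, the annotated value class still has to be registered for SQL via CacheConfiguration.setIndexedTypes, otherwise the @ScalarCacheQuerySqlField annotations are never scanned. Assuming the zb_test class from the earlier posts and a hypothetical cache name "smallTable":
---------------------------------------------------------------------------------------------------------
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}

// "smallTable" is a hypothetical cache name; zb_test is the annotated value class from earlier posts.
val cacheCfg = new CacheConfiguration[BigInt, zb_test]("smallTable")

// Register the key/value pair for SQL so the annotated fields become queryable columns and indexes.
cacheCfg.setIndexedTypes(classOf[BigInt], classOf[zb_test])

val cfg = new IgniteConfiguration()
cfg.setCacheConfiguration(cacheCfg)
---------------------------------------------------------------------------------------------------------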