SQL access via Spark does not work

Marco-2
SQL access via Spark does not work

Hi,

I'm just trying out Apache Ignite and I'm running into issues when accessing the cache via SQL from Spark.

I have a simple data container defined:

case class DataContainer(@QuerySqlField key: Int, @QuerySqlField value: String) extends Serializable {

  override def toString = s"Key$key, value $value"

}

Then I put some data into the cache via a Spark batch app:
    val dataContainerArray = Array[DataContainer](new DataContainer(1, "T"), new DataContainer(2, "A"))
    val dataContainerRdd = sc.parallelize(dataContainerArray)
    val dataKeyValueRdd = dataContainerRdd.map(item => (item.key, item))

    //configure the cache...
    val ccfg = new CacheConfiguration[Int, DataContainer]()
    ccfg.setIndexedTypes(Int.getClass, DataContainer.getClass)
    ccfg.setName("C2")

    val cacheRdd = igniteContext.fromCache(ccfg)
    cacheRdd.savePairs(dataKeyValueRdd, true)


Now I am trying to read the cache from another Spark app:

    val igniteContext = new IgniteContext[Int, DataContainer](sc,
      () => new IgniteConfiguration())

    //get the cache as a rdd representation
    val cacheRdd = igniteContext.fromCache("C2")

    // get the entry for key = 1 (WORKS FINE)
    val result = cacheRdd.filter(_._1 == 1).collect()
    result.foreach(row => println(row._2))


    //no result :-(
    cacheRdd.sql("select * from C2.DATACONTAINER_").foreach(row=>println(row.toString()))

Access via the filter function (i.e., plain RDD operations) works fine. Unfortunately, the SQL query does not return any results. Does anybody have an idea what I am doing wrong here?


Thanks,
Marco
alexey.goncharuk
Re: SQL access via Spark does not work

I have a few comments on the code you provided:

 - Since DataContainer is a case class, you need to annotate its fields as @(QuerySqlField @field) to make them visible for indexing (see the sketch after this list). Alternatively, you can use the @ScalarCacheQuerySqlField annotation from the ignite-scalar module (you will need to add the corresponding dependency). We will try to add a check to detect this case and print a proper warning.
 - The actual type of the key that is put into Ignite is java.lang.Integer, so the setIndexedTypes() call should look like this: ccfg.setIndexedTypes(classOf[Integer], classOf[DataContainer]). I will need to check whether we can also automatically detect and correct such cases.
 - Not sure why you used the DATACONTAINER_ table name (with the underscore at the end). For me the table name was DATACONTAINER, as expected.
 - Finally, the foreach() is executed on the remote Spark executors because the DataFrame is a distributed data structure. I used collect() to make the output print on the driver: cacheRdd.sql("select * from C2.DATACONTAINER").collect().foreach(row => println(row.toString()))
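
Putting all four corrections together, a minimal end-to-end sketch might look like the following (the local SparkContext setup and the object/main wrapper are just assumptions to keep it self-contained; the cache name "C2" and the sample data match your original post):

import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.SparkContext

import scala.annotation.meta.field

// @field moves the annotation onto the underlying field so that
// Ignite's SQL indexing can see it on a case class.
case class DataContainer(@(QuerySqlField @field) key: Int,
                         @(QuerySqlField @field) value: String)

object IgniteSqlExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "ignite-sql-example")
    val igniteContext = new IgniteContext[Integer, DataContainer](sc,
      () => new IgniteConfiguration())

    val ccfg = new CacheConfiguration[Integer, DataContainer]()
    // classOf[Integer] matches the boxed java.lang.Integer key that is actually stored.
    ccfg.setIndexedTypes(classOf[Integer], classOf[DataContainer])
    ccfg.setName("C2")

    val cacheRdd = igniteContext.fromCache(ccfg)
    cacheRdd.savePairs(sc.parallelize(Seq(
      (Integer.valueOf(1), DataContainer(1, "T")),
      (Integer.valueOf(2), DataContainer(2, "A")))), true)

    // Table name is DATACONTAINER (no trailing underscore); collect() first
    // so the rows are printed on the driver rather than on the executors.
    cacheRdd.sql("select * from C2.DATACONTAINER")
      .collect()
      .foreach(row => println(row.toString))
  }
}

The sketch writes and reads in one app only to stay self-contained; your read side can remain a separate Spark app as long as it uses the same cache name and the annotated DataContainer class.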

After these corrections your code worked fine for me. Can you try this and check if it works for you?
Marco-2
Re: SQL access via Spark does not work

Hi Alexey,

Yes, it works now :). Thanks for your help.

BR Marco
