index and query org.apache.ignite.spark.IgniteRDD[String,org.apache.spark.sql.Row]

DmitryB

Hi team,

I'm trying to save, index, and query Spark DataFrames with an Ignite cache. This is my code:

import org.apache.spark.sql.Row
import org.apache.ignite.configuration._
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.configuration.CacheConfiguration
import sandbox._

// create rdd
val data = List[TRow1](TRow1("1", "test-1"), TRow1("2", "test-2"))
val df = sqlContext.createDataFrame[TRow1](data)
>> df: org.apache.spark.sql.DataFrame = [key: string, name: string]

// create ignite context (embedded mode)
val igniteContext = new IgniteContext[String, Row](sc, () => new IgniteConfiguration(), false)

// cache config
val cacheCfg = new CacheConfiguration[String, Row]()
cacheCfg.setName("cache01")
cacheCfg.setIndexedTypes(classOf[String], classOf[Row])

// ignite cache
val cache = igniteContext.fromCache(cacheCfg)
>> cache: org.apache.ignite.spark.IgniteRDD[String,org.apache.spark.sql.Row] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:27

// df rdd save to cache
val df_rdd = df.rdd.map(r => (r.getAs[String](0), r))
>> df_rdd: org.apache.spark.rdd.RDD[(String, org.apache.spark.sql.Row)] = MapPartitionsRDD[4] at map at <console>:38
cache.savePairs(df_rdd)

// query
val c = cache.count
>> res3: Long = 2

val result = cache.sql("select * from Row").collect
>> result: Array[org.apache.spark.sql.Row] = Array()

Unfortunately, the SQL query doesn't return any results.

Could you please recommend the correct way to store, index, and query sql.Rows with an Ignite cache?

Best regards,
Dmitry


agura-2

CONTENTS DELETED
The author has deleted this message.
DmitryB

Hi Andrey,

Thanks a lot for your help.
Unfortunately, I cannot use case classes, because the schema information is only available at runtime.
To make this clearer, let me add more details. Suppose I have a very big dataset (~500 TB) stored in AWS S3 in Parquet format. Using Spark, I can process it (filter + join) and reduce its size down to ~200-500 GB. I would like to save the resulting dataset in an Ignite cache using IgniteRDD and create indexes for a particular set of fields, which will later be used for running queries (filter, join, aggregations). My assumption is that having this result dataset in Ignite, with indexes, would improve performance compared to using a persisted Spark DataFrame.
Unfortunately, the resulting dataset's schema can vary in a great number of ways, so it seems impossible to describe all the variations with case classes. This is why an approach that stores spark.sql.Row and describes the query fields and indexes using QueryEntity would be preferable. Thanks to your explanation, I see that this approach doesn't work.
Another solution that is spinning in my head is to generate case classes dynamically (at runtime) based on the Spark DataFrame schema, then map sql.Rows to RDD[generated_case_class], describe the Ignite query and index fields using QueryEntity, and create an IgniteContext for the generated case class. I'm not sure that this approach is even possible, so I would like to ask for your opinion before I go deeper.
I would be very grateful for any advice.

Best regards,
Dmitry





agura-2

CONTENTS DELETED
The author has deleted this message.
alexey.goncharuk

Yep, BinaryObjectBuilder should definitely be a solution for this. You can obtain an Ignite instance from the IgniteContext and use the IgniteBinary interface to get a BinaryObjectBuilder for building object structures dynamically. You can then use the QueryEntity class to describe the index configuration for the binary objects stored in the cache. Once a binary object is built, you can save it to Ignite using the same savePairs() method.

I am surprised we do not have any examples for BinaryObjectBuilder; however, you can take a look at the CacheJdbcPojoStore source code [1] to get an idea of how the builder works. You can also take a look at the documentation [2].

Hope this helps!
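A minimal sketch of the approach described above, assuming the `igniteContext` from the first post; the type name "MyType" and its fields are hypothetical, used only for illustration:

```scala
import org.apache.ignite.binary.BinaryObject

// Obtain the Ignite instance from the IgniteContext and build a binary
// object dynamically -- no class named "MyType" needs to exist on the
// classpath. The type name and fields here are hypothetical.
val builder = igniteContext.ignite().binary().builder("MyType")
builder.setField("id", "1")
builder.setField("name", "test-1")
val binObj: BinaryObject = builder.build()
```

This requires a running Ignite node, since the builder is obtained from a live Ignite instance.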

DmitryB

Hi guys,

Thanks for pointing me in the right direction; the links about the binary marshaller were quite useful.
Unfortunately, I was not able to achieve my final goal: I get an error from GridDhtPartitionsExchangeFuture when I call the cache.savePairs method. Here is the full stack trace:

scala> cache.savePairs(pairRdd)
16/03/07 05:03:09 ERROR GridDhtPartitionsExchangeFuture: Failed to reinitialize local partitions (preloading will be stopped): GridDhtPartitionExchangeId [topVer=AffinityTopologyVersion [topVer=1, minorTopVer=1], nodeId=3bbda0cf, evt=DISCOVERY_CUSTOM_EVT]
class org.apache.ignite.IgniteCheckedException: Failed to initialize property 'id' for key class 'class java.lang.Object' and value class 'interface org.apache.ignite.binary.BinaryObject'. Make sure that one of these classes contains respective getter method or field.
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.buildClassProperty(GridQueryProcessor.java:1638)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.processClassMeta(GridQueryProcessor.java:1517)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.initializeCache(GridQueryProcessor.java:279)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.onCacheStart(GridQueryProcessor.java:462)
        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCache(GridCacheProcessor.java:1039)
        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart(GridCacheProcessor.java:1649)
        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCachesStart(GridCacheProcessor.java:1564)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.startCaches(GridDhtPartitionsExchangeFuture.java:961)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:524)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:1297)
        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
        at java.lang.Thread.run(Thread.java:745)
16/03/07 05:03:09 ERROR GridCachePartitionExchangeManager: Runtime error caught during grid runnable execution: GridWorker [name=partition-exchanger, gridName=null, finished=false, isCancelled=false, hashCode=628489503, interrupted=false, runner=exchange-worker-#47%null%]
java.lang.NullPointerException
        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.onExchangeDone(GridCacheProcessor.java:1724)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:1114)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:88)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:336)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:878)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:1297)
        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
        at java.lang.Thread.run(Thread.java:745)
[Stage 2:>                                                          (0 + 2) / 2]


Code example:

import scala.util.{Try}
import org.apache.ignite.configuration._
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.binary.BinaryObjectBuilder
import org.apache.ignite.cache._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import java.util.LinkedHashMap
import scala.collection.mutable.LinkedHashMap


val queryEntity = new QueryEntity()
queryEntity.setKeyType(classOf[Long].getName)
queryEntity.setValueType(classOf[BinaryObject].getName)

val fields = new java.util.LinkedHashMap[String, String]()
fields.put("id", classOf[String].getName)
fields.put("name", classOf[String].getName)

queryEntity.setFields(fields)
queryEntity.setIndexes(List(new QueryIndex("id"), new QueryIndex("name")).asJava)


val cacheCfg = new CacheConfiguration[Long, BinaryObject]()
cacheCfg.setName("cache01")
cacheCfg.setQueryEntities(List(queryEntity).asJava)

val igniteContext = new IgniteContext[Long, BinaryObject](sc, () => new IgniteConfiguration(), false)

val cache = igniteContext.fromCache(cacheCfg)
>>cache: org.apache.ignite.spark.IgniteRDD[Long,org.apache.ignite.binary.BinaryObject] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:27


scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:47

scala> val pairRdd = rdd.map(x => {
     |     val builder = igniteContext.ignite.binary.builder("DT1")
     |     builder.setField("id", x.toString)
     |     builder.setField("name", "test-" + x.toString)
     |     val binObj = builder.build
     |     binObj
     | }).zipWithIndex.map(r => (r._2, r._1))
pairRdd: org.apache.spark.rdd.RDD[(Long, org.apache.ignite.binary.BinaryObject)] = MapPartitionsRDD[5] at map at <console>:57

cache.savePairs(pairRdd)


Do you have any ideas that could help me overcome this problem?

Thank you and best regards,
Dmitry




alexey.goncharuk

Dmitry,

You should use the same type name in the QueryEntity as the one you used when creating the builder, i.e.
queryEntity.setValueType("DT1")
because one cache can store multiple value types.

I will create a ticket to throw a proper exception when BinaryObject is used in the query entity configuration; it seems like a usability issue to me.
Let us know if this solves your issue.

--AG
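Applied to the configuration from the previous post, the corrected QueryEntity would look like this (a sketch; the key point is that setValueType must receive the same "DT1" string that is passed to ignite.binary.builder):

```scala
import org.apache.ignite.cache.{QueryEntity, QueryIndex}
import scala.collection.JavaConverters._

val queryEntity = new QueryEntity()
queryEntity.setKeyType(classOf[Long].getName)
// Must match the name passed to ignite.binary.builder("DT1"),
// not classOf[BinaryObject].getName -- one cache can hold many value types.
queryEntity.setValueType("DT1")

val fields = new java.util.LinkedHashMap[String, String]()
fields.put("id", classOf[String].getName)
fields.put("name", classOf[String].getName)
queryEntity.setFields(fields)
queryEntity.setIndexes(List(new QueryIndex("id"), new QueryIndex("name")).asJava)
```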
DmitryB

Hi Alexey,

Thanks for the advice. With queryEntity.setValueType("DT1") I can save pairs to the cache, but I get another exception when I try to get my data back:

scala> cache.count
[Stage 3:>                                                       (0 + 0) / 1024]16/03/08 01:36:24 ERROR Executor: Exception in task 1.0 in stage 3.0 (TID 5)
javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: Failed to read class name from file [id=99745, file=/usr/ignite/work/marshaller/99745.classname]
        at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1597)
        at org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$CacheQueryFallbackFuture.retryIfPossible(GridCacheQueryAdapter.java:700)
        at org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$CacheQueryFallbackFuture.next(GridCacheQueryAdapter.java:670)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy$5.onHasNext(IgniteCacheProxy.java:529)
        at org.apache.ignite.internal.util.GridCloseableIteratorAdapter.hasNextX(GridCloseableIteratorAdapter.java:53)
        at org.apache.ignite.internal.util.lang.GridIteratorAdapter.hasNext(GridIteratorAdapter.java:45)
        at org.apache.ignite.spark.impl.IgniteQueryIterator.hasNext(IgniteQueryIterator.scala:24)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1595)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


I have created a Docker image (Spark 1.6 + Ignite 1.6); perhaps you can have a quick look:
docker run -it dmitryb/ignite-spark:0.0.1

I built Ignite from https://git-wip-us.apache.org/repos/asf/ignite using Scala 2.10:
mvn clean package -DskipTests -Dscala-2.10


Regards,
Dmitry
alexey.goncharuk

It looks like we are missing an option that tells IgniteRDD to work with binary objects. When an iterator is created, it tries to deserialize the objects, and since you do not have a corresponding class, the exception occurs. I will create a ticket for this shortly.

Despite this, you should still be able to query your data using Ignite SQL queries or Spark DataFrame SQL. Can you give it a shot?
DmitryB

Spark SQL queries have the same problem.
vkulichenko

Dmitry,

What query do you execute? Note that if you try 'select * from ...', it will fail, because Ignite adds predefined _KEY and _VAL fields to each table. These reference the key and value objects, which would be deserialized here as well. Can you try selecting only individual primitive fields? Does that work?
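For example, instead of `select *`, a query that names only primitive columns (reusing the `cache` and the "DT1" type from the earlier posts) might look like:

```scala
// Selecting concrete columns avoids touching the hidden _KEY/_VAL
// binary objects, which cannot be deserialized without their classes.
val rows = cache.sql("select id, name from DT1").collect()
```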

-Val
vkulichenko

P.S. Here is the ticket for the keepBinary flag in IgniteRDD: https://issues.apache.org/jira/browse/IGNITE-2821