@QuerySqlField for collections

DmitryB
@QuerySqlField for collections

Hi team,

I found that fields with a collection type are not handled properly by SQL queries.

Here is my code:

import annotation.meta.field
import org.apache.ignite.cache.query.annotations.QuerySqlField

case class TRow1(
    @(QuerySqlField @field)(index = true) strP: String,
    @(QuerySqlField @field)(index = true) intP: Int,
    @(QuerySqlField @field)(index = true) doubleP: Double,
    @(QuerySqlField @field)(index = false) listP: List[String]    
) extends Serializable {}

// gen test data
val rdd = sc.parallelize(1 to 10).map(x => TRow1("str-"+x, x, x*10, (1 to x).map(i => "val-" + i).toList))
val pair_rdd = rdd.map(r => (r.intP, r))

// prep cache
val igniteContext = new IgniteContext[Int, TRow1](sc, () => new IgniteConfiguration().setPeerClassLoadingEnabled(true), false)
val cacheCfg = new CacheConfiguration[Int, TRow1]()
cacheCfg.setName("cache01")
cacheCfg.setIndexedTypes(classOf[Int], classOf[TRow1])
val cache = igniteContext.fromCache(cacheCfg)
res6: org.apache.ignite.spark.IgniteRDD[Int,sandbox.TRow1] = IgniteRDD[4] at RDD at IgniteAbstractRDD.scala:27

// get results
scala> val count = cache.count
count: Long = 10

scala> cache.take(1)
res5: Array[(Int, sandbox.TRow1)] = Array((1,TRow1(str-1,1,10.0,List(val-1))))

scala> val result = cache.sql("select strP, intP, doubleP, listP from TRow1").take(1)
result: Array[org.apache.spark.sql.Row] = Array([str-1,1,10.0,[]])


So the field with a collection type (List) is not returned by the SQL query.



Vladimir Ozerov

Re: @QuerySqlField for collections

Hi DmitryB,

Could you please provide the full class name of "List"? Is it java.util.List or some Scala collection? Can you try using java.util.ArrayList instead and see if it works?
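
For reference, here is a minimal, untested sketch of what that change might look like on your side (the TRow1 shape and test-data loop are taken from your snippet; only the type of listP changes):

import annotation.meta.field
import org.apache.ignite.cache.query.annotations.QuerySqlField

case class TRow1(
    @(QuerySqlField @field)(index = true) strP: String,
    @(QuerySqlField @field)(index = true) intP: Int,
    @(QuerySqlField @field)(index = true) doubleP: Double,
    @(QuerySqlField @field)(index = false) listP: java.util.ArrayList[String]
) extends Serializable {}

// Build the Java list explicitly instead of converting a Scala List
val rdd = sc.parallelize(1 to 10).map { x =>
  val list = new java.util.ArrayList[String]()
  (1 to x).foreach(i => list.add("val-" + i))
  TRow1("str-" + x, x, x * 10, list)
}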

Vladimir.


DmitryB

Re: @QuerySqlField for collections

Hi Vladimir,

I'm using scala.collection.immutable.List[String]. I also tried java.util.ArrayList[String] and got the following exception:

scala> val result = cache.sql("select t1.* from TRow1 as t1").count
16/03/07 03:23:09 WARN TaskMemoryManager: leak 8.3 MB memory from org.apache.spark.unsafe.map.BytesToBytesMap@bb96887
16/03/07 03:23:09 ERROR Executor: Managed memory leak detected; size = 8650752 bytes, TID = 1047
16/03/07 03:23:09 ERROR Executor: Exception in task 0.0 in stage 23.0 (TID 1047)
scala.MatchError: [] (of class java.util.ArrayList)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
        at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
        at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:505)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

16/03/07 03:23:09 ERROR TaskSetManager: Task 0 in stage 23.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 1047, localhost): scala.MatchError: [] (of class java.util.ArrayList)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
        at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
        at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:505)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


Regards,
Dmitry
Vladimir Ozerov

Re: @QuerySqlField for collections

Hi Dmitry,

I tried to reproduce your problem in Java using the following code, but without success:

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class QueryListRunner {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        CacheConfiguration<Integer, TRow1> ccfg = new CacheConfiguration<>();
        ccfg.setIndexedTypes(Integer.class, TRow1.class);
        cfg.setCacheConfiguration(ccfg);

        try (Ignite node = Ignition.start(cfg)) {
            IgniteCache<Integer, TRow1> cache = node.cache(null);

            TRow1 val = new TRow1();

            val.strP = "1";
            val.intP = 2;
            val.doubleP = 3d;
            val.listP = Collections.singletonList("4");

            cache.put(1, val);

            System.out.println("TAKEN FROM CACHE: " + cache.get(1).listP);

            Object queried = 
                cache.query(new SqlFieldsQuery("select strP, intP, doubleP, listP from TRow1")).getAll().get(0).get(3);

            System.out.println("TAKEN FROM QUERY: " + queried);
        }
    }
}

class TRow1 implements Serializable {
    @QuerySqlField(index = true) public String strP;
    @QuerySqlField(index = true) public int intP;
    @QuerySqlField(index = true) public Double doubleP;
    @QuerySqlField(index = false) public List<String> listP;
}

This leads me to the assumption that this is a Spark- or Scala-specific problem.
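
In the meantime, a possible workaround on the Scala side is to flatten the collection into a single delimited String column and split it back after the query, so that nothing reaches Spark's Catalyst converters as a raw collection. A minimal, untested sketch (TRow2 and the comma delimiter are made up for illustration; "cache" is assumed to be an IgniteRDD built over TRow2, analogous to cache01 above):

import annotation.meta.field
import org.apache.ignite.cache.query.annotations.QuerySqlField

case class TRow2(
    @(QuerySqlField @field)(index = true) strP: String,
    @(QuerySqlField @field)(index = true) intP: Int,
    @(QuerySqlField @field)(index = true) doubleP: Double,
    // Collection flattened to a plain String so Catalyst can map it
    @(QuerySqlField @field)(index = false) listP: String
) extends Serializable {}

// Join the list on write ...
val rows = sc.parallelize(1 to 10).map(x =>
  TRow2("str-" + x, x, x * 10, (1 to x).map(i => "val-" + i).mkString(",")))

// ... and split it back after querying
val lists = cache.sql("select listP from TRow2").rdd
  .map(r => r.getString(0).split(",").toList)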

Andrey Gura, 
Could you please assist?

Vladimir.
