A question of ignite on spark streaming

F7753

A question of ignite on spark streaming

I want to use two IgniteRDDs to cache Spark DataFrames: one is a static table, while the other is a stream table whose data comes from Kafka, a socket, etc.
My question is: the IgniteContext has only two type parameters, so it seems I can create only one cache. How can I use Ignite to accelerate Spark Streaming in my use case?
Below are the details:

Here is the static small table; the data is loaded from HDFS:
-------------------------------------------------------------------------------------------------
val smallTableDF = sqlCtx.createDataFrame(smallTableData.rdd, classOf[zb_test])

// create ignite context (embedded mode)
val igniteContext = new IgniteContext[String, staticTb](ssc.sparkContext, () => new
    IgniteConfiguration().setPeerClassLoadingEnabled(true), false)

// small table cache config
val smallTableCacheCfg = new CacheConfiguration[String, staticTb]()
smallTableCacheCfg.setName("cache01")
smallTableCacheCfg.setIndexedTypes(classOf[String], classOf[staticTb]) // table has "staticTb" name
smallTableCacheCfg.setCacheMode(CacheMode.REPLICATED)
--------------------------------------------------------------------------------------------------
This is the stream table; the data comes from a socket:
--------------------------------------------------------------------------------------------------
val sourceTableCacheCfg = new CacheConfiguration[String, streamTb]()
sourceTableCacheCfg.setName("cache02")
sourceTableCacheCfg.setIndexedTypes(classOf[String], classOf[streamTb]) // table has "streamTb" name
sourceTableCacheCfg.setCacheMode(CacheMode.PARTITIONED)

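// note: 'ignite' below is presumably the node obtained via igniteContext.ignite() (not shown in this snippet)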
ignite.addCacheConfiguration(sourceTableCacheCfg)
-------------------------------------------------------------------------------------------------
I want to write a join SQL query over these two caches to get some results, but I cannot create the second cache.
-------------------------------------------------------------------------------------------------
// ignite source table cache
val sourceTableCache = igniteContext.fromCache(sourceTableCacheCfg)
-------------------------------------------------------------------------------------------------

My SQL is like:
-------------------------------------------------------------------------------------------------
select b.field0, b.field1, count(*)
 from
   a, b
 where
   a.id = b.pz_id
 group by
   b.field0, b.field1
-------------------------------------------------------------------------------------------------
Warmest Regards
vkulichenko

Re: A question of ignite on spark streaming

Hi,

Can you please properly subscribe to the mailing list so that the community can receive email notifications? Here is the instruction: http://apache-ignite-users.70518.x6.nabble.com/mailing_list/MailingListOptions.jtp?forum=1

F7753 wrote:
I want to use two IgniteRDDs to cache Spark DataFrames: one is a static table, while the other is a stream table whose data comes from Kafka, a socket, etc.
My question is: the IgniteContext has only two type parameters, so it seems I can create only one cache. How can I use Ignite to accelerate Spark Streaming in my use case?
It looks like IgniteContext was parameterized by mistake; it doesn't make much sense to me. I created a ticket to fix this: https://issues.apache.org/jira/browse/IGNITE-2929

As a workaround, you can create two contexts with different type parameters. They will reuse the same Ignite instance and will let you create two caches.
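For example, something like this should work (a rough, untested sketch that reuses the staticTb/streamTb value classes and the cache configurations from your snippets):
-------------------------------------------------------------------------------------------------
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext

// two contexts with different value types; both attach to the same
// embedded Ignite node started on the Spark workers
val staticCtx = new IgniteContext[String, staticTb](ssc.sparkContext,
    () => new IgniteConfiguration().setPeerClassLoadingEnabled(true), false)
val streamCtx = new IgniteContext[String, streamTb](ssc.sparkContext,
    () => new IgniteConfiguration().setPeerClassLoadingEnabled(true), false)

// each context can now expose its own cache as an IgniteRDD
val staticRDD = staticCtx.fromCache(smallTableCacheCfg)   // "cache01"
val streamRDD = streamCtx.fromCache(sourceTableCacheCfg)  // "cache02"
-------------------------------------------------------------------------------------------------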

-Val
F7753
Re: A question of ignite on spark streaming

Thanks for your advice, that helps me a lot. I've also received the mail below from the mailing list; is there something wrong?
---------------------------The mail------------------------------------------------------
Hi. This is the qmail-send program at apache.org.
I'm afraid I wasn't able to deliver your message to the following addresses.
This is a permanent error; I've given up. Sorry it didn't work out.

<user@ignite.apache.org>:
Must be sent from an @apache.org address or a subscriber address or an address in LDAP.

-------------------------------------------------------------------------------------------------
vkulichenko

Re: A question of ignite on spark streaming

Don't know. I see you're subscribed now, so I think it can be ignored.

-Val
F7753
Re: A question of ignite on spark streaming

Here comes another question:

If I use two IgniteContexts in my Spark Streaming application, IgniteContext.fromCache() returns an IgniteRDD, which does not seem to have a query(SqlFieldsQuery) method for running a cross-cache SQL query. What should I do? Implement a query method on IgniteRDD, or is there some other way?

Thanks a lot
vkulichenko

Re: A question of ignite on spark streaming

IgniteRDD has the 'sql' method, which does exactly what you're looking for: it delegates to SqlFieldsQuery.
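For example, here is a rough sketch continuing the two-context workaround from earlier (the cross-cache join references the other cache by its quoted cache name as the SQL schema, while the RDD's own cache is the default schema):
-------------------------------------------------------------------------------------------------
// the join from the first post, run through IgniteRDD.sql
// default schema is the RDD's own cache ("cache02", table streamTb);
// the static table is referenced as "cache01".staticTb
val streamRDD = streamCtx.fromCache(sourceTableCacheCfg)

val joined = streamRDD.sql(
  """select b.field0, b.field1, count(*)
    |  from "cache01".staticTb a, streamTb b
    | where a.id = b.pz_id
    | group by b.field0, b.field1""".stripMargin)

joined.show()  // sql() returns a Spark DataFrame
-------------------------------------------------------------------------------------------------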

-Val
F7753

Re: A question of ignite on spark streaming

vkulichenko:
    Thanks for your carefulness. I put the query string into the 'sql' method and hope it works well, but when I compile this Maven project I run into another problem, which I posted as another question in this forum: THE LINK