Apache Spark + Ignite Connection Issue

sri hari kali charan Tummala

Hi Community, 

I am trying to read from and write to an Ignite cluster using Apache Spark. I am able to do that using the JDBC thin client, but not with the native method mentioned in several Spark + Ignite examples.

Right now all the Spark + Ignite examples launch a local Ignite cluster, but I want my code to connect to an already existing cluster (as a client).

Question:-
How do I pass the Ignite connection IP and port (10800) in example-default.xml?

Error:-
TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2000 ms; change 'reconnectDelay' to configure the frequency of retries): [/3.88.248.113:10800]

Working (Spark + Ignite using JDBC):-
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize", 100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)

spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize", 100)
  .option("dbtable", "Person").load().show(10, false)

Not working (requires a config file, which is example-default.xml):-
val igniteDF = spark.read
  .format(FORMAT_IGNITE)               // Data source type.
  .option(OPTION_TABLE, "person")      // Table to read.
  .option(OPTION_CONFIG_FILE, CONFIG)  // Ignite config.
  .load()
  .filter(col("id") >= 2)              // Filter clause.
  .filter(col("name") like "%J%")      // Another filter clause.

Full code:- (the sparkDSLExample function fails to connect to the Ignite cluster I already have)
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object SparkClientConnectionTest {

  private val CACHE_NAME = "SparkCache"

  private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

  def setupExampleData = {

    val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
    val igniteClient: IgniteClient = Ignition.startClient(cfg2)

    System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

    val cache: ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

    cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
      .setSchema("PUBLIC")).getAll

    cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
      .setSchema("PUBLIC")).getAll

    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll

    System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

    val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

    println(data.toString)
  }

  def sparkDSLExample(implicit spark: SparkSession): Unit = {
    println("Querying using Spark DSL.")
    println()

    val igniteDF = spark.read
      .format(FORMAT_IGNITE)               // Data source type.
      .option(OPTION_TABLE, "person")      // Table to read.
      .option(OPTION_CONFIG_FILE, CONFIG)  // Ignite config.
      .load()
      .filter(col("id") >= 2)              // Filter clause.
      .filter(col("name") like "%J%")      // Another filter clause.

    println("Data frame schema:")

    igniteDF.printSchema() // Printing query schema to console.

    println("Data frame content:")

    igniteDF.show() // Printing query results to console.
  }

  def main(args: Array[String]): Unit = {

    setupExampleData

    // Creating spark session.
    implicit val spark = SparkSession.builder()
      .appName("Spark Ignite data sources example")
      .master("local")
      .config("spark.executor.instances", "2")
      .getOrCreate()

    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

    //sparkDSLExample

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:ignite:thin://3.88.248.113")
      .option("fetchsize", 100)
      //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
      .option("dbtable", "Person").load()

    df.printSchema()

    df.createOrReplaceTempView("test")

    spark.sql("select * from test where id=1").show(10)

    spark.sql("select 4,'blah',124232").show(10)

    import java.sql.DriverManager
    val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

    import java.util.Properties
    val connectionProperties = new Properties()

    connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

    spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append)
      .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)

    spark.read
      .format("jdbc")
      .option("url", "jdbc:ignite:thin://3.88.248.113")
      .option("fetchsize", 100)
      .option("dbtable", "Person").load().show(10, false)
  }

}

example-default.xml:-

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<!--
  Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/util
       http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                      Ignite provides several options for automatic discovery that can be used
                      instead of static IP based discovery. For information on all options refer
                      to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>3.88.248.113:10800</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>



--
Thanks & Regards
Sri Tummala

stephendarlington

Re: Apache Spark + Ignite Connection Issue

You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).
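
For example, a minimal discovery section pointing at the default discovery port range (47500..47509 here, an assumption; use whatever range your server nodes are actually configured with) would look like this:

<property name="discoverySpi">
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
            <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                <property name="addresses">
                    <list>
                        <!-- Discovery port range of the server node(s), not the thin-client port 10800. -->
                        <value>3.88.248.113:47500..47509</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>
</property>

You may also want the Spark-side node to join as a client rather than another server, for example via Ignition.setClientMode(true) before it starts, or <property name="clientMode" value="true"/> in the XML.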

Regards,
Stephen

sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

Hi Stephen,

Do you mean something like 3.88.248.113:47500..47700, or just the public IP 3.88.248.113? I tried all of those possibilities and none of them connect.

Thanks
Sri 


sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

Do you mean the communication ports 47100-47200 as mentioned in https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2? Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).

Which option is right in my default.xml file?

Option 1:-
<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113:47100..47200</value>
    </list>
</property>

Option 2:-
<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113:47500..47600</value>
    </list>
</property>

Option 3:-
<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113</value>
    </list>
</property>


Thanks

On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen , 

 do you mean 3.88.248.113:47500..47700 something like this? or just public ip 3.88.248.113 I tried all the possibilities none of them are getting connected.

Thanks
Sri 

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[hidden email]> wrote:
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).

Regards,
Stephen

On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <[hidden email]> wrote:

Hi Community, 

I am trying to read and write into the Ignite cluster using apache-spark I am able to do that using JDBC thin client but not native method as mentioned in several spark + ignite examples.

Right now all the spark + ignite examples launch a local ignite cluster but I want my code connecting to already existing cluster (client).

Question:-
How to pass Ignite connection ip and port (10800)  10800 in example-default.xml ?

Error:-
TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2000 ms; change 'reconnectDelay' to configure the frequency of retries): [/3.88.248.113:10800]

Working (Spark + Ignite using JDBC):-
val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

Not Working requires a CONFIG file which is example-default.xml:-
val igniteDF = spark.read
.format(FORMAT_IGNITE) //Data source type.
.option(OPTION_TABLE, "person") //Table to read.
.option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
.load()
.filter(col("id") >= 2) //Filter clause.
.filter(col("name") like "%J%") //Another filter clause.

Full Code:- (sparkDSLExample) function fails to connect ignite cluster which I already have
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object SparkClientConnectionTest {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkDSLExample(implicit spark: SparkSession): Unit = {
println("Querying using Spark DSL.")
println


val igniteDF = spark.read
.format(FORMAT_IGNITE) //Data source type.
.option(OPTION_TABLE, "person") //Table to read.
.option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
.load()
.filter(col("id") >= 2) //Filter clause.
.filter(col("name") like "%J%") //Another filter clause.

println("Data frame schema:")

igniteDF.printSchema() //Printing query schema to console.

println("Data frame content:")

igniteDF.show() //Printing query results to console.
}


def main(args: Array[String]): Unit = {

setupExampleData

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

//sparkDSLExample


val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

}

example-default.xml:-

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead os static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:10800</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>



--
Thanks & Regards
Sri Tummala





--
Thanks & Regards
Sri Tummala



--
Thanks & Regards
Sri Tummala

sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

Hi Stephen/All, 

I got it working somewhat using the config below, but I have an issue: a table (and its data) created via the thin client fails to read using Spark, while a table created by Spark can be read using the thin client. Does that mean tables created in Ignite by Spark are the only ones Spark can read back?

example-default.xml
<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>18.206.247.40:10800</value>
    </list>
</property>

Not working Scala function:-

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content:person")
  println()

  // Printing content of data frame to console.
  personDataFrame.show()
}

Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
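
As a sanity check (just a sketch using plain JDBC against the same thin-client endpoint as above), listing the tables the thin client can see should confirm whether the Person table exists where Spark is looking:

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
// List every table visible through the thin client; the Person table
// created in setupExampleData should show up under the PUBLIC schema.
val tables = conn.getMetaData.getTables(null, null, "%", null)
while (tables.next()) {
  println(tables.getString("TABLE_SCHEM") + "." + tables.getString("TABLE_NAME"))
}
conn.close()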


Full code:- (step 7 fails)
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkIgniteCleanCode {

  private val CACHE_NAME = "SparkCache"

  private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

  def setupExampleData = {

    val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
    val igniteClient: IgniteClient = Ignition.startClient(cfg2)

    System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

    val cache: ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

    cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
      .setSchema("PUBLIC")).getAll

    cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
      .setSchema("PUBLIC")).getAll

    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
    cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll

    System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

    val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

    println(data.toString)
  }

  def sparkReadIgniteWithThinClient(implicit spark: SparkSession) = {

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:ignite:thin://18.206.247.40")
      .option("fetchsize", 100)
      //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
      .option("dbtable", "Person").load()

    df.printSchema()

    df.createOrReplaceTempView("test")

    spark.sql("select * from test where id=1").show(10)

    spark.sql("select 4,'blah',124232").show(10)
  }

  def sparkWriteIgniteWithThinClient(implicit spark: SparkSession) = {

    import java.sql.DriverManager
    val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

    import java.util.Properties
    val connectionProperties = new Properties()

    connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

    spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append)
      .jdbc("jdbc:ignite:thin://18.206.247.40", "Person", connectionProperties)

    spark.read
      .format("jdbc")
      .option("url", "jdbc:ignite:thin://18.206.247.40")
      .option("fetchsize", 100)
      .option("dbtable", "Person").load().show(10, false)
  }

  def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {

    val ignite = Ignition.start(CONFIG)

    // Load content of json file to data frame.
    val personsDataFrame = spark.read.json(
      resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

    println()
    println("Json file content:")
    println()

    // Printing content of json file to console.
    personsDataFrame.show()

    println()
    println("Writing Data Frame to Ignite:")
    println()

    // Writing content of data frame to Ignite.
    personsDataFrame.write
      .format(FORMAT_IGNITE)
      .mode(org.apache.spark.sql.SaveMode.Append)
      .option(OPTION_CONFIG_FILE, CONFIG)
      .option(OPTION_TABLE, "json_person")
      .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
      .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
      .save()

    println("Done!")

    println()
    println("Reading data from Ignite table:")
    println()

    val cache = ignite.cache[Any, Any](CACHE_NAME)

    // Reading saved data from Ignite.
    val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

    println(data.toString)

    //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
  }

  def readIgniteUsingSpark(implicit spark: SparkSession) = {
    val json_person = spark.read
      .format(FORMAT_IGNITE)
      .option(OPTION_CONFIG_FILE, CONFIG)
      .option(OPTION_TABLE, "json_person")
      .load()

    println()
    println("Data frame content:json_person")
    println()

    // Printing content of data frame to console.
    json_person.show()
  }

  def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
    val personDataFrame = spark.read
      .format(FORMAT_IGNITE)
      .option(OPTION_CONFIG_FILE, CONFIG)
      .option(OPTION_TABLE, "person")
      .load()

    println()
    println("Data frame thin connection content:person")
    println()

    // Printing content of data frame to console.
    personDataFrame.show()
  }

  def main(args: Array[String]): Unit = {

    println()
    println("Step 1 setupExampleData:")
    println()

    setupExampleData

    println()
    println("Step 2 createSparkSession:")
    println()

    // Creating spark session.
    implicit val spark = SparkSession.builder()
      .appName("Spark Ignite data sources example")
      .master("local")
      .config("spark.executor.instances", "2")
      .getOrCreate()

    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

    println()
    println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
    println()

    sparkReadIgniteWithThinClient(spark)

    println()
    println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
    println()

    sparkWriteIgniteWithThinClient(spark)

    println()
    println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
    println()

    writeJSonToIgniteUsingSpark(spark)

    println()
    println("Step 6 readIgniteUsingSpark Using Spark:")
    println()

    readIgniteUsingSpark(spark)

    println()
    println("Step 7 readThinClientTableUsingSpark Using Spark:")
    println()

    readThinClientTableUsingSpark(spark)

    spark.stop()
  }

}

example-default.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<!--
  Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/util
       http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                      Ignite provides several options for automatic discovery that can be used
                      instead of static IP based discovery. For information on all options refer
                      to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>18.206.247.40:10800</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Thanks
Sri 

On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <[hidden email]> wrote:
do you mean communication ports 47100-47200 as mentioned (https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2) ? bare in mind I am running my spark job outside ignite ec2 box (MAC PC).

which option is right in my default.xml file?

Option 1:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47100..47200</value>
</list>
</property>

Option 2:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47500..47600</value>
</list>
</property>

Option 3:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113</value>
</list>
</property>


Thanks

On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen , 

 do you mean 3.88.248.113:47500..47700 something like this? or just public ip 3.88.248.113 I tried all the possibilities none of them are getting connected.

Thanks
Sri 

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[hidden email]> wrote:
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).

Regards,
Stephen


sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

Unfortunately, tables created by Spark don't exist in Ignite when I query through the sqlline JDBC thin client. So Spark is still running locally: the tables it creates exist only for that Spark session.

Did anyone come across this issue? How do I resolve it?
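
[A hedged explanation of what may be happening: if the discovery addresses in example-default.xml never reach the remote servers, Ignition.start() quietly forms a standalone one-node topology on the laptop, so DDL issued by Spark lands in that local grid and vanishes with the session. A minimal sketch for checking this, assuming the config file points at the servers' 47500..47509 discovery range:]

import org.apache.ignite.Ignition

// Join the remote cluster as a client node, never as a server.
Ignition.setClientMode(true)
val ignite = Ignition.start("example-default.xml")
// If discovery is wired correctly this prints the EC2 server nodes;
// an empty collection means the node formed its own local topology.
println(ignite.cluster().forServers().nodes())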





On Fri, Oct 18, 2019 at 11:32 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen/All, 

Got it working somewhat using the config below, but there is an issue: a table (and its data) created through the thin client fails to read via Spark, while a table created by Spark can be read through the thin client. Does that mean tables created in Ignite by Spark are the only ones Spark can read back?

example-default.xml
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>

NotWorking Scala Function:-

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}

Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
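
[One hedged suggestion: CREATE TABLE over the thin client puts the table in the PUBLIC schema, and some versions of the Spark data source do not look there by default. Later Ignite releases (2.8+, if I recall the API correctly) expose OPTION_SCHEMA so the schema can be named explicitly; a sketch, assuming that option exists in your version:]

import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_SCHEMA, OPTION_TABLE}

val personDataFrame = spark.read
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_SCHEMA, "PUBLIC") // schema used by thin-client DDL
  .option(OPTION_TABLE, "person")
  .load()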


Full Code:- (step 7 Fails)
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkIgniteCleanCode {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={

val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

}

def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {

val ignite = Ignition.start(CONFIG)

//Load content of json file to data frame.
val personsDataFrame = spark.read.json(
resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

println()
println("Json file content:")
println()

//Printing content of json file to console.
personsDataFrame.show()

println()
println("Writing Data Frame to Ignite:")
println()

//Writing content of data frame to Ignite.
personsDataFrame.write
.format(FORMAT_IGNITE)
.mode(org.apache.spark.sql.SaveMode.Append)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
.option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
.save()

println("Done!")

println()
println("Reading data from Ignite table:")
println()

val cache = ignite.cache[Any, Any](CACHE_NAME)

//Reading saved data from Ignite.
val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

println(data.toString)

//data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
}

def readIgniteUsingSpark(implicit spark: SparkSession) = {
val json_person = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.load()

println()
println("Data frame content:json_person")
println()

//Printing content of data frame to console.
json_person.show()
}


def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}


def main(args: Array[String]): Unit = {

println()
println("Step 1 setupExampleData:")
println()

setupExampleData

println()
println("Step 2 createSparkSession:")
println()

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

println()
println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
println()

sparkReadIgniteWithThinClient(spark)

println()
println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
println()

sparkWriteIgniteWithThinClient(spark)

println()
println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
println()

writeJSonToIgniteUsingSpark(spark)

println()
println("Step 6 readIgniteUsingSpark Using Spark:")
println()

readIgniteUsingSpark(spark)

println()
println("Step 7 readThinClientTableUsingSpark Using Spark:")
println()

readThinClientTableUsingSpark(spark)


spark.stop()


}

}

example-default.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP-based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>

Thanks
Sri 

Screen Shot 2019-10-18 at 11.59.20 AM.png (92K) Download Attachment
sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

So Spark + JDBC thin client is the only way to go forward?

 Reading:-

val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person2").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

Writing:-
import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
"Person2",connectionProperties)

On Fri, Oct 18, 2019 at 12:00 PM sri hari kali charan Tummala <[hidden email]> wrote:
unfortunately, tables that are created by spark don't exist in ignite when I try to query using sqlline jdbc thin client, so the spark is still running locally the tables which are created by spark exists only for that session.

did anyone come across this issue? how to resolve it?





On Fri, Oct 18, 2019 at 11:32 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen/All, 

got it working somewhat using below, but have an issue table and data which is created using thin client is failing to read using spark but table created by spark can be read using a thin client, that means table created in Ignite using spark are the only ones read using spark in Ignite?

example-default.xml
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>

NotWorking Scala Function:-

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}

Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)


Full Code:- (step 7 Fails)
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkIgniteCleanCode {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={

val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

}

def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {

val ignite = Ignition.start(CONFIG)

//Load content of json file to data frame.
val personsDataFrame = spark.read.json(
resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

println()
println("Json file content:")
println()

//Printing content of json file to console.
personsDataFrame.show()

println()
println("Writing Data Frame to Ignite:")
println()

//Writing content of data frame to Ignite.
personsDataFrame.write
.format(FORMAT_IGNITE)
.mode(org.apache.spark.sql.SaveMode.Append)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
.option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
.save()

println("Done!")

println()
println("Reading data from Ignite table:")
println()

val cache = ignite.cache[Any, Any](CACHE_NAME)

//Reading saved data from Ignite.
val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

println(data.toString)

//data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
}

def readIgniteUsingSpark(implicit spark: SparkSession) = {
val json_person = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.load()

println()
println("Data frame content:json_person")
println()

//Printing content of data frame to console.
json_person.show()
}


def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}


def main(args: Array[String]): Unit = {

println()
println("Step 1 setupExampleData:")
println()

setupExampleData

println()
println("Step 2 createSparkSession:")
println()

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

println()
println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
println()

sparkReadIgniteWithThinClient(spark)

println()
println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
println()

sparkWriteIgniteWithThinClient(spark)

println()
println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
println()

writeJSonToIgniteUsingSpark(spark)

println()
println("Step 6 readIgniteUsingSpark Using Spark:")
println()

readIgniteUsingSpark(spark)

println()
println("Step 7 readThinClientTableUsingSpark Using Spark:")
println()

readThinClientTableUsingSpark(spark)


spark.stop()


}

}

example-default.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead os static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>

Thanks
Sri 

On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <[hidden email]> wrote:
do you mean communication ports 47100-47200 as mentioned (https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2) ? bare in mind I am running my spark job outside ignite ec2 box (MAC PC).

which option is right in my default.xml file?

Option 1:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47100..47200</value>
</list>
</property>

Option 2:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47500..47600</value>
</list>
</property>

Option 3:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113</value>
</list>
</property>


Thanks

On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen , 

 do you mean 3.88.248.113:47500..47700 something like this? or just public ip 3.88.248.113 I tried all the possibilities none of them are getting connected.

Thanks
Sri 

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[hidden email]> wrote:
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).

Regards,
Stephen

On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <[hidden email]> wrote:

Hi Community, 

I am trying to read and write into the Ignite cluster using apache-spark I am able to do that using JDBC thin client but not native method as mentioned in several spark + ignite examples.

Right now all the spark + ignite examples launch a local ignite cluster but I want my code connecting to already existing cluster (client).

Question:-
How to pass Ignite connection ip and port (10800)  10800 in example-default.xml ?

Error:-
TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2000 ms; change 'reconnectDelay' to configure the frequency of retries): [/3.88.248.113:10800]

Working (Spark + Ignite using JDBC):-
val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

Not Working requires a CONFIG file which is example-default.xml:-
val igniteDF = spark.read
.format(FORMAT_IGNITE) //Data source type.
.option(OPTION_TABLE, "person") //Table to read.
.option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
.load()
.filter(col("id") >= 2) //Filter clause.
.filter(col("name") like "%J%") //Another filter clause.

Full Code:- (sparkDSLExample) function fails to connect ignite cluster which I already have
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object SparkClientConnectionTest {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkDSLExample(implicit spark: SparkSession): Unit = {
println("Querying using Spark DSL.")
println


val igniteDF = spark.read
.format(FORMAT_IGNITE) //Data source type.
.option(OPTION_TABLE, "person") //Table to read.
.option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
.load()
.filter(col("id") >= 2) //Filter clause.
.filter(col("name") like "%J%") //Another filter clause.

println("Data frame schema:")

igniteDF.printSchema() //Printing query schema to console.

println("Data frame content:")

igniteDF.show() //Printing query results to console.
}


def main(args: Array[String]): Unit = {

setupExampleData

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

//sparkDSLExample


val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

}

example-default.xml:-

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead os static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:10800</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>



--
Thanks & Regards
Sri Tummala

sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

In reply to this post by sri hari kali charan Tummala

On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala <[hidden email]> wrote:
OK, I created a 2-node EC2 Ignite cluster. Is the code below the right way to create a cache? The code is still using my laptop's resources and is unable to connect to the Ignite cluster.

Output:-
[13:36:36] Ignite node started OK (id=8535f4d3)
[13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, servers=1, clients=1, state=ACTIVE, CPUs=8, offheap=6.4GB, heap=14.0GB]
>>> cache acquired

package com.ignite.examples.igniteStartup

import java.util.Arrays
import java.util.List

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object IgniteStart2 {

private var cache: IgniteCache[String, Address] = _

def main(args: Array[String]): Unit = {

// Static IP finder: every entry is host:discovery-port-range (47500..47509), not the thin-client port.
val spi: TcpDiscoverySpi = new TcpDiscoverySpi()
val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder()
val hostList: List[String] = Arrays.asList("127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): _*)
ipFinder.setAddresses(hostList)
spi.setIpFinder(ipFinder)

// Join the existing cluster as a (thick) client node.
val cfg: IgniteConfiguration = new IgniteConfiguration()
cfg.setDiscoverySpi(spi)
cfg.setClientMode(true)
cfg.setPeerClassLoadingEnabled(true)
System.out.println(">>> I am here")

val ignite: Ignite = Ignition.start(cfg)

// Note: cache("test") returns null if no cache named "test" exists on the cluster.
cache = ignite.cache("test")
System.out.println(">>> cache acquired")

System.exit(0)

}

}

On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen,

I followed the steps below and created a one-node EC2 instance with Ignite on it. Now I am connecting to the Ignite cluster from Spark in two ways: 1) the thin client, and 2) a second way that I don't know yet; you will have to tell me.

My question is: what value do I have to pass for option 2? Is it the EC2 instance's public IP with port range 47500..47509 in my example-default.xml file?


Thanks
Sri 

On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington <[hidden email]> wrote:
You’re still pointing your Spark node (thick client) at port 10800 (the thin client port). This is not going to work.

You can create a table using sqlline and read it using Spark, or vice versa. But you need a functioning cluster.

Check out the documentation on clustering concepts and configuration: https://www.gridgain.com/docs/latest/developers-guide/clustering/clustering 
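
For reference, a minimal sketch of the discovery section of example-default.xml pointed at the discovery port range rather than 10800 (this assumes the server uses Ignite's default discovery ports 47500..47509; the EC2 address is the one from this thread):

<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- Discovery port range of the server node, not the thin-client port 10800. -->
<value>18.206.247.40:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>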

On 18 Oct 2019, at 16:32, sri hari kali charan Tummala <[hidden email]> wrote:

Hi Stephen/All, 

Got it working somewhat using the below, but there is an issue: a table and its data created using the thin client fail to read using Spark, while a table created by Spark can be read using the thin client. Does that mean tables created in Ignite by Spark are the only ones Spark can read from Ignite?

example-default.xml
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>

Not Working Scala Function:-

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}

Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
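
A hedged way to narrow this down: right after Ignition.start(CONFIG), print the topology the embedded Spark node actually joined. If the EC2 server's address is not in the list, the node has started its own isolated cluster, where "person" would indeed be an unknown table. A minimal sketch, reusing CONFIG from the code below:

val ignite = Ignition.start(CONFIG)

import scala.collection.JavaConverters._
// Print every node visible to the embedded client; the EC2 server should be listed.
ignite.cluster().nodes().asScala.foreach(node => println(node.addresses()))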


Full Code:- (step 7 Fails)
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkIgniteCleanCode {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={

val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

}

def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {

val ignite = Ignition.start(CONFIG)

//Load content of json file to data frame.
val personsDataFrame = spark.read.json(
resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

println()
println("Json file content:")
println()

//Printing content of json file to console.
personsDataFrame.show()

println()
println("Writing Data Frame to Ignite:")
println()

//Writing content of data frame to Ignite.
personsDataFrame.write
.format(FORMAT_IGNITE)
.mode(org.apache.spark.sql.SaveMode.Append)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
.option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
.save()

println("Done!")

println()
println("Reading data from Ignite table:")
println()

val cache = ignite.cache[Any, Any](CACHE_NAME)

//Reading saved data from Ignite.
val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

println(data.toString)

//data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
}

def readIgniteUsingSpark(implicit spark: SparkSession) = {
val json_person = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.load()

println()
println("Data frame content:json_person")
println()

//Printing content of data frame to console.
json_person.show()
}


def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}


def main(args: Array[String]): Unit = {

println()
println("Step 1 setupExampleData:")
println()

setupExampleData

println()
println("Step 2 createSparkSession:")
println()

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

println()
println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
println()

sparkReadIgniteWithThinClient(spark)

println()
println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
println()

sparkWriteIgniteWithThinClient(spark)

println()
println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
println()

writeJSonToIgniteUsingSpark(spark)

println()
println("Step 6 readIgniteUsingSpark Using Spark:")
println()

readIgniteUsingSpark(spark)

println()
println("Step 7 readThinClientTableUsingSpark Using Spark:")
println()

readThinClientTableUsingSpark(spark)


spark.stop()


}

}

example-default.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP-based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>

Thanks
Sri 

On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <[hidden email]> wrote:
Do you mean the communication ports 47100-47200 as mentioned (https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2)? Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).

Which option is right in my example-default.xml file?

Option 1:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47100..47200</value>
</list>
</property>

Option 2:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47500..47600</value>
</list>
</property>

Option 3:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113</value>
</list>
</property>


Thanks

On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen,

Do you mean 3.88.248.113:47500..47700, something like this? Or just the public IP 3.88.248.113? I tried all the possibilities; none of them connect.

Thanks
Sri 

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[hidden email]> wrote:
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).

Regards,
Stephen




--
Thanks & Regards
Sri Tummala


dmagda

Re: Apache Spark + Ignite Connection Issue

Sri,

It's wrong to assume that the Ignite thin client is the only way to access data stored in AWS or other cloud environments. The problems you are experiencing are caused by the fact that your application is running on your laptop while the cluster (the server nodes) is inside an AWS network.

If you use a thick/regular client, the servers running on AWS need a way to *open* network connections back to the application. That won't happen if the laptop or dev environment doesn't have public IPs reachable from AWS, and additional network configuration might be needed on top of that.

Overall, thick clients do work with AWS, but either the application needs to be deployed to AWS during development, or you need to start a local cluster on your own network/premises first, finish development there, and then move on to testing in AWS. Special network configuration is also an option, but a tricky one, and it might not be feasible.

Thin clients are a solution as well if key-value and SQL APIs are all you need. 

-
Denis
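
To make the thin-client suggestion concrete, here is a minimal key-value sketch that needs only an outbound connection from the laptop to port 10800; the cache name and the key/value types are assumptions:

import org.apache.ignite.Ignition
import org.apache.ignite.configuration.ClientConfiguration

object ThinClientSketch extends App {

val cfg = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val client = Ignition.startClient(cfg)

// Plain key-value access over the thin-client protocol.
val cache = client.getOrCreateCache[Integer, String]("SparkCache")
cache.put(1, "hello from the laptop")
println(cache.get(1))

client.close()

}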



sri hari kali charan Tummala

Re: Apache Spark + Ignite Connection Issue

I got it, so I have to build a jar, move it to the AWS Ignite cluster node, and run it on the node.

Thanks 
Sri

On Thursday, October 24, 2019, Denis Magda <[hidden email]> wrote:
Sri,

That's wrong to assume that the Ignite thin client is the only way to access data stored in AWS or other cloud environments. The problems you are experiencing are caused by the fact that your application is running on the laptop while the cluster (server nodes) is within an AWS network.  

If you use a thick/regular client, then servers running on AWS need to have a way to *open* network connections to the application. This won't happen if the laptop or dev environment doesn't have public IPs reachable from AWS. Plus, additional network configuration might be needed.

Overall, thick clients work with AWS, but either an application needs to be deployed to AWS during development, or you need to start a local cluster in your network/premises first, accomplish development and move on with testing in AWS. A network configuration is also an option but a tricky one and might not be doable.

Thin clients are a solution as well if key-value and SQL APIs are all you need. 

-
Denis


On Sun, Oct 20, 2019 at 2:06 PM sri hari kali charan Tummala <[hidden email]> wrote:

On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala <[hidden email]> wrote:
Ok I created a 2 node ec2 instance ignite cluster is below is the right way to create cache? the code still using my laptop resources unable to connect ignite cluster.

Out put:-
[13:36:36] Ignite node started OK (id=8535f4d3)
[13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, servers=1, clients=1, state=ACTIVE, CPUs=8, offheap=6.4GB, heap=14.0GB]
>>> cache acquired

package com.ignite.examples.igniteStartup

import java.util.{Arrays, List}

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object IgniteStart2 {

  private var cache: IgniteCache[String, Address] = _

  def main(args: Array[String]): Unit = {

    // Static IP finder listing the discovery addresses (port range 47500..47509)
    // of the local node and the two EC2 nodes.
    val spi: TcpDiscoverySpi = new TcpDiscoverySpi()
    val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder()
    val hostList: List[String] = Arrays.asList(
      "127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): _*)
    ipFinder.setAddresses(hostList)
    spi.setIpFinder(ipFinder)

    val cfg: IgniteConfiguration = new IgniteConfiguration()
    cfg.setDiscoverySpi(spi)
    cfg.setClientMode(true) // join the cluster as a (thick) client node
    cfg.setPeerClassLoadingEnabled(true)
    System.out.println(">>> I am here")

    // Start the client node and look up the existing "test" cache.
    val ignite: Ignite = Ignition.start(cfg)
    cache = ignite.cache[String, Address]("test")
    System.out.println(">>> cache acquired")

    System.exit(0)
  }
}

On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen , 

I followed the steps below and created a one-node EC2 instance with Ignite on it. Now I am connecting to the Ignite cluster using Spark in two ways: 1) the thin client; 2) I don't know yet, you have to tell me.

My question is: what value do I have to pass for option 2? Is it the EC2 instance's public IP with port range 47500..47509 in my example-default.xml file?


Thanks
Sri 

On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington <[hidden email]> wrote:
You’re still pointing your Spark node (thick client) at port 10800 (the thin client port). This is not going to work.

You can create a table using sqlline and read it using Spark, or vice versa. But you need a functioning cluster.

Check out the documentation on clustering concepts and configuration: https://www.gridgain.com/docs/latest/developers-guide/clustering/clustering 
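
For illustration, here's the same idea programmatically: a thick client whose discovery settings mirror what your example-default.xml should contain, namely the server's discovery port range (47500..47509 by default), not the thin-client port 10800. This is a sketch reusing the EC2 IP from your mail, with everything else left at defaults:

import java.util.Collections

import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object DiscoveryConfigSketch extends App {
  // Static IP finder aimed at the server's DISCOVERY port range
  // (47500..47509 by default), not the thin-client port 10800.
  val ipFinder = new TcpDiscoveryVmIpFinder()
  ipFinder.setAddresses(Collections.singletonList("18.206.247.40:47500..47509"))

  val cfg = new IgniteConfiguration()
    .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))
    .setClientMode(true) // join as a thick client, which is what the Spark integration does

  val ignite = Ignition.start(cfg)
  // If discovery is configured correctly, the topology should list the
  // AWS server node(s) plus this client.
  println(ignite.cluster().nodes())
  ignite.close()
}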

On 18 Oct 2019, at 16:32, sri hari kali charan Tummala <[hidden email]> wrote:

Hi Stephen/All, 

Got it working somewhat using the config below, but I have an issue: a table and its data created using the thin client fail to read using Spark, yet a table created by Spark can be read using the thin client. Does that mean tables created in Ignite by Spark are the only ones Spark can read back from Ignite?

example-default.xml
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>

Not working Scala function:-

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}

Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)


Full Code:- (step 7 Fails)
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkIgniteCleanCode {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={

val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

}

def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {

val ignite = Ignition.start(CONFIG)

//Load content of json file to data frame.
val personsDataFrame = spark.read.json(
resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

println()
println("Json file content:")
println()

//Printing content of json file to console.
personsDataFrame.show()

println()
println("Writing Data Frame to Ignite:")
println()

//Writing content of data frame to Ignite.
personsDataFrame.write
.format(FORMAT_IGNITE)
.mode(org.apache.spark.sql.SaveMode.Append)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
.option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
.save()

println("Done!")

println()
println("Reading data from Ignite table:")
println()

val cache = ignite.cache[Any, Any](CACHE_NAME)

//Reading saved data from Ignite.
val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

println(data.toString)

//data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
}

def readIgniteUsingSpark(implicit spark: SparkSession) = {
val json_person = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "json_person")
.load()

println()
println("Data frame content:json_person")
println()

//Printing content of data frame to console.
json_person.show()
}


def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
val personDataFrame = spark.read
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, CONFIG)
.option(OPTION_TABLE, "person")
.load()

println()
println("Data frame thin connection content:person")
println()

//Printing content of data frame to console.
personDataFrame.show()
}


def main(args: Array[String]): Unit = {

println()
println("Step 1 setupExampleData:")
println()

setupExampleData

println()
println("Step 2 createSparkSession:")
println()

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

println()
println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
println()

sparkReadIgniteWithThinClient(spark)

println()
println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
println()

sparkWriteIgniteWithThinClient(spark)

println()
println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
println()

writeJSonToIgniteUsingSpark(spark)

println()
println("Step 6 readIgniteUsingSpark Using Spark:")
println()

readIgniteUsingSpark(spark)

println()
println("Step 7 readThinClientTableUsingSpark Using Spark:")
println()

readThinClientTableUsingSpark(spark)


spark.stop()


}

}

example-default.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP-based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>18.206.247.40:10800</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>

Thanks
Sri 

On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <[hidden email]> wrote:
Do you mean the communication ports 47100-47200 as mentioned in https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2 ? Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).

Which option is right in my default.xml file?

Option 1:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47100..47200</value>
</list>
</property>

Option 2:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:47500..47600</value>
</list>
</property>

Option 3:-
                        <property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113</value>
</list>
</property>


Thanks

On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen , 

Do you mean 3.88.248.113:47500..47700, something like that? Or just the public IP 3.88.248.113? I tried all the possibilities and none of them connect.

Thanks
Sri 

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[hidden email]> wrote:
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).
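
Before changing the config, it can help to verify which ports are actually reachable from your laptop. A rough sketch using plain JVM sockets (nothing Ignite-specific; your AWS security group has to allow these ports):

import java.net.{InetSocketAddress, Socket}

object PortCheck extends App {
  // Returns true if a TCP connection to host:port succeeds within 3 seconds.
  def reachable(host: String, port: Int): Boolean =
    try {
      val socket = new Socket()
      socket.connect(new InetSocketAddress(host, port), 3000)
      socket.close()
      true
    } catch { case _: Exception => false }

  println(s"thin client 10800: ${reachable("3.88.248.113", 10800)}")
  println(s"discovery 47500: ${reachable("3.88.248.113", 47500)}")
}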

Regards,
Stephen

On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <[hidden email]> wrote:

Hi Community, 

I am trying to read and write into the Ignite cluster using apache-spark I am able to do that using JDBC thin client but not native method as mentioned in several spark + ignite examples.

Right now all the spark + ignite examples launch a local ignite cluster but I want my code connecting to already existing cluster (client).

Question:-
How do I pass the Ignite connection IP and port (10800) in example-default.xml?

Error:-
TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2000 ms; change 'reconnectDelay' to configure the frequency of retries): [/3.88.248.113:10800]

Working (Spark + Ignite using JDBC):-
val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

Not Working requires a CONFIG file which is example-default.xml:-
val igniteDF = spark.read
.format(FORMAT_IGNITE) //Data source type.
.option(OPTION_TABLE, "person") //Table to read.
.option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
.load()
.filter(col("id") >= 2) //Filter clause.
.filter(col("name") like "%J%") //Another filter clause.

Full Code:- (sparkDSLExample) function fails to connect ignite cluster which I already have
package com.ignite.examples.spark

import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object SparkClientConnectionTest {

private val CACHE_NAME = "SparkCache"

private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"

def setupExampleData = {

val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
val igniteClient:IgniteClient = Ignition.startClient(cfg2)

System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
.setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll

System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}

def sparkDSLExample(implicit spark: SparkSession): Unit = {
println("Querying using Spark DSL.")
println


val igniteDF = spark.read
.format(FORMAT_IGNITE) //Data source type.
.option(OPTION_TABLE, "person") //Table to read.
.option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
.load()
.filter(col("id") >= 2) //Filter clause.
.filter(col("name") like "%J%") //Another filter clause.

println("Data frame schema:")

igniteDF.printSchema() //Printing query schema to console.

println("Data frame content:")

igniteDF.show() //Printing query results to console.
}


def main(args: Array[String]): Unit = {

setupExampleData

//Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Spark Ignite data sources example")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

//sparkDSLExample


val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
"Person",connectionProperties)

spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://3.88.248.113")
.option("fetchsize",100)
.option("dbtable", "Person").load().show(10,false)

}

}

example-default.xml:-

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite configuration with all defaults and enabled p2p deployment and enabled events.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>

<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP-based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>3.88.248.113:10800</value>


--
Thanks & Regards
Sri Tummala