Hi Community,
I am trying to read from and write to an Ignite cluster using Apache Spark. I can do that with the JDBC thin client, but not with the native method shown in several Spark + Ignite examples. All of those examples launch a local Ignite cluster, whereas I want my code to connect to an already existing cluster as a client.
Question: how do I pass the Ignite connection IP and port (10800) in example-default.xml?
Error:

TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2000 ms; change 'reconnectDelay' to configure the frequency of retries): [/3.88.248.113:10800]
Working (Spark + Ignite using JDBC):

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize", 100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person")
  .load()

df.printSchema()
df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)

spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize", 100)
  .option("dbtable", "Person")
  .load()
  .show(10, false)
Not working (the native path, which requires a config file, example-default.xml):

val igniteDF = spark.read
  .format(FORMAT_IGNITE)              // Data source type.
  .option(OPTION_TABLE, "person")     // Table to read.
  .option(OPTION_CONFIG_FILE, CONFIG) // Ignite config.
  .load()
  .filter(col("id") >= 2)             // Filter clause.
  .filter(col("name") like "%J%")     // Another filter clause.
Full code (the sparkDSLExample function fails to connect to the Ignite cluster I already have):

package com.ignite.examples.spark
import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
object SparkClientConnectionTest {
private val CACHE_NAME = "SparkCache"
private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
def setupExampleData = {
  val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
  val igniteClient: IgniteClient = Ignition.startClient(cfg2)

  System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

  val cache: ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

  cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person").setSchema("PUBLIC")).getAll

  cache.query(new SqlFieldsQuery(String.format(
    "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
    classOf[Address].getName)).setSchema("PUBLIC")).getAll

  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll

  System.out.format(">>> Data inserted into cache [%s].\n", CACHE_NAME)

  val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
  println(data.toString)
}
def sparkDSLExample(implicit spark: SparkSession): Unit = {
  println("Querying using Spark DSL.")
  println()

  val igniteDF = spark.read
    .format(FORMAT_IGNITE)              // Data source type.
    .option(OPTION_TABLE, "person")     // Table to read.
    .option(OPTION_CONFIG_FILE, CONFIG) // Ignite config.
    .load()
    .filter(col("id") >= 2)             // Filter clause.
    .filter(col("name") like "%J%")     // Another filter clause.

  println("Data frame schema:")
  igniteDF.printSchema() // Printing query schema to console.

  println("Data frame content:")
  igniteDF.show() // Printing query results to console.
}
def main(args: Array[String]): Unit = {
setupExampleData
// Creating Spark session.
implicit val spark = SparkSession.builder()
  .appName("Spark Ignite data sources example")
  .master("local")
  .config("spark.executor.instances", "2")
  .getOrCreate()

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
//sparkDSLExample
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize", 100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person")
  .load()

df.printSchema()
df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")

import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)

spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://3.88.248.113")
  .option("fetchsize", 100)
  .option("dbtable", "Person")
  .load()
  .show(10, false)
}
}
example-default.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
<!-- Ignite configuration with all defaults and enabled p2p deployment and enabled events. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!-- Ignite provides several options for automatic discovery that can be used
                         instead of static IP based discovery. For information on all options refer
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>3.88.248.113:10800</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
-- Thanks & Regards Sri Tummala
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).
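For illustration, a discovery section along those lines might look like the sketch below. This is an assumption-laden example, not your actual server config: it points the static IP finder at the discovery port range (47500..47509 on an out-of-the-box node) instead of the thin-client port 10800, and joins the cluster as a client node. The port range must match whatever your server's TcpDiscoverySpi actually uses.

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- Join the existing cluster as a client node rather than starting a server. -->
    <property name="clientMode" value="true"/>
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                    <property name="addresses">
                        <list>
                            <!-- Discovery port range (default 47500..47509), NOT the thin-client port 10800. -->
                            <value>3.88.248.113:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>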
Regards,
Stephen
Hi Stephen,

Do you mean something like 3.88.248.113:47500..47700, or just the public IP 3.88.248.113? I tried all the possibilities and none of them connect.
Thanks,
Sri
Do you mean the communication ports 47100-47200, as mentioned in https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2? Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).

Which option is right in my example-default.xml file?
Option 1:

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113:47100..47200</value>
    </list>
</property>

Option 2:

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113:47500..47600</value>
    </list>
</property>

Option 3:

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113</value>
    </list>
</property>
Hi Stephen/All,
I got it working (somewhat) using the config below, but there is an issue: a table created and populated through the thin client fails to read through Spark, while a table created by Spark can be read through the thin client. Does that mean the only tables Spark can read from Ignite are the ones Spark itself created?
example-default.xml:

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>18.206.247.40:10800</value>
    </list>
</property>
Not working Scala function:

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content: person")
  println()

  // Printing content of data frame to console.
  personDataFrame.show()
}
Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
    at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
    at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
    at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
    at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
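A hedged aside for anyone hitting the same "Unknown table" error: DDL executed through the thin client creates the table in the PUBLIC SQL schema, and some versions of the Spark integration only resolve tables from their default schema. Newer Ignite releases add an OPTION_SCHEMA setting to IgniteDataFrameSettings for exactly this case; whether your version ships it is an assumption here, so treat this as a sketch rather than a verified fix:

// Sketch, assuming an Ignite version whose IgniteDataFrameSettings exposes OPTION_SCHEMA.
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_SCHEMA, OPTION_TABLE}

val personDataFrame = spark.read
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_SCHEMA, "PUBLIC") // Look up "person" in PUBLIC, where thin-client DDL puts it.
  .option(OPTION_TABLE, "person")
  .load()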
Full code (step 7 fails):

package com.ignite.examples.spark
import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}
import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkIgniteCleanCode {
private val CACHE_NAME = "SparkCache"
private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
def setupExampleData = {
  val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
  val igniteClient: IgniteClient = Ignition.startClient(cfg2)

  System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

  val cache: ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

  cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person").setSchema("PUBLIC")).getAll

  cache.query(new SqlFieldsQuery(String.format(
    "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
    classOf[Address].getName)).setSchema("PUBLIC")).getAll

  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll

  System.out.format(">>> Data inserted into cache [%s].\n", CACHE_NAME)

  val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
  println(data.toString)
}
def sparkReadIgniteWithThinClient(implicit spark: SparkSession) = {
  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:ignite:thin://18.206.247.40")
    .option("fetchsize", 100)
    //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
    .option("dbtable", "Person")
    .load()

  df.printSchema()
  df.createOrReplaceTempView("test")

  spark.sql("select * from test where id=1").show(10)
  spark.sql("select 4,'blah',124232").show(10)
}
def sparkWriteIgniteWithThinClient(implicit spark: SparkSession) = {
  import java.sql.DriverManager
  val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

  import java.util.Properties
  val connectionProperties = new Properties()
  connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

  spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
    .write.mode(SaveMode.Append)
    .jdbc("jdbc:ignite:thin://18.206.247.40", "Person", connectionProperties)

  spark.read
    .format("jdbc")
    .option("url", "jdbc:ignite:thin://18.206.247.40")
    .option("fetchsize", 100)
    .option("dbtable", "Person")
    .load()
    .show(10, false)
}
def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
  val ignite = Ignition.start(CONFIG)

  // Load content of json file to data frame.
  val personsDataFrame = spark.read.json(
    resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

  println()
  println("Json file content:")
  println()

  // Printing content of json file to console.
  personsDataFrame.show()

  println()
  println("Writing Data Frame to Ignite:")
  println()

  // Writing content of data frame to Ignite.
  personsDataFrame.write
    .format(FORMAT_IGNITE)
    .mode(org.apache.spark.sql.SaveMode.Append)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
    .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
    .save()

  println("Done!")

  println()
  println("Reading data from Ignite table:")
  println()

  val cache = ignite.cache[Any, Any](CACHE_NAME)

  // Reading saved data from Ignite.
  val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

  println(data.toString)
  //data.foreach { row => println(row.mkString("[", ", ", "]")) }
}
def readIgniteUsingSpark(implicit spark: SparkSession) = {
  val json_person = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .load()

  println()
  println("Data frame content: json_person")
  println()

  // Printing content of data frame to console.
  json_person.show()
}
def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content: person")
  println()

  // Printing content of data frame to console.
  personDataFrame.show()
}
def main(args: Array[String]): Unit = {
  println()
  println("Step 1 setupExampleData:")
  println()

  setupExampleData

  println()
  println("Step 2 createSparkSession:")
  println()

  // Creating Spark session.
  implicit val spark = SparkSession.builder()
    .appName("Spark Ignite data sources example")
    .master("local")
    .config("spark.executor.instances", "2")
    .getOrCreate()

  // Adjust the logger to exclude the logs of no interest.
  Logger.getRootLogger.setLevel(Level.ERROR)
  Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

  println()
  println("Step 3 sparkReadIgniteWithThinClient of Step 1 data:")
  println()

  sparkReadIgniteWithThinClient(spark)

  println()
  println("Step 4 sparkWriteIgniteWithThinClient of Step 1 data:")
  println()

  sparkWriteIgniteWithThinClient(spark)

  println()
  println("Step 5 writeJSonToIgniteUsingSpark using Spark:")
  println()

  writeJSonToIgniteUsingSpark(spark)

  println()
  println("Step 6 readIgniteUsingSpark using Spark:")
  println()

  readIgniteUsingSpark(spark)

  println()
  println("Step 7 readThinClientTableUsingSpark using Spark:")
  println()

  readThinClientTableUsingSpark(spark)

  spark.stop()
}
}
example-default.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
<!-- Ignite configuration with all defaults and enabled p2p deployment and enabled events. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!-- Ignite provides several options for automatic discovery that can be used
                         instead of static IP based discovery. For information on all options refer
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>18.206.247.40:10800</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
Thanks,
Sri
which option is right in my default.xml file?
Option 1:- <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>3.88.248.113:47100..47200</value> </list> </property>
Option 2:- <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>3.88.248.113:47500..47600</value> </list> </property>
Option 3:- <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>3.88.248.113</value> </list> </property>
On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala < [hidden email]> wrote: Hi Stephen ,
do you mean 3 .88.248.113:47500..47700 something like this? or just public ip 3 .88.248.113 I tried all the possibilities none of them are getting connected.
Thanks Sri
On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington < [hidden email]> wrote: You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).
Regards, Stephen
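Concretely, that means the discoverySpi block in example-default.xml should target the cluster's discovery port range rather than 10800. A minimal sketch of just that section, using the static IP finder and assuming Ignite's default discovery range 47500..47509 and the host IP from this thread:

<property name="discoverySpi">
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
            <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                <property name="addresses">
                    <list>
                        <!-- Discovery port range, not the thin-client port 10800. -->
                        <value>3.88.248.113:47500..47509</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>
</property>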
--
Thanks & Regards Sri Tummala
Unfortunately, the tables created by Spark don't exist in Ignite when I query through the sqlline JDBC thin client, so Spark is still running locally and the tables it creates exist only for that session.
Did anyone come across this issue? How do I resolve it?
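One way to see where the tables actually landed (the remote cluster vs. a local Ignite started by Spark) is to list them over the same thin-client connection; a small sketch using plain JDBC metadata, with the cluster IP used later in this thread:

import java.sql.DriverManager

// Connect over the thin-client port (10800 by default) and list the tables in
// the PUBLIC schema, which is where thin-client/sqlline CREATE TABLE puts them.
val conn = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
val rs = conn.getMetaData.getTables(null, "PUBLIC", "%", null)
while (rs.next()) println(rs.getString("TABLE_NAME"))
conn.close()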
On Fri, Oct 18, 2019 at 11:32 AM sri hari kali charan Tummala < [hidden email]> wrote: Hi Stephen/All,
I got it working somewhat using the config below, but there is an issue: a table and its data created through the thin client fail to read from Spark, while a table created by Spark can be read through the thin client. Does that mean tables created in Ignite by Spark are the only ones Spark can read back?
example-default.xml <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>18.206.247.40:10800</value> </list> </property>
Not Working Scala Function:-
def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content:person")
  println()

  //Printing content of data frame to console.
  personDataFrame.show()
}
Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
    at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
    at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
    at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
    at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
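A possible culprit besides connectivity: the Spark data source resolves tables per schema, and tables created via thin-client DDL land in the PUBLIC schema. Newer Ignite releases (2.8+) expose an OPTION_SCHEMA setting for exactly this; a sketch under the assumption that your Ignite version has it (CONFIG is the same path used in the full code below):

import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_SCHEMA, OPTION_TABLE}

// OPTION_SCHEMA is the assumption here; it is not present in older releases.
val personFromPublic = spark.read
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_SCHEMA, "PUBLIC")   // schema where thin-client CREATE TABLE puts tables
  .option(OPTION_TABLE, "person")
  .load()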
Full Code:- (step 7 fails)

package com.ignite.examples.spark
import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkIgniteCleanCode {
private val CACHE_NAME = "SparkCache"
private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
def setupExampleData = {
  val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
  val igniteClient: IgniteClient = Ignition.startClient(cfg2)

  System.out.format(">>> Created cache [%s].\n", CACHE_NAME)

  val cache: ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

  cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person")
    .setSchema("PUBLIC")).getAll

  cache.query(new SqlFieldsQuery(String.format(
    "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
    classOf[Address].getName)).setSchema("PUBLIC")).getAll

  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
  cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
    .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll

  System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)

  val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

  println(data.toString)
}
def sparkReadIgniteWithThinClient(implicit spark: SparkSession) = {

  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:ignite:thin://18.206.247.40")
    .option("fetchsize", 100)
    //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
    .option("dbtable", "Person")
    .load()

  df.printSchema()

  df.createOrReplaceTempView("test")

  spark.sql("select * from test where id=1").show(10)

  spark.sql("select 4,'blah',124232").show(10)
}

def sparkWriteIgniteWithThinClient(implicit spark: SparkSession) = {

  import java.sql.DriverManager
  val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

  import java.util.Properties
  val connectionProperties = new Properties()

  connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

  spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
    .write.mode(SaveMode.Append)
    .jdbc("jdbc:ignite:thin://18.206.247.40", "Person", connectionProperties)

  spark.read
    .format("jdbc")
    .option("url", "jdbc:ignite:thin://18.206.247.40")
    .option("fetchsize", 100)
    .option("dbtable", "Person")
    .load()
    .show(10, false)
}
def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
  val ignite = Ignition.start(CONFIG)

  //Load content of json file to data frame.
  val personsDataFrame = spark.read.json(
    resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

  println()
  println("Json file content:")
  println()

  //Printing content of json file to console.
  personsDataFrame.show()

  println()
  println("Writing Data Frame to Ignite:")
  println()

  //Writing content of data frame to Ignite.
  personsDataFrame.write
    .format(FORMAT_IGNITE)
    .mode(org.apache.spark.sql.SaveMode.Append)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
    .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
    .save()

  println("Done!")

  println()
  println("Reading data from Ignite table:")
  println()

  val cache = ignite.cache[Any, Any](CACHE_NAME)

  //Reading saved data from Ignite.
  val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

  println(data.toString)

  //data.foreach { row => println(row.mkString("[", ", ", "]")) }
}

def readIgniteUsingSpark(implicit spark: SparkSession) = {
  val json_person = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .load()

  println()
  println("Data frame content:json_person")
  println()

  //Printing content of data frame to console.
  json_person.show()
}

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content:person")
  println()

  //Printing content of data frame to console.
  personDataFrame.show()
}
def main(args: Array[String]): Unit = {
  println()
  println("Step 1 setupExampleData:")
  println()

  setupExampleData

  println()
  println("Step 2 createSparkSession:")
  println()

  //Creating spark session.
  implicit val spark = SparkSession.builder()
    .appName("Spark Ignite data sources example")
    .master("local")
    .config("spark.executor.instances", "2")
    .getOrCreate()

  // Adjust the logger to exclude the logs of no interest.
  Logger.getRootLogger.setLevel(Level.ERROR)
  Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

  println()
  println("Step 3 sparkReadIgniteWithThinClient of Step 1 data:")
  println()

  sparkReadIgniteWithThinClient(spark)

  println()
  println("Step 4 sparkWriteIgniteWithThinClient of Step 1 data:")
  println()

  sparkWriteIgniteWithThinClient(spark)

  println()
  println("Step 5 writeJSonToIgniteUsingSpark:")
  println()

  writeJSonToIgniteUsingSpark(spark)

  println()
  println("Step 6 readIgniteUsingSpark:")
  println()

  readIgniteUsingSpark(spark)

  println()
  println("Step 7 readThinClientTableUsingSpark:")
  println()

  readThinClientTableUsingSpark(spark)

  spark.stop()
}
}
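Following Stephen's point about the thick client: the Ignite node that the Spark integration starts must join the existing cluster as a client node, not spin up its own server. A minimal sketch of the programmatic route (the XML equivalent is a clientMode property on the IgniteConfiguration bean; CONFIG is the same file as above and must point discovery at the servers' 47500.. range):

import org.apache.ignite.Ignition

// Mark all subsequent Ignition.start() calls as (thick) client nodes.
Ignition.setClientMode(true)
val ignite = Ignition.start(CONFIG)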
example-default.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
<!-- Ignite configuration with all defaults and enabled p2p deployment and enabled events. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!-- Ignite provides several options for automatic discovery that can be used
                         instead of static IP based discovery. For information on all options refer
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>18.206.247.40:10800</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
Thanks Sri

--
Thanks & Regards Sri Tummala
So Spark + the JDBC thin client is the only way to go forward?
Reading:-
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://18.206.247.40")
  .option("fetchsize", 100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person2")
  .load()
df.printSchema()
df.createOrReplaceTempView("test")
spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)
Writing:-

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")
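If Person2 does not already exist on the cluster, it may be safer to create it over the same thin-client connection before the append below, since Ignite's CREATE TABLE requires an explicit PRIMARY KEY that Spark's auto-generated JDBC DDL does not emit; a small sketch (the column list is an assumption, mirroring the earlier Person table):

// Hypothetical pre-step: create the target table with an explicit primary key
// before Spark appends rows to it.
connection.createStatement().execute(
  "CREATE TABLE IF NOT EXISTS Person2 (id LONG PRIMARY KEY, street VARCHAR, zip VARCHAR)")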
spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://18.206.247.40", "Person2", connectionProperties)
--
Thanks & Regards Sri Tummala
So I got an answer from GridGain: the JDBC thin client is the only way to connect to the Ignite cluster which is running on AWS.
Thanks Sri

On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala < [hidden email]> wrote:
Ok, I created a 2-node EC2 Ignite cluster; is the below the right way to create the cache? The code is still using my laptop's resources and is unable to connect to the Ignite cluster.
Output:-

[13:36:36] Ignite node started OK (id=8535f4d3)
[13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, servers=1, clients=1, state=ACTIVE, CPUs=8, offheap=6.4GB, heap=14.0GB]
>>> cache acquired
package com.ignite.examples.igniteStartup
import java.util.Arrays
import java.util.List

import com.ignite.examples.model.Address
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.apache.ignite.Ignite
import org.apache.ignite.IgniteCache
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

//remove if not needed
import scala.collection.JavaConversions._
object IgniteStart2 {
private var cache: IgniteCache[String, Address] = _
def main(args: Array[String]): Unit = {
  val spi: TcpDiscoverySpi = new TcpDiscoverySpi()
  val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder()
  val hostList: List[String] = Arrays.asList(
    "127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): _*)
  ipFinder.setAddresses(hostList)
  spi.setIpFinder(ipFinder)

  val cfg: IgniteConfiguration = new IgniteConfiguration()
  cfg.setDiscoverySpi(spi)
  cfg.setClientMode(true)
  System.out.println(">>> I am here")

  cfg.setPeerClassLoadingEnabled(true)
  val ignite: Ignite = Ignition.start(cfg)
  cache = Ignition.ignite().cache("test")
  System.out.println(">>> cache acquired")

  System.exit(0)
}
}
On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala < [hidden email]> wrote: Hi Stephen,

I followed the steps below and created a one-node EC2 instance with Ignite on it. Now I am connecting to the Ignite cluster using Spark in two ways: 1) the thin client, and 2) I don't know; you have to tell me.

My question is: what value do I have to pass for option 2? Is it the EC2 instance public IP with port 47500..47509 in my example-default.xml file?
Thanks Sri
On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington < [hidden email]> wrote: You’re still pointing your Spark node (a thick client) at port 10800 (the thin client port). This is not going to work.
You can create a table using sqlline and read it using Spark, or vice versa. But you need a functioning cluster.
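(For reference, a minimal sqlline session against the thin-client port might look like the sketch below; the host is the EC2 address used elsewhere in this thread, and the City table is purely illustrative.)

./sqlline.sh -u jdbc:ignite:thin://18.206.247.40
CREATE TABLE City (id LONG PRIMARY KEY, name VARCHAR);
SELECT * FROM City;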
Check out the documentation on clustering concepts and configuration: https://www.gridgain.com/docs/latest/developers-guide/clustering/clustering

On 18 Oct 2019, at 16:32, sri hari kali charan Tummala < [hidden email]> wrote:
Hi Stephen/All,
Got it working somewhat using the config below, but there is an issue: a table (and its data) created using the thin client fails to read using Spark, while a table created by Spark can be read using the thin client. Does that mean tables created in Ignite through Spark are the only ones Spark can read back from Ignite?
example-default.xml

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>18.206.247.40:10800</value>
    </list>
</property>
Not Working Scala Function:-
def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content:person")
  println()

  //Printing content of data frame to console.
  personDataFrame.show()
}
Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) at scala.Option.getOrElse(Option.scala:121) at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154) at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216) at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
Full Code:- (step 7 fails)

package com.ignite.examples.spark
import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkIgniteCleanCode {
private val CACHE_NAME = "SparkCache"
private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
def setupExampleData = {
val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val igniteClient: IgniteClient = Ignition.startClient(cfg2)
System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)
cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person")) .setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName)) .setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll
System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
println(data.toString)
}
def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://18.206.247.40")
  .option("fetchsize", 100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person")
  .load()
df.printSchema()
df.createOrReplaceTempView("test")
spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)
}
def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={
import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://18.206.247.40", "Person", connectionProperties)

spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://18.206.247.40")
  .option("fetchsize", 100)
  .option("dbtable", "Person")
  .load()
  .show(10, false)
}
def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {

  val ignite = Ignition.start(CONFIG)

  //Load content of json file to data frame.
  val personsDataFrame = spark.read.json(
    resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

  println()
  println("Json file content:")
  println()

  //Printing content of json file to console.
  personsDataFrame.show()

  println()
  println("Writing Data Frame to Ignite:")
  println()

  //Writing content of data frame to Ignite.
  personsDataFrame.write
    .format(FORMAT_IGNITE)
    .mode(org.apache.spark.sql.SaveMode.Append)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
    .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
    .save()

  println("Done!")

  println()
  println("Reading data from Ignite table:")
  println()

  val cache = ignite.cache[Any, Any](CACHE_NAME)

  //Reading saved data from Ignite.
  val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

  println(data.toString)

  //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
}

def readIgniteUsingSpark(implicit spark: SparkSession) = {
  val json_person = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .load()

  println()
  println("Data frame content:json_person")
  println()

  //Printing content of data frame to console.
  json_person.show()
}

def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content:person")
  println()

  //Printing content of data frame to console.
  personDataFrame.show()
}
def main(args: Array[String]): Unit = {
println() println("Step 1 setupExampleData:") println()
setupExampleData
println() println("Step 2 createSparkSession:") println()
//Creating spark session. implicit val spark = SparkSession.builder() .appName("Spark Ignite data sources example") .master("local") .config("spark.executor.instances", "2") .getOrCreate()
// Adjust the logger to exclude the logs of no interest. Logger.getRootLogger.setLevel(Level.ERROR) Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
println() println("Step 3 ReadIgniteWithThinClient of Step1 Data:") println()
sparkReadIgniteWithThinClient(spark)
println() println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:") println()
sparkWriteIgniteWithThinClient(spark)
println() println("Step 5 writeJSonToIgniteUsingSpark Using Spark:") println()
writeJSonToIgniteUsingSpark(spark)
println() println("Step 6 readIgniteUsingSpark Using Spark:") println()
readIgniteUsingSpark(spark)
println() println("Step 7 readThinClientTableUsingSpark Using Spark:") println()
readThinClientTableUsingSpark(spark)
spark.stop()
}
}
example-default.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
<!-- Ignite configuration with all defaults and enabled p2p deployment and enabled events. --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> <!-- Set to true to enable distributed class loading for examples, default is false. --> <property name="peerClassLoadingEnabled" value="true"/>
<!-- Enable task execution events for examples. --> <property name="includeEventTypes"> <list> <!--Task execution events--> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
<!--Cache events--> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> </list> </property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> <property name="discoverySpi"> <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> <property name="ipFinder"> <!-- Ignite provides several options for automatic discovery that can be used instead os static IP based discovery. For information on all options refer to our documentation: http://apacheignite.readme.io/docs/cluster-config --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>18.206.247.40:10800</value> </list> </property> </bean> </property> </bean> </property> </bean> </beans>
Thanks Sri

On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala < [hidden email]> wrote: Do you mean the communication ports 47100-47200 as mentioned in https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2 ? Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).

Which option is right in my example-default.xml file?
Option 1:-

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113:47100..47200</value>
    </list>
</property>

Option 2:-

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113:47500..47600</value>
    </list>
</property>

Option 3:-

<property name="addresses">
    <list>
        <!-- In distributed environment, replace with actual host IP address. -->
        <value>3.88.248.113</value>
    </list>
</property>
On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala < [hidden email]> wrote: Hi Stephen,

Do you mean 3.88.248.113:47500..47700, something like that? Or just the public IP 3.88.248.113? I tried all the possibilities and none of them connect.
Thanks Sri
On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington < [hidden email]> wrote: You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).
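(In that spirit, a static IP finder aimed at the default discovery port range rather than the thin-client port would look roughly like the sketch below; the IP is the EC2 address from this thread, and 47500..47509 is Ignite's default discovery range.)

<property name="discoverySpi">
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
            <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                <property name="addresses">
                    <list>
                        <!-- Discovery port range of the server node, not the thin-client port 10800. -->
                        <value>18.206.247.40:47500..47509</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>
</property>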
Regards,
Stephen
--
Thanks & Regards
Sri Tummala
Sri,
That's wrong to assume that the Ignite thin client is the only way to access data stored in AWS or other cloud environments. The problems you are experiencing are caused by the fact that your application is running on the laptop while the cluster (server nodes) is within an AWS network.
If you use a thick/regular client, then servers running on AWS need to have a way to *open* network connections to the application. This won't happen if the laptop or dev environment doesn't have public IPs reachable from AWS. Plus, additional network configuration might be needed.
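(As one example of that extra network configuration, Ignite ships org.apache.ignite.configuration.BasicAddressResolver, which maps a node's internal address to an externally reachable one; a minimal sketch, where both addresses are purely hypothetical:)

import java.util.{HashMap => JHashMap}
import org.apache.ignite.configuration.{BasicAddressResolver, IgniteConfiguration}

object NatMappedClient {
  def main(args: Array[String]): Unit = {
    // Hypothetical mapping: the laptop's private address -> an address the AWS nodes can reach.
    val mappings = new JHashMap[String, String]()
    mappings.put("192.168.1.10", "203.0.113.5")

    val cfg = new IgniteConfiguration()
      .setClientMode(true)
      .setAddressResolver(new BasicAddressResolver(mappings))
    // ... discovery configuration pointing at the server nodes would go here ...
  }
}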
Overall, thick clients work with AWS, but either the application needs to be deployed to AWS during development, or you need to start a local cluster on your own network/premises first, finish development there, and then move on to testing in AWS. Custom network configuration is also an option, but a tricky one that might not be doable.
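(For the local-cluster route, a minimal sketch of a development node pinned to localhost, so discovery never leaves the machine:)

import java.util.Collections
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object LocalDevCluster {
  def main(args: Array[String]): Unit = {
    // Static IP finder restricted to the local machine's default discovery range.
    val ipFinder = new TcpDiscoveryVmIpFinder()
    ipFinder.setAddresses(Collections.singletonList("127.0.0.1:47500..47509"))

    val cfg = new IgniteConfiguration()
    cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))

    // Server node on the laptop; Spark's thick client can then join it locally.
    val ignite: Ignite = Ignition.start(cfg)
    println(">>> Local dev node started: " + ignite.cluster().localNode().id())
  }
}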
Thin clients are a solution as well if key-value and SQL APIs are all you need.
I got it, so I have to build a jar, move it to an AWS Ignite cluster node, and run it on the node.
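(Roughly, once the project is assembled into a jar and copied up to the EC2 node, the run might look like the sketch below; the class name comes from the code earlier in this thread, while the jar name is hypothetical.)

# On the EC2 node, after copying the assembled jar there:
spark-submit \
  --class com.ignite.examples.spark.SparkIgniteCleanCode \
  --master local[*] \
  ApacheIgnitePoc-assembly-0.1.jar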
Thanks Sri
That's wrong to assume that the Ignite thin client is the only way to access data stored in AWS or other cloud environments. The problems you are experiencing are caused by the fact that your application is running on the laptop while the cluster (server nodes) is within an AWS network.
If you use a thick/regular client, then servers running on AWS need to have a way to *open* network connections to the application. This won't happen if the laptop or dev environment doesn't have public IPs reachable from AWS. Plus, additional network configuration might be needed.
Overall, thick clients work with AWS, but either an application needs to be deployed to AWS during development, or you need to start a local cluster in your network/premises first, accomplish development and move on with testing in AWS. A network configuration is also an option but a tricky one and might not be doable.
Thin clients are a solution as well if key-value and SQL APIs are all you need.
On Sun, Oct 20, 2019 at 2:06 PM sri hari kali charan Tummala < [hidden email]> wrote: so I got an answer from Grid Grain computing, JDBC thin client is the only way yo connect the ignite cluster which is running on aws.
Thanks Sri
On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala < [hidden email]> wrote: Ok I created a 2 node ec2 instance ignite cluster is below is the right way to create cache? the code still using my laptop resources unable to connect ignite cluster.
Out put:- [13:36:36] Ignite node started OK (id=8535f4d3) [13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, servers=1, clients=1, state=ACTIVE, CPUs=8, offheap=6.4GB, heap=14.0GB] >>> cache acquired
package com.ignite.examples.igniteStartup
import java.util.Arrays import java.util.List import com.ignite.examples.model.Address import org.apache.commons.logging.Log import org.apache.commons.logging.LogFactory import org.apache.ignite.Ignite import org.apache.ignite.IgniteCache import org.apache.ignite.Ignition import org.apache.ignite.configuration.IgniteConfiguration import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
//remove if not needed import scala.collection.JavaConversions._
object IgniteStart2 {
private var cache: IgniteCache[String, Address] = _
def main(args: Array[String]): Unit = {
val spi: TcpDiscoverySpi = new TcpDiscoverySpi() val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder() val hostList: List[String] = Arrays.asList("127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): _*) ipFinder.setAddresses(hostList) spi.setIpFinder(ipFinder) val cfg: IgniteConfiguration = new IgniteConfiguration() cfg.setDiscoverySpi(spi) cfg.setClientMode(true) System.out.println(">>> I am here")
cfg.setPeerClassLoadingEnabled(true) val ignite: Ignite = Ignition.start(cfg) cache = Ignition.ignite().cache("test") System.out.println(">>> cache acquired")
System.exit(0)
}
}
On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala < [hidden email]> wrote: Hi Stephen ,
I followed below steps and created one node ec2 instance with Ignite on it now I am connecting to the ignite cluster using spark in two ways 1) thin client 2) I don't know you have to tell me
my question is what is the value I have to pass for option 2 is it ec2 instance public IP with port 47500..47509 in my example-default.xml file ?
Thanks Sri
On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington < [hidden email]> wrote: You’re still pointing your Spark node ( thick client) at port 10800 (the thin client port). This is not going to work.
You can create a table using sqlline and read it using Spark, or vice versa. But you need a functioning cluster.
Check out the documentation on clustering concepts and configuration: https://www.gridgain.com/docs/latest/developers-guide/clustering/clustering On 18 Oct 2019, at 16:32, sri hari kali charan Tummala < [hidden email]> wrote:
Hi Stephen/All,
got it working somewhat using below, but have an issue table and data which is created using thin client is failing to read using spark but table created by spark can be read using a thin client, that means table created in Ignite using spark are the only ones read using spark in Ignite?
example-default.xml <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>18.206.247.40:10800</value> </list> </property>
NotWorking Scala Function:-
def readThinClientTableUsingSpark(implicit spark: SparkSession) = { val personDataFrame = spark.read .format(FORMAT_IGNITE) .option(OPTION_CONFIG_FILE, CONFIG) .option(OPTION_TABLE, "person") .load()
println() println("Data frame thin connection content:person") println()
//Printing content of data frame to console. personDataFrame.show() }
Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) at scala.Option.getOrElse(Option.scala:121) at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154) at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216) at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
Full code (step 7 fails):-

package com.ignite.examples.spark
import com.ignite.examples.model.Address
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.client.{ClientCache, IgniteClient}
import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
import java.lang.{Long => JLong, String => JString}

import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkIgniteCleanCode {
private val CACHE_NAME = "SparkCache"
private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
def setupExampleData = {
val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
val igniteClient: IgniteClient = Ignition.startClient(cfg2)
System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
val cache: ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)

cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person")
  .setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery(String.format(
  "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
  classOf[Address].getName))
  .setSchema("PUBLIC")).getAll

cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
  .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
  .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
  .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll
System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll

println(data.toString)
}
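// A hypothetical side note (sketch): Person is created WITH "VALUE_TYPE=Address", so the same
// rows are also reachable through the key-value API of the table's backing cache, which a
// CREATE TABLE names SQL_PUBLIC_PERSON by default.
def readPersonViaKeyValue(igniteClient: IgniteClient): Unit = {
  val kv: ClientCache[JLong, Address] = igniteClient.cache("SQL_PUBLIC_PERSON")
  println(kv.get(1L.asInstanceOf[JLong])) // the Address inserted above for id = 1
}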
def sparkReadIgniteWithThinClient(implicit spark: SparkSession) = {
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://18.206.247.40")
  .option("fetchsize", 100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person")
  .load()
df.printSchema()
df.createOrReplaceTempView("test")
spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)
}
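// A hypothetical variant (sketch): Spark's JDBC source also accepts a parenthesized subquery as
// "dbtable", which makes Ignite evaluate the predicate instead of Spark filtering afterwards.
def sparkReadIgniteFilteredWithThinClient(implicit spark: SparkSession) = {
  spark.read
    .format("jdbc")
    .option("url", "jdbc:ignite:thin://18.206.247.40")
    // Spark expects an alias ("p") when dbtable is a subquery.
    .option("dbtable", "(select id, street, zip from Person where id >= 2) p")
    .load()
    .show(10, false)
}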
def sparkWriteIgniteWithThinClient(implicit spark: SparkSession) = {
import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID, 'blah' as STREET, 124232 as ZIP")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://18.206.247.40", "Person", connectionProperties)

spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://18.206.247.40")
  .option("fetchsize", 100)
  .option("dbtable", "Person")
  .load()
  .show(10, false)
}
def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
val ignite = Ignition.start(CONFIG)
// Load content of json file to data frame.
val personsDataFrame = spark.read.json(
  resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)

println()
println("Json file content:")
println()

// Printing content of json file to console.
personsDataFrame.show()

println()
println("Writing Data Frame to Ignite:")
println()

// Writing content of data frame to Ignite.
personsDataFrame.write
  .format(FORMAT_IGNITE)
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_TABLE, "json_person")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
  .save()
println("Done!")
println() println("Reading data from Ignite table:") println()
val cache = ignite.cache[Any, Any](CACHE_NAME)
// Reading saved data from Ignite.
val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll

println(data.toString)

//data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
}
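// A hypothetical variant (sketch): OPTION_CREATE_TABLE_PARAMETERS is passed into the WITH clause
// of the CREATE TABLE that the connector issues, so SQL-level table parameters such as a
// partitioned template with backups work the same way here.
def writeJsonPersonPartitioned(personsDataFrame: org.apache.spark.sql.DataFrame): Unit = {
  personsDataFrame.write
    .format(FORMAT_IGNITE)
    .mode(org.apache.spark.sql.SaveMode.Append)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person_partitioned") // hypothetical table name
    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
    .option(OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned,backups=1")
    .save()
}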
def readIgniteUsingSpark(implicit spark: SparkSession) = {
  val json_person = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "json_person")
    .load()

  println()
  println("Data frame content:json_person")
  println()

  // Printing content of data frame to console.
  json_person.show()
}
def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
  val personDataFrame = spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_TABLE, "person")
    .load()

  println()
  println("Data frame thin connection content:person")
  println()

  // Printing content of data frame to console.
  personDataFrame.show()
}
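// A hypothetical fix to try for the "Unknown table person" error: newer Ignite releases add
// IgniteDataFrameSettings.OPTION_SCHEMA for naming the SQL schema explicitly. Whether the
// constant exists depends on the Ignite version on the classpath (an assumption, not verified).
def readThinClientTableWithSchema(implicit spark: SparkSession) = {
  import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_SCHEMA // newer releases only
  spark.read
    .format(FORMAT_IGNITE)
    .option(OPTION_CONFIG_FILE, CONFIG)
    .option(OPTION_SCHEMA, "PUBLIC")
    .option(OPTION_TABLE, "person")
    .load()
    .show()
}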
def main(args: Array[String]): Unit = {
println() println("Step 1 setupExampleData:") println()
setupExampleData
println() println("Step 2 createSparkSession:") println()
//Creating spark session. implicit val spark = SparkSession.builder() .appName("Spark Ignite data sources example") .master("local") .config("spark.executor.instances", "2") .getOrCreate()
// Adjust the logger to exclude the logs of no interest. Logger.getRootLogger.setLevel(Level.ERROR) Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
println() println("Step 3 ReadIgniteWithThinClient of Step1 Data:") println()
sparkReadIgniteWithThinClient(spark)
println() println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:") println()
sparkWriteIgniteWithThinClient(spark)
println() println("Step 5 writeJSonToIgniteUsingSpark Using Spark:") println()
writeJSonToIgniteUsingSpark(spark)
println() println("Step 6 readIgniteUsingSpark Using Spark:") println()
readIgniteUsingSpark(spark)
println() println("Step 7 readThinClientTableUsingSpark Using Spark:") println()
readThinClientTableUsingSpark(spark)
spark.stop()
}
}
example-default.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
<!-- Ignite configuration with all defaults and enabled p2p deployment and enabled events. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!-- Ignite provides several options for automatic discovery that can be used
                         instead of static IP based discovery. For information on all options refer
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>18.206.247.40:10800</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
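For comparison, a discovery section that targets the discovery port range rather than the thin-client port would look like the sketch below (assuming the server node runs with the default discovery ports; the static VM IP finder is used because multicast does not work between EC2 and an external machine):-

<property name="discoverySpi">
  <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
    <property name="ipFinder">
      <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
        <property name="addresses">
          <list>
            <!-- Discovery port range (default 47500..47509), not the thin-client port 10800. -->
            <value>18.206.247.40:47500..47509</value>
          </list>
        </property>
      </bean>
    </property>
  </bean>
</property>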
Thanks Sri
On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <[hidden email]> wrote:
Do you mean the communication ports 47100-47200 as mentioned in https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2 ? Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).
Which option is right in my example-default.xml file?
Option 1:-
<property name="addresses">
  <list>
    <!-- In distributed environment, replace with actual host IP address. -->
    <value>3.88.248.113:47100..47200</value>
  </list>
</property>

Option 2:-
<property name="addresses">
  <list>
    <!-- In distributed environment, replace with actual host IP address. -->
    <value>3.88.248.113:47500..47600</value>
  </list>
</property>

Option 3:-
<property name="addresses">
  <list>
    <!-- In distributed environment, replace with actual host IP address. -->
    <value>3.88.248.113</value>
  </list>
</property>
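(For reference: 47100..47200 in Option 1 is the communication SPI range, used for node-to-node messaging once a node has already joined. Discovery, which is what the addresses list in example-default.xml feeds, listens on 47500 by default, so Option 2's range is the one aimed at discovery; Option 3 can also work because an address without a port falls back to the default discovery ports.)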
On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[hidden email]> wrote:
Hi Stephen,
Do you mean 3.88.248.113:47500..47700, something like that? Or just the public IP 3.88.248.113? I tried all of these possibilities and none of them connect.
Thanks Sri
On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[hidden email]> wrote:
You’re trying to connect a thick client (the Spark integration) to the thin client port (10800). Your example-default.xml file needs to have the same configuration as your server node(s).
Regards,
Stephen