Data Consistency Question

adipro

Data Consistency Question

Can anyone tell me whether this code guarantees that all the rows have been
inserted as soon as all the statements have executed?

    IgniteSemaphore semaphore = null;
    try {
        semaphore = cacheHolder.getJDBCSemaphore(IgniteLocks.JDBC_LOCK.getLockValue());
        semaphore.acquire();
        try {
            Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        try (final Connection conn = DriverManager.getConnection(IgniteCacheTable.JDBC_THIN_URL_STRING.toString())) {
            try (final Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("SET STREAMING ON ALLOW_OVERWRITE ON"); //No I18N
            }
            try (final PreparedStatement stmt = conn.prepareStatement(
                    "INSERT INTO URLS VALUES (?, ?, ?, ?)")) { //No I18N
                while (keys.hasNext()) {
                    String key = keys.next().toString();
                    s_id = cacheHolder.getAtomicSequence();
                    stmt.setLong(1, s_id);
                    stmt.setString(2, key);
                    stmt.setDouble(3, Double.parseDouble(keyValueMap.get(key).toString()));
                    stmt.setLong(4, appNameId);
                    stmt.execute();
                    appNameUrl.put(IgniteCacheTable.FRONTIER_DB.getDBValue() + "|" + objects[1] + "|" + key, s_id);
                }
            }
        }
    }
    finally {
        if (semaphore != null) {
            semaphore.release();
        }
    }



I am having some trouble understanding data consistency when using
IgniteDataStreamers.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
akorensh

Re: Data Consistency Question

Hi,
   A streamer, i.e. IgniteDataStreamer.addData(..), does not guarantee
consistency.

   From the docs: "Note that streamer will stream data concurrently by multiple
internal threads, so the data may get to remote nodes in different order
from which it was added to the streamer."

   See:
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteDataStreamer.html
   and:  https://apacheignite.readme.io/docs/data-streamers

   In your case, however, the statements are executed serially, so the
order should be preserved. Note that SET STREAMING ON is designed for
multiple INSERTs submitted together.

   https://apacheignite-sql.readme.io/docs/set
Thanks, Alex
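
To make the streaming point above concrete, here is a minimal sketch of one streaming session over a thin JDBC connection. It assumes, as I read the SET documentation linked above, that buffered rows are only guaranteed to be flushed once streaming is switched off or the connection is closed, so please verify that against your Ignite version. The class name, the method name and the firstId parameter are made up for illustration; the URLS table and the INSERT statement are taken from the original post.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

// Hypothetical helper, not part of Ignite: runs one streaming session on an
// already opened thin JDBC connection.
public final class StreamingInsertSketch {
    private StreamingInsertSketch() {}

    /**
     * Submits one INSERT per map entry in streaming mode and returns the number
     * of INSERTs submitted. Ids are assigned sequentially from firstId, which
     * stands in for the atomic sequence used in the original post.
     */
    public static int insertAll(Connection conn, Map<String, Double> scores,
                                long firstId, long appNameId) throws SQLException {
        // Switch the connection to streaming mode; rows are buffered from here on.
        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("SET STREAMING ON ALLOW_OVERWRITE ON");
        }

        int submitted = 0;
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO URLS VALUES (?, ?, ?, ?)")) {
            long id = firstId;
            for (Map.Entry<String, Double> e : scores.entrySet()) {
                ps.setLong(1, id++);
                ps.setString(2, e.getKey());
                ps.setDouble(3, e.getValue());
                ps.setLong(4, appNameId);
                ps.execute(); // buffered by the driver, not necessarily visible yet
                submitted++;
            }
        }

        // Assumption: turning streaming off flushes whatever is still buffered,
        // so the rows should be queryable after this statement returns.
        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("SET STREAMING OFF");
        }
        return submitted;
    }
}
```

If an INSERT fails halfway, this sketch leaves the connection in streaming mode, so the caller should close the connection in that case rather than reuse it.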
 



adipro

Re: Data Consistency Question

Thanks for the reply.

But I have two questions here.

1. If you check the code, I use an Ignite semaphore. If I remove the
semaphore, I get JDBC connection issues because multiple threads access
that particular code.

We run hundreds of threads. Is there any way to run them in parallel?
Most of the threads are waiting and frequently log warnings like this:

```
"" "172.21.197.110" "test" "" "" "" "1205"
"org.apache.ignite.logger.java.JavaLogger" "warning" "WARNING" "19-05-2020
08:51:41:554" "294" "Dumping the near node thread that started transaction
[xidVer=GridCacheVersion [topVer=201264296, order=1589828678516,
nodeOrder=36], nodeId=82edf746-ccaf-4757-8d23-921c1c4d3ef6]
Stack trace of the transaction owner thread:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
o.a.i.i.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:177)
o.a.i.i.util.future.GridFutureAdapter.get(GridFutureAdapter.java:140)
o.a.i.i.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4267)
o.a.i.i.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4242)
o.a.i.i.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1406)
o.a.i.i.processors.cache.GridCacheProxyImpl.get(GridCacheProxyImpl.java:373)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl$Sync$1.call(GridCacheSemaphoreImpl.java:276)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl$Sync$1.call(GridCacheSemaphoreImpl.java:270)
o.a.i.i.processors.cache.GridCacheUtils.retryTopologySafe(GridCacheUtils.java:1425)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl$Sync.compareAndSetGlobalState(GridCacheSemaphoreImpl.java:270)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl$Sync.nonfairTryAcquireShared(GridCacheSemaphoreImpl.java:204)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl$Sync.tryAcquireShared(GridCacheSemaphoreImpl.java:211)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:988)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl.acquire(GridCacheSemaphoreImpl.java:609)
o.a.i.i.processors.datastructures.GridCacheSemaphoreImpl.acquire(GridCacheSemaphoreImpl.java:597)
com.a.scrapy.common.ignite.IgniteQuery$URLS_OBJECT.sqlBulkInsert(IgniteQuery.java:246)
com.a.scrapy.common.ignite.IgniteQuery.executeFieldsQuery(IgniteQuery.java:63)
com.a.scrapy.common.ignite.IgniteConnectionHandler$CacheHolder.sqlPipelinedSet(IgniteConnectionHandler.java:380)
com.a.scrapy.common.ignite.IgniteUtil.zaddPipeline(IgniteUtil.java:343)
edu.uci.ics.crawler4j.frontier.WorkQueues.putAll(WorkQueues.java:156)
edu.uci.ics.crawler4j.frontier.Frontier.scheduleAll(Frontier.java:122)
edu.uci.ics.crawler4j.crawler.WebCrawler.processPage(WebCrawler.java:560)
edu.uci.ics.crawler4j.crawler.WebCrawler.run(WebCrawler.java:288)
java.lang.Thread.run(Thread.java:748)
```


2. I was under the impression that once the JDBC connection is closed, all the
rows are flushed and inserted into the database. Is that not so? If it is,
we should not have any multi-threaded data consistency issues.



adipro

Re: Data Consistency Question

Can someone please help regarding this issue?



ilya.kasnacheev

Re: Data Consistency Question

Hello!

Why do you open the connection inside the semaphore? You are supposed to open the connection(s) in advance and only issue updates inside the semaphore.

The whole approach is questionable (there is no sense in using thin JDBC if you already have an Ignite node), but this one is a killer. You also seem to open a new connection every time.

Regards,
--
Ilya Kasnacheev
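
One way to read the advice above is "open the connection once per worker thread and keep reusing it". The sketch below is only an illustration under that assumption and is not an Ignite API: PerThreadConnections and its methods are hypothetical names, and the factory is whatever code opens your thin JDBC connection. As far as I know, JDBC connections and statements should not be shared between threads, which is why each thread gets its own.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical helper: lazily opens one connection per thread and reuses it,
// instead of opening a new connection for every batch.
public final class PerThreadConnections implements AutoCloseable {
    // Every connection ever opened, so that close() can release all of them.
    private final Queue<Connection> opened = new ConcurrentLinkedQueue<>();
    private final ThreadLocal<Connection> local;

    public PerThreadConnections(Supplier<Connection> factory) {
        this.local = ThreadLocal.withInitial(() -> {
            Connection c = factory.get(); // runs once per thread, on first use
            opened.add(c);
            return c;
        });
    }

    /** Returns the calling thread's connection, opening it on first use. */
    public Connection get() {
        return local.get();
    }

    /** Number of connections opened so far and not yet closed. */
    public int openedCount() {
        return opened.size();
    }

    /** Closes every connection opened through this holder. */
    @Override
    public void close() throws SQLException {
        Connection c;
        while ((c = opened.poll()) != null) {
            c.close();
        }
    }
}
```

Because DriverManager.getConnection throws a checked SQLException, the factory passed in has to catch it and rethrow it as an unchecked exception. With one connection per thread there is no shared JDBC object left to guard, so the cluster-wide semaphore around the inserts may no longer be needed; whether that holds for your workload is something to test.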


Wed, Jun 3, 2020 at 09:17, adipro <[hidden email]>:
Can someone please help regarding this issue?



adipro

Re: Data Consistency Question

Can you please suggest an efficient solution?



ilya.kasnacheev

Re: Data Consistency Question

Hello!

I'm not sure what you are trying to do and why you felt the need to over-complicate the solution.

Regards,
--
Ilya Kasnacheev


Mon, Jun 15, 2020 at 09:10, adipro <[hidden email]>:
Can you please suggest an efficient solution?


