Data Streamer

Александр Савинов

Data Streamer


Hello.
I have a problem with the stream API and Ignite. The value of the "sum" variable (s in the code below) should be 1000000, which is the number of lines in test.csv, but it is 999424. If the file is small (10 or even 1000 lines), nothing ends up in the cache at all.
Thank you.
If you know Russian, please answer in Russian.

IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
igniteConfiguration.setPeerClassLoadingEnabled(true);
Ignite ignite = Ignition.start(igniteConfiguration);

CacheConfiguration<Integer, Long> cacheConfiguration = new CacheConfiguration<>("cache");
IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cacheConfiguration);
IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache");

streamer.receiver(StreamTransformer.from((entry, arg) -> {
    Long value = entry.getValue();
    entry.setValue(value == null ? 1L : value + 1L);
    return entry;
}));

Stream<String> fileStream = Files.lines(Paths.get("test.csv"));
fileStream.mapToInt(Integer::parseInt).forEach(line -> streamer.addData(line, 1L));

cache.forEach((entry) -> System.out.println(entry.getKey() + ": " + entry.getValue()));

int s = 0;
Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator();
while (iterator.hasNext()) {
    Cache.Entry<Integer, Long> entry = iterator.next();
    s += entry.getValue();
}
System.out.println(s);

cache.clear();
ignite.close();


--
Best regards, Alexander.



Vladimir Ozerov

Re: Data Streamer

Hi Alexander,

Please make sure that you flush the data streamer before checking the "sum" value:
fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
streamer.flush();
Vladimir.
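
To see why a missing flush could produce exactly the numbers reported above, here is a minimal sketch of a buffering streamer in plain Java. It is a toy model, not Ignite's implementation: the class ToyStreamer and the batch size of 1024 are assumptions made for illustration (the thread does not state the buffer size; 1024 simply happens to be consistent with both 999424 and the empty cache for small files).

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedStreamerSketch {

    /** Hypothetical stand-in for a buffering streamer; NOT the Ignite API. */
    public static final class ToyStreamer {
        private final int bufferSize;                 // assumed batch size
        private final List<Integer> buffer = new ArrayList<>();
        private long delivered;                       // entries that reached the "cache"

        public ToyStreamer(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        /** Buffers one entry and sends the batch only once the buffer is full. */
        public void addData(int key) {
            buffer.add(key);
            if (buffer.size() == bufferSize) {
                flush();
            }
        }

        /** Sends whatever is currently buffered, even a partial batch. */
        public void flush() {
            delivered += buffer.size();
            buffer.clear();
        }

        public long delivered() {
            return delivered;
        }
    }

    public static void main(String[] args) {
        ToyStreamer streamer = new ToyStreamer(1024); // 1024 is an assumption
        for (int i = 0; i < 1_000_000; i++) {
            streamer.addData(i);
        }
        // 976 full batches of 1024 were sent; 576 entries are still buffered.
        System.out.println(streamer.delivered());     // prints 999424

        streamer.flush();
        System.out.println(streamer.delivered());     // prints 1000000
    }
}
```

Under the same assumption, a file with 10 or 1000 lines never fills a batch, so nothing is delivered until flush() is called. In Ignite itself, IgniteDataStreamer.close() also flushes pending data, so opening the streamer in a try-with-resources block is another way to avoid the problem.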

On Mon, May 23, 2016 at 10:35 AM, Александр Савинов <[hidden email]> wrote:

> [...]

Александр Савинов

Re[2]: Data Streamer

Thank you! It works correctly.


On Monday, 23 May 2016, 11:11 +03:00, Vladimir Ozerov <[hidden email]> wrote:

> [...]



--
Best regards, Alexander.