Question regarding possible use case of Sliding Windows

Anthony

This post has NOT been accepted by the mailing list yet.
Greetings,

Suppose I have a 10-minute window in which orders are coming in. I can easily stream this data in. Is it possible to use Ignite to answer the question "How many 2-minute windows within this time period contain at least n orders?" The overall goal is to find the maximum number of orders in any 2-minute window within the 10-minute streaming period.
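A sketch of the core computation, assuming all order timestamps for the 10-minute period are available in memory as milliseconds: sort them and sweep two pointers so that the span between them always stays under 2 minutes. The class and method names are hypothetical and nothing here uses Ignite; it only pins down what "maximum number of orders in any 2-minute window" means.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of Ignite. */
public class SlidingWindowMax {

    /**
     * Largest number of timestamps that fit into any window of length windowMillis (> 0).
     * Two events exactly windowMillis apart are not counted in the same window.
     */
    public static int maxInWindow(long[] timestampsMillis, long windowMillis) {
        long[] ts = timestampsMillis.clone();
        Arrays.sort(ts);

        int best = 0;
        int left = 0;
        for (int right = 0; right < ts.length; right++) {
            // Advance the left pointer until ts[left..right] spans less than windowMillis.
            while (ts[right] - ts[left] >= windowMillis)
                left++;
            best = Math.max(best, right - left + 1);
        }
        return best;
    }

    public static void main(String[] args) {
        // Order arrival times in milliseconds from the start of the 10-minute period.
        long[] orders = {0, 30_000, 90_000, 130_000, 140_000, 150_000, 400_000};
        System.out.println(maxInWindow(orders, 120_000)); // prints 4
    }
}
```

Sorting costs O(n log n); the sweep itself is O(n), because each pointer only moves forward.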

I checked out the examples, including the word counter, but they are quite basic compared to this idea. Does anyone have suggestions or ideas as to whether I can do this? Thank you

- Anthony
vkulichenko

Re: Question regarding possible use case of Sliding Windows

Anthony,

Is the 2-minute window a sliding window as well, or do you just want to split the 10-minute window into five intervals and compare the number of requests in each interval?

For the first use case I would do the following:
1. Create a cache with a 2-minute expiry policy and stream the request data into it.
2. Periodically (or on each update) get the size of this cache, which is the number of requests received in the last two minutes.
3. Save the results of step 2, along with the corresponding timestamps, to another cache with a 10-minute expiry policy.

This way you can query the second cache at any moment to answer your question.
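The three steps above can be modelled in plain Java to check the logic before wiring it to Ignite. In this sketch (class name hypothetical, no Ignite APIs), a deque of arrival times stands in for the cache with the 2-minute expiry, and a second deque of (timestamp, size) pairs stands in for the 10-minute history cache. Events are assumed to arrive in non-decreasing time order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical in-memory model of the two-cache design:
 * "recent" plays the cache with the short expiry, "history" the cache with the long one.
 */
public class WindowTracker {
    private final long windowMillis;   // e.g. 2 minutes, must be > 0
    private final long historyMillis;  // e.g. 10 minutes, must be > 0
    private final Deque<Long> recent = new ArrayDeque<>();     // arrival times
    private final Deque<long[]> history = new ArrayDeque<>();  // {timestamp, window size}

    public WindowTracker(long windowMillis, long historyMillis) {
        this.windowMillis = windowMillis;
        this.historyMillis = historyMillis;
    }

    /** Call for every incoming event, in non-decreasing time order. */
    public void onEvent(long nowMillis) {
        // Step 1: store the event and drop everything that has left the window.
        recent.addLast(nowMillis);
        while (nowMillis - recent.peekFirst() >= windowMillis)
            recent.removeFirst();

        // Steps 2 and 3: record the current window size with its timestamp,
        // keeping only the records that fall inside the history period.
        history.addLast(new long[] {nowMillis, recent.size()});
        while (nowMillis - history.peekFirst()[0] >= historyMillis)
            history.removeFirst();
    }

    /** Largest window size recorded within the retained history. */
    public long maxWindowCount() {
        long best = 0;
        for (long[] entry : history)
            best = Math.max(best, entry[1]);
        return best;
    }
}
```

Recording the size on every update, rather than on a timer, is enough to catch the maximum: the number of events in a trailing window can only increase at the moment an event arrives.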

For the second use case (a simple split), you only need one cache with a 10-minute expiry policy, and you can execute an SQL query with the appropriate grouping and aggregation.
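For the simple split, the grouping only needs an interval index per order; in SQL that would be a GROUP BY on an expression computed from the order timestamp. The in-memory equivalent below is a sketch with hypothetical names, assuming timestamps in milliseconds and a known start of the 10-minute period.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper mirroring a GROUP BY on a fixed interval index. */
public class FixedIntervalCounts {

    /** Maps interval index i, covering [start + i*len, start + (i+1)*len), to its event count. */
    public static Map<Long, Integer> countPerInterval(long[] timestampsMillis, long startMillis, long lenMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : timestampsMillis) {
            long interval = Math.floorDiv(t - startMillis, lenMillis);
            counts.merge(interval, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] orders = {0, 30_000, 90_000, 130_000, 140_000, 150_000, 400_000};
        System.out.println(countPerInterval(orders, 0, 120_000)); // prints {0=3, 1=3, 3=1}
    }
}
```

As with SQL GROUP BY, empty intervals simply do not appear. A sliding window can find a higher count than any fixed interval, because a burst of orders may straddle an interval boundary.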

Which one are you trying to implement?

-Val
Anthony

Re: Question regarding possible use case of Sliding Windows

Val, thank you for your response.

I'm talking about the first case. Ideally, I'd get the size of the cache (with a 2-minute sliding window) on every update and then save that result. But when I try a small example, it doesn't work the way I want.

For instance, I configure a sliding window of 2 seconds:
cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new ModifiedExpiryPolicy(new Duration(SECONDS, 2))));

I control the stream to add 5 points of data at one second intervals to this cache.
When I run the query "select _val,count(_val) as cnt from String group by _val", I get 5 points of data instead of 2. Shouldn't data older than 2 seconds have been removed? That's what I thought, but my query returns all the data I streamed.

Thank you,
Anthony
vkulichenko

Re: Question regarding possible use case of Sliding Windows

Anthony,

Can you share the code that streams the data? What are keys and values?

-Val
Anthony

Re: Question regarding possible use case of Sliding Windows

Val, I appreciate your time. It has been helpful for my understanding of Ignite. Code is here:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.streaming.wordcount.CacheConfig;

public class StreamWords {
    private static final String[] INSTRUMENTS = {"ABC"};

    private static String now() {
        return new SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(new Date());
    }

    public static void main(String[] args) throws Exception {
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            if (!ExamplesUtils.hasServerNodes(ignite))
                return;

            try (IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache())) {
                try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
                    int idx = 0;

                    // Add five entries, one per second. Each AffinityUuid is a new unique key
                    // whose affinity key is the instrument symbol.
                    for (int i = 0; i < 5; i++) {
                        stmr.addData(new AffinityUuid(INSTRUMENTS[idx]), INSTRUMENTS[idx]);
                        System.out.println("at " + now() + ", streamed " + INSTRUMENTS[idx]);
                        Thread.sleep(1000);
                    }
                }

                System.out.println("Querying at " + now());

                SqlFieldsQuery tickerQry = new SqlFieldsQuery(
                    "select _val, count(_val) as cnt from String group by _val");

                List<List<?>> tickers = stmCache.query(tickerQry).getAll();

                ExamplesUtils.printQueryResults(tickers);
            }
        }
    }
}

The query returns all 5 elements I inserted instead of only those from the past two seconds. The CacheConfig is the same as in the Alice in Wonderland text streaming example, except that the expiry is set to two seconds.
vkulichenko

Re: Question regarding possible use case of Sliding Windows

Anthony,

The streamer's addData() method is asynchronous: calling it does not mean that the value immediately ends up in the cache. The streamer sends data to server nodes in batches, and since you added only five entries, they were all sent in one batch when you closed the streamer, so they were saved into the cache at the same time.

The streamer is designed to handle large volumes of continuous events. If you want to test expiration in your scenario, use simple cache puts instead; they are synchronous.
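To see why all five entries survived together, here is a toy model of a batching writer. It is a hypothetical class and not Ignite's implementation; it only illustrates the effect described above: addData() buffers, put() stores immediately, and an entry's time-to-live is counted from when it is stored. In real Ignite code, besides switching to put(), IgniteDataStreamer also offers flush() and an autoFlushFrequency setting for pushing buffered data out sooner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of a batching writer (hypothetical, NOT Ignite's implementation).
 * Buffered entries are stored only when the batch fills up or flush() is called,
 * and every entry in a batch gets the same store time.
 */
public class BufferedWriterModel {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final Map<String, Long> storedAt = new LinkedHashMap<>();

    public BufferedWriterModel(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Streamer-like call: only buffers until the batch is full. */
    public void addData(String key, long nowMillis) {
        buffer.add(key);
        if (buffer.size() >= batchSize)
            flush(nowMillis);
    }

    /** Put-like call: the entry is stored right away. */
    public void put(String key, long nowMillis) {
        storedAt.put(key, nowMillis);
    }

    /** Stores every buffered entry with the same timestamp (as when a streamer is closed). */
    public void flush(long nowMillis) {
        for (String key : buffer)
            storedAt.put(key, nowMillis);
        buffer.clear();
    }

    /** Number of stored entries whose TTL, counted from their store time, has not run out. */
    public int liveCount(long nowMillis, long ttlMillis) {
        int live = 0;
        for (long t : storedAt.values())
            if (nowMillis - t < ttlMillis)
                live++;
        return live;
    }
}
```

With five addData() calls one second apart and a flush at t = 5 s, all five entries share the store time 5 s, so a 2-second TTL has removed none of them when queried right after the flush. With put(), each entry keeps its own store time and the older ones expire first.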

Thanks!
Anthony

Re: Question regarding possible use case of Sliding Windows

Using put() instead of addData() does yield the desired results. Good call!

As an extension to this code, suppose I also want to include a price in addition to the instrument.

Currently each put() stores an entry of the form [AffinityUuid, symbol], and I can easily do the count in a query as above. How would you go about including an additional column? For instance, I envision something like [AffinityUuid, symbol, price].

If this is possible, please let me know what you think.

vkulichenko

Re: Question regarding possible use case of Sliding Windows

Anthony,

You can use a complex object with several fields as a value. Use @QuerySqlField annotation to annotate queried and/or indexed fields and use these fields in your SQL query.

So in your case it will be something like this:

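import java.io.Serializable;
import org.apache.ignite.cache.query.annotations.QuerySqlField;

class Instrument implements Serializable {
    @QuerySqlField(index = true)
    private String symbol;

    @QuerySqlField
    private double price;

    Instrument(String symbol, double price) { this.symbol = symbol; this.price = price; }
}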

And the query will be: "select symbol, count(symbol) as cnt from Instrument group by symbol"
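For SQL to see the Instrument fields, the value class also has to be registered with the cache as an indexed type. The configuration sketch below assumes Ignite 1.x APIs (CacheConfiguration.setIndexedTypes, SqlFieldsQuery), a hypothetical cache name "instruments", and that Instrument is on the server nodes' classpath; it is untested and not a complete program.

```java
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ModifiedExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;

public class InstrumentCacheSketch {

    /** Cache whose values are queryable as the SQL table "Instrument" (cache name is hypothetical). */
    public static CacheConfiguration<AffinityUuid, Instrument> instrumentCache() {
        CacheConfiguration<AffinityUuid, Instrument> cfg = new CacheConfiguration<>("instruments");
        // Register key/value types so that @QuerySqlField fields become SQL columns.
        cfg.setIndexedTypes(AffinityUuid.class, Instrument.class);
        // 2-minute sliding window, as discussed earlier in the thread.
        cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
            new ModifiedExpiryPolicy(new Duration(TimeUnit.MINUTES, 2))));
        return cfg;
    }

    /** Puts one sample entry and returns per-symbol counts and average prices. */
    public static List<List<?>> countBySymbol(Ignite ignite) {
        IgniteCache<AffinityUuid, Instrument> cache = ignite.getOrCreateCache(instrumentCache());
        cache.put(new AffinityUuid("ABC"), new Instrument("ABC", 10.5));
        return cache.query(new SqlFieldsQuery(
            "select symbol, count(symbol) as cnt, avg(price) from Instrument group by symbol")).getAll();
    }
}

class Instrument implements Serializable {
    @QuerySqlField(index = true)
    private String symbol;

    @QuerySqlField
    private double price;

    Instrument(String symbol, double price) { this.symbol = symbol; this.price = price; }
}
```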

Here is the documentation page for more information on queries API: http://apacheignite.readme.io/v1.1/docs/cache-queries

Thanks!