Ignite Events

classic Classic list List threaded Threaded
5 messages Options
drosso drosso
Reply | Threaded
Open this post in threaded view
|

Ignite Events

Hi,
I've been playing with Ignite events for a couple of days but there's a
behavior of my sample programs that I really can't understand.
I've prepared 2 sample programs:
1. a Putter program that "gets or creates" a simple cache "MyCache" with an
Integer Key and a String Value and puts 9 elements (from 1 to 9) into
"MyCache"
2. a ServerNode program that defines a local listener on "MyCache" for PUT
events and displays the newly added keys.

N.B. All programs are launched as "Server" Ignite nodes and "MyCache" is
defined as PARTITIONED with 0 backuop copies

If I launch 2 instances of ServerNode and 1 instance of Putter, I obtain the
following output:

ServerNode 1 displays the keys: 2, 3, 5, 7, 9
ServerNode 2 display the keys: 1, 4,6,8

Now, this is already an output that puzzles me: if the cache is partitioned,
the keys should be spread onto all Server instances, so I should not see all
the keys reported by the 2 ServerNode instances. There should be some keys
missing (i.e. those on the Putter server node).

Moreover, if I add a local listener also on the Putter node, the output
becomes still more puzzling:

ServerNode 1 displays the keys: 2, 3, 5, 7, 9
ServerNode 2 display the keys: 1, 4,6,8
Putter displays the keys : 2,3,5

What am I missing here ? There surely must be something that I
misunderstood, but I can't figure out what it could be.
Any help will be much appreciated!



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Павлухин Иван Павлухин Иван
Reply | Threaded
Open this post in threaded view
|

Re: Ignite Events

Hi drosso,

Indeed looks strange. If you provide a reproducer I will take a look.

ср, 10 окт. 2018 г. в 16:44, drosso <[hidden email]>:
Hi,
I've been playing with Ignite events for a couple of days but there's a
behavior of my sample programs that I really can't understand.
I've prepared 2 sample programs:
1. a Putter program that "gets or creates" a simple cache "MyCache" with an
Integer Key and a String Value and puts 9 elements (from 1 to 9) into
"MyCache"
2. a ServerNode program that defines a local listener on "MyCache" for PUT
events and displays the newly added keys.

N.B. All programs are launched as "Server" Ignite nodes and "MyCache" is
defined as PARTITIONED with 0 backuop copies

If I launch 2 instances of ServerNode and 1 instance of Putter, I obtain the
following output:

ServerNode 1 displays the keys: 2, 3, 5, 7, 9
ServerNode 2 display the keys: 1, 4,6,8

Now, this is already an output that puzzles me: if the cache is partitioned,
the keys should be spread onto all Server instances, so I should not see all
the keys reported by the 2 ServerNode instances. There should be some keys
missing (i.e. those on the Putter server node).

Moreover, if I add a local listener also on the Putter node, the output
becomes still more puzzling:

ServerNode 1 displays the keys: 2, 3, 5, 7, 9
ServerNode 2 display the keys: 1, 4,6,8
Putter displays the keys : 2,3,5

What am I missing here ? There surely must be something that I
misunderstood, but I can't figure out what it could be.
Any help will be much appreciated!



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


--
Best regards,
Ivan Pavlukhin
drosso drosso
Reply | Threaded
Open this post in threaded view
|

Re: Ignite Events

Hi Ivan,
thank you for your interest! here below you can find the code for the 2
sample programs:

*********** ServerNode.java **************

package TestATServerMode;

import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;

import java.util.UUID;

/**
 * Starts up an empty node with example compute configuration.
 */
public class ServerNode {
        /**
         * Start up an empty node with example compute configuration.
         *
         * @param args
         *            Command line arguments, none required.
         * @throws IgniteException
         *             If failed.
         */
        private static final String CACHE_NAME = "MyCache";

        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws IgniteException {
                Ignition.start("config/example-ignite.xml");

                Ignite ignite = Ignition.ignite();

                // Get an instance of named cache.
                final IgniteCache<Integer, String> cache =
ignite.getOrCreateCache(CACHE_NAME);

                // Sample remote filter

                IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
                        @Override
                        public boolean apply(CacheEvent evt) {
                                System.out.println("LOCAL cache event [evt=" + evt.name() + ",
cacheName=" + evt.cacheName() + ", key="
                                                + evt.key() + ']');

                                return true; // Return true to continue listening.
                        }
                };

                // Register event listener for all local task execution events.
                ignite.events().localListen(locLsnr, EVT_CACHE_OBJECT_PUT);

                       
        }
}


************ Putter.java *************************

package TestATServerMode;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;

import java.sql.Time;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.lang.IgnitePredicate;


@SuppressWarnings("TypeMayBeWeakened")
public class Putter {
    /** Cache name. */
    private static final String CACHE_NAME = "MyCache";

    /**
     * Executes example.
     *
     * @param args Command line arguments, none required.
     * @throws InterruptedException
     */
    public static void main(String[] args) {
   
    // Mark this cluster member as client.
        //Ignition.setClientMode(true);
       
        try (Ignite ignite = Ignition.start("config/example-ignite.xml")) {
            System.out.println();
            System.out.println(">>> Myexample started.");

            CacheConfiguration<Integer, String> cfg = new
CacheConfiguration<>();

            //cfg.setCacheMode(CacheMode.REPLICATED);
            cfg.setName(CACHE_NAME);
            //cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
           
           
            IgnitePredicate<CacheEvent> lsnr = new
IgnitePredicate<CacheEvent>() {
                @Override public boolean apply(CacheEvent evt) {
                    System.out.println("Received cache event [evt=" +
evt.name() + ", cacheName=" + evt.cacheName() +
                        ", key=" + evt.key() + ']');

                    return true; // Return true to continue listening.
                }
            };

            try (IgniteCache<Integer, String> cache =
ignite.getOrCreateCache(cfg)) {
                if
(ignite.cluster().forDataNodes(cache.getName()).nodes().isEmpty()) {
                    System.out.println();
                    System.out.println(">>> This example requires remote
cache node nodes to be started.");
                    System.out.println(">>> Please start at least 1 remote
cache node.");
                    System.out.println(">>> Refer to example's javadoc for
details on configuration.");
                    System.out.println();

                    return;
                }

               
                // Register event listener for all local task execution
events.
                ignite.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT);

                put(cache);

            }
        }
    }

    /**
     * Execute individual put and get.
     *
     * @param cache Cache.
     */
    private static void put(IgniteCache<Integer, String> cache) {
 
    long startTime = System.currentTimeMillis();
   
   
    for (int i = 1; i < 10; i++) {
                // Put created data entry to cache.
                cache.put(i, "String"+i);
    }

    long endTime =  System.currentTimeMillis();
   
        System.out.println("Time taken (PUT) : " + (endTime-startTime) + "
millisec");
       
    }
}

************** example-ignite.xml *********************
<?xml version="1.0" encoding="UTF-8"?>




<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
   
    <import resource="example-default.xml"/>

    <bean parent="ignite.cfg"/>
</beans>

************** example-default.xml ********************
<?xml version="1.0" encoding="UTF-8"?>




<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
       
        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="cacheConfiguration">
            <list>
               
                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="default"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="0"/>
                </bean>
               
                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="MyCache"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="ATOMIC"/>  
                    <property name="backups" value="0"/>
                    <property name="readFromBackup" value="true"/>
                    <property name="partitionLossPolicy"
value="READ_ONLY_SAFE"/>
                   
                </bean>
               
               
            </list>
        </property>
             
                         
            <property name="userAttributes">
                <map>
                    <entry key="group" value="NonPutter"/>
                </map>
  </property>    
                   
                 
        <property name="includeEventTypes">
            <list>
               
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

               
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

       
        <property name="discoverySpi">
            <bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                   
                   
                   
                    <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                               
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>






--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Павлухин Иван Павлухин Иван
Reply | Threaded
Open this post in threaded view
|

Re: Ignite Events

Hi drosso,

Luckily there is no mystery. By the way, what version of Ignite do you use?

The clue to strange behavior here is topology change during your test execution. As I see, Putter node is a server data node as well, so it will hold some data partitions on it and consequently will receive some OBJECT_PUT events. The second seeming strange thing here is observing events for same keys on different nodes. It is explained by so-called "late affinity assignment". Putter enters cluster and some partitions are loaded to it from other nodes. But Putter is usable before all data is actually loaded, instead of waiting data and freezing cluster for possibly long time Ignite creates temporary backup partition on Putter node and primary partition is kept on one of ServerNodes from your example (and when all data is loaded by Putter from other nodes partitions on it will be considered primary and previous primary partitions on other nodes will be destroyed). Events like OBJECT_PUT are fired on backup partitions as well. And it explains why you observe events for same keys on different nodes. If you make Putter non-data node for the target cache (e.g. by starting it as a client node) then you will see events only on ServerNodes.

чт, 11 окт. 2018 г. в 11:20, drosso <[hidden email]>:
Hi Ivan,
thank you for your interest! here below you can find the code for the 2
sample programs:

*********** ServerNode.java **************

package TestATServerMode;

import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;

import java.util.UUID;

/**
 * Starts up an empty node with example compute configuration.
 */
public class ServerNode {
        /**
         * Start up an empty node with example compute configuration.
         *
         * @param args
         *            Command line arguments, none required.
         * @throws IgniteException
         *             If failed.
         */
        private static final String CACHE_NAME = "MyCache";

        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws IgniteException {
                Ignition.start("config/example-ignite.xml");

                Ignite ignite = Ignition.ignite();

                // Get an instance of named cache.
                final IgniteCache<Integer, String> cache =
ignite.getOrCreateCache(CACHE_NAME);

                // Sample remote filter

                IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
                        @Override
                        public boolean apply(CacheEvent evt) {
                                System.out.println("LOCAL cache event [evt=" + evt.name() + ",
cacheName=" + evt.cacheName() + ", key="
                                                + evt.key() + ']');

                                return true; // Return true to continue listening.
                        }
                };

                // Register event listener for all local task execution events.
                ignite.events().localListen(locLsnr, EVT_CACHE_OBJECT_PUT);


        }
}


************ Putter.java *************************

package TestATServerMode;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;

import java.sql.Time;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.lang.IgnitePredicate;


@SuppressWarnings("TypeMayBeWeakened")
public class Putter {
    /** Cache name. */
    private static final String CACHE_NAME = "MyCache";

    /**
     * Executes example.
     *
     * @param args Command line arguments, none required.
     * @throws InterruptedException
     */
    public static void main(String[] args) {

        // Mark this cluster member as client.
        //Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("config/example-ignite.xml")) {
            System.out.println();
            System.out.println(">>> Myexample started.");

            CacheConfiguration<Integer, String> cfg = new
CacheConfiguration<>();

            //cfg.setCacheMode(CacheMode.REPLICATED);
            cfg.setName(CACHE_NAME);
            //cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);


            IgnitePredicate<CacheEvent> lsnr = new
IgnitePredicate<CacheEvent>() {
                @Override public boolean apply(CacheEvent evt) {
                    System.out.println("Received cache event [evt=" +
evt.name() + ", cacheName=" + evt.cacheName() +
                        ", key=" + evt.key() + ']');

                    return true; // Return true to continue listening.
                }
            };

            try (IgniteCache<Integer, String> cache =
ignite.getOrCreateCache(cfg)) {
                if
(ignite.cluster().forDataNodes(cache.getName()).nodes().isEmpty()) {
                    System.out.println();
                    System.out.println(">>> This example requires remote
cache node nodes to be started.");
                    System.out.println(">>> Please start at least 1 remote
cache node.");
                    System.out.println(">>> Refer to example's javadoc for
details on configuration.");
                    System.out.println();

                    return;
                }


                // Register event listener for all local task execution
events.
                ignite.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT);

                put(cache);

            }
        }
    }

    /**
     * Execute individual put and get.
     *
     * @param cache Cache.
     */
    private static void put(IgniteCache<Integer, String> cache) {

        long startTime = System.currentTimeMillis();


        for (int i = 1; i < 10; i++) {
                // Put created data entry to cache.
                cache.put(i, "String"+i);
        }

        long endTime =  System.currentTimeMillis();

        System.out.println("Time taken (PUT) : " + (endTime-startTime) + "
millisec");

    }
}

************** example-ignite.xml *********************
<?xml version="1.0" encoding="UTF-8"?>




<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="example-default.xml"/>

    <bean parent="ignite.cfg"/>
</beans>

************** example-default.xml ********************
<?xml version="1.0" encoding="UTF-8"?>




<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">

        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="cacheConfiguration">
            <list>

                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="default"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="0"/>
                </bean>

                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="MyCache"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="ATOMIC"/> 
                    <property name="backups" value="0"/>
                    <property name="readFromBackup" value="true"/>
                    <property name="partitionLossPolicy"
value="READ_ONLY_SAFE"/>

                </bean>


            </list>
        </property>


            <property name="userAttributes">
                <map>
                    <entry key="group" value="NonPutter"/>
                </map>
                </property>   


        <property name="includeEventTypes">
            <list>

                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>


                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>


        <property name="discoverySpi">
            <bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">



                    <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>

                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>






--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


--
Best regards,
Ivan Pavlukhin
drosso drosso
Reply | Threaded
Open this post in threaded view
|

Re: Ignite Events

Hi Ivan,
thank you so much for your explanation! In fact it was a bit tricky, but now
I understand what's going on.
I'm currently using Ignite 2.4.

Bets regards

Davide



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/