Dynamic ComputeTask distribution with new nodes

hueb1

Does Ignite support distributing compute tasks to new nodes that join the cluster after the compute task has been started?

I tested this with broadcast, call, and run, and with ExecutorService, but it didn't seem to work. In my test I set publicThreadPoolSize to 1 and started a distributed closure job with a delay in it. I then started another node so that it could join the cluster and pick up some of the jobs that hadn't run yet. However, this didn't work. It seems that all jobs are assigned to existing cluster nodes when the task starts and cannot be load-balanced onto nodes that join later.
vkulichenko

Re: Dynamic ComputeTask distribution with new nodes

Hi,

Yes. With the default configuration, jobs are mapped to nodes when they are submitted to the ExecutorService (or when run/call is invoked). After that, each job sits in its node's queue and waits until it can be executed.
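To make the default behavior concrete, here is a toy model in plain Java (no Ignite classes; every name is hypothetical) of mapping at submission time: each job is assigned once to one of the nodes that exist when the task is submitted, and a node that joins afterwards starts with an empty queue. Round-robin assignment is a simplification; in Ignite the node choice is made by the configured load-balancing SPI.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Ignite code: each job is mapped to a node exactly once, at submission time.
class StaticMappingModel {
    // Waiting-job queue per node, keyed by node name in join order.
    final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    void join(String node) {
        queues.put(node, new ArrayDeque<>());
    }

    // Assign jobs round-robin across the nodes that are known right now.
    void submit(List<String> jobs) {
        List<String> nodes = new ArrayList<>(queues.keySet());
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no nodes to map jobs to");
        }
        for (int i = 0; i < jobs.size(); i++) {
            queues.get(nodes.get(i % nodes.size())).addLast(jobs.get(i));
        }
    }

    public static void main(String[] args) {
        StaticMappingModel cluster = new StaticMappingModel();
        cluster.join("node1");
        cluster.submit(Arrays.asList("a b c d e f g h i j k l m n o p".split(" ")));
        cluster.join("node2"); // Joins after submission; nothing is remapped to it.
        // Prints "node1 waiting=16" then "node2 waiting=0".
        cluster.queues.forEach((n, q) -> System.out.println(n + " waiting=" + q.size()));
    }
}
```

The late node stays idle even though node1 has 16 queued jobs, which matches what the original poster observed.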

However, you can change this behavior by enabling job stealing, a feature that lets underloaded nodes (in your case, the newly joined node) take waiting jobs from other nodes' queues.
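The idea of job stealing can be illustrated with another plain-Java toy model. This is a sketch under simplifying assumptions, not Ignite's algorithm (the real JobStealingCollisionSpi works with thresholds and steal requests between nodes): an idle node repeatedly pulls a waiting job from the most loaded node until the two queues differ by less than two jobs. All class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model, not Ignite's implementation: an underloaded node steals waiting jobs.
class JobStealingModel {
    final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    void join(String node) {
        queues.put(node, new ArrayDeque<>());
    }

    // The node (other than the thief) with the longest waiting queue, or null if none.
    String mostLoaded(String thief) {
        String victim = null;
        for (Map.Entry<String, Deque<String>> e : queues.entrySet()) {
            if (e.getKey().equals(thief)) continue;
            if (victim == null || e.getValue().size() > queues.get(victim).size()) {
                victim = e.getKey();
            }
        }
        return victim;
    }

    // Move jobs from the tail of the victim's queue while that narrows the gap.
    int steal(String thief) {
        Deque<String> mine = queues.get(thief);
        int stolen = 0;
        while (true) {
            String victim = mostLoaded(thief);
            if (victim == null) break;
            Deque<String> theirs = queues.get(victim);
            if (theirs.size() - mine.size() < 2) break; // Already roughly balanced.
            mine.addLast(theirs.pollLast());
            stolen++;
        }
        return stolen;
    }

    public static void main(String[] args) {
        JobStealingModel cluster = new JobStealingModel();
        cluster.join("node1");
        cluster.queues.get("node1").addAll(Arrays.asList("a b c d e f g h i j k l m n o p".split(" ")));
        cluster.join("node2"); // Late joiner with an empty queue.
        System.out.println("node2 stole " + cluster.steal("node2") + " jobs"); // 8
        cluster.queues.forEach((n, q) -> System.out.println(n + " waiting=" + q.size()));
    }
}
```

With 16 waiting jobs on node1, the late joiner ends up with 8 and node1 keeps 8.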

You need to configure both JobStealingCollisionSpi [1] and JobStealingFailoverSpi [2]; they must always be used together. The configuration should look like this (see the Javadoc for the available optional parameters):

<property name="collisionSpi">
    <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
        ...
    </bean>
</property>

<property name="failoverSpi">
    <bean class="org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi">
        ...
    </bean>
</property>

[1] https://ignite.incubator.apache.org/releases/1.2.0/javadoc/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.html
[2] https://ignite.incubator.apache.org/releases/1.2.0/javadoc/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.html

-Val
hueb1

Re: Dynamic ComputeTask distribution with new nodes

This doesn't seem to work for me. Below is my config:

                <property name="publicThreadPoolSize" value="1" />

                <property name="failoverSpi">
                        <bean class="org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi">
                        </bean>
                </property>

                <property name="collisionSpi">
                        <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
                                <property name="stealingEnabled" value="true" />
                        </bean>
                </property>

And this is the compute task I'm running:

      import java.util.ArrayList;
      import java.util.Collection;
      import org.apache.ignite.IgniteCompute;
      import org.apache.ignite.lang.IgniteCallable;

      // The code below runs inside a method where "ignite" is a started Ignite instance.
      IgniteCompute compute = ignite.compute();
      Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

      // Iterate through all words in the sentence and create one callable job per word.
      for (final String word : "a b c d e f g h i j k l m n o p".split(" "))
      {
         calls.add(new IgniteCallable<Integer>()
         {
            @Override
            public Integer call() throws Exception
            {
               System.out.println("Starting callable");
               Thread.sleep(10000); // Simulate 10 seconds of work; InterruptedException propagates.
               System.out.println("Done callable");
               return word.length(); // Return word length.
            }
         });
      }

      // Execute the collection of callables on the cluster; blocks until all jobs complete.
      Collection<Integer> res = compute.call(calls);

I have two nodes. The first node executes the code above, and then I manually start the second node, hoping to see it pick up some of the jobs queued on the first node, but it does not. Both nodes use the same config file.
vkulichenko

Re: Dynamic ComputeTask distribution with new nodes

I reproduced the issue. It turns out that Ignite creates a static task topology when a task (or closure) is executed and does not allow nodes outside this topology to steal jobs. I created a ticket to fix this: https://issues.apache.org/jira/browse/IGNITE-1267
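The cause described here can be captured in a small plain-Java toy model (hypothetical names, not Ignite code, and not a description of how IGNITE-1267 was eventually resolved): the set of nodes a task may use is copied when the task starts, so a node that joins later is alive in the cluster but is never eligible to take that task's jobs.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model of a static task topology: eligibility is frozen at task start.
class TaskTopologyModel {
    final Set<String> liveNodes = new HashSet<>();
    Set<String> taskTopology = Collections.emptySet();

    void join(String node) {
        liveNodes.add(node);
    }

    // Executing a task snapshots the nodes it may run on.
    void executeTask() {
        taskTopology = Collections.unmodifiableSet(new HashSet<>(liveNodes));
    }

    // Only nodes inside the snapshot may take (steal) this task's jobs.
    boolean mayStealFromTask(String node) {
        return taskTopology.contains(node);
    }

    public static void main(String[] args) {
        TaskTopologyModel m = new TaskTopologyModel();
        m.join("node1");
        m.executeTask();
        m.join("node2"); // Alive, but not part of the task's topology.
        System.out.println("node1 may steal: " + m.mayStealFromTask("node1")); // true
        System.out.println("node2 may steal: " + m.mayStealFromTask("node2")); // false
    }
}
```

This is why enabling job stealing alone did not help in the test: the new node never enters the frozen set.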

Does your test simulate a real use case? If so, can you describe in more detail what you are trying to achieve? I may be able to suggest a workaround or another way to do the same thing.

-Val
hueb1

Re: Dynamic ComputeTask distribution with new nodes

Our use case is basically to process terabytes of incoming data as fast as possible while optimizing the cost of our Amazon EC2 clusters. The data load varies throughout the day, so during periods of high load we may need to increase the cluster size dynamically, and when the load drops, the cluster should shrink. So I was investigating whether Ignite could "realize" at runtime that the number of queued compute tasks is above some threshold and therefore add more EC2 instances with more nodes to farm out compute tasks to, or start more nodes on existing EC2 instances if possible. The latter is analogous to YARN's dynamic allocation, which can, for example, start more executors on existing nodes for Spark jobs. I read that Ignite can be deployed via YARN; does that mean YARN would be able to start new Ignite nodes on existing EC2 instances automatically? Has this been tested?
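The threshold-based scaling described above would have to live in an external controller. Below is a minimal sketch of just the sizing decision, assuming a hypothetical target of queued jobs per node and min/max cluster bounds; Ignite does not provide this class. The queued-job input could come, for example, from Ignite's cluster metrics (such as ClusterMetrics#getCurrentWaitingJobs(); check the Javadoc for your version), and actually starting or stopping EC2 instances is out of scope here.

```java
// Hypothetical sizing policy for an external autoscaler; Ignite itself does not do this.
class ScalingPolicy {
    final int jobsPerNode; // Queued jobs one node is expected to absorb (assumed tuning knob).
    final int minNodes;
    final int maxNodes;

    ScalingPolicy(int jobsPerNode, int minNodes, int maxNodes) {
        if (jobsPerNode <= 0 || minNodes < 0 || maxNodes < minNodes) {
            throw new IllegalArgumentException("invalid policy bounds");
        }
        this.jobsPerNode = jobsPerNode;
        this.minNodes = minNodes;
        this.maxNodes = maxNodes;
    }

    // Nodes needed = ceil(queuedJobs / jobsPerNode), clamped to [minNodes, maxNodes].
    int desiredNodes(long queuedJobs) {
        long needed = (Math.max(0L, queuedJobs) + jobsPerNode - 1) / jobsPerNode;
        return (int) Math.max(minNodes, Math.min(maxNodes, needed));
    }

    // Positive result: nodes to add; negative: nodes that could be shut down.
    int delta(int currentNodes, long queuedJobs) {
        return desiredNodes(queuedJobs) - currentNodes;
    }

    public static void main(String[] args) {
        ScalingPolicy policy = new ScalingPolicy(100, 2, 20);
        System.out.println(policy.delta(4, 950)); // 6: grow from 4 to 10 nodes
        System.out.println(policy.delta(10, 50)); // -8: shrink to the 2-node minimum
    }
}
```

Clamping to a minimum keeps a warm core cluster during quiet periods, and the maximum caps EC2 spend during spikes.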
Nikolai Tikhonov

Re: Dynamic ComputeTask distribution with new nodes

Hi,
Ignite can't dynamically grow or shrink the cluster by itself. The YARN integration lets you deploy an Ignite cluster on top of an existing YARN cluster.
Could you take a look at the integration with Mesos, Marathon, and Mesosphere? These frameworks provide more options for managing a cluster dynamically, which seems more suitable for your case.


(In reply to hueb1's message of Wed, Aug 19, 2015 at 8:07 PM.)



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Dynamic-ComputeTask-distribution-with-new-nodes-tp997p1064.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.