How to retry failed job on any node with Apache Ignite

Aleksei Valikov

How to retry failed job on any node with Apache Ignite

Hi,

this is basically a copy of my Stack Overflow question.

I'm experimenting with fault tolerance in Apache Ignite.

What I can't figure out is how to retry a failed job on any node. I have a use case where my jobs call a third-party tool as a system process via ProcessBuilder to do some calculations. In some cases the tool may fail, but in most cases it's OK to retry the job on any node - including the one where it previously failed.
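
For context, the real job's execute() will look roughly like this sketch (the tool name "mytool" is a placeholder, and treating every non-zero exit code as retryable is an assumption):

import java.io.IOException;

import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJob;

public class ExternalToolJob implements ComputeJob {
    private final String input;

    public ExternalToolJob(String input) {
        this.input = input;
    }

    public void cancel() {
        // Nothing to cancel in this sketch.
    }

    public Object execute() throws IgniteException {
        try {
            // Run the external tool; output handling is omitted here.
            final Process process = new ProcessBuilder("mytool", input)
                    .redirectErrorStream(true)
                    .start();
            final int exitCode = process.waitFor();
            if (exitCode != 0) {
                // Signal failure so the task's result() can request failover.
                throw new IgniteException("Tool failed with exit code " + exitCode);
            }
            return "OK";
        } catch (IOException | InterruptedException e) {
            throw new IgniteException("Failed to run the tool", e);
        }
    }
}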

At the moment Ignite seems to reroute the job to another node which has not had this job before. So after a while all nodes are exhausted and the task fails.

What I'm looking for is how to retry a job on any node.

Here's a test to demonstrate my problem.

Here's my randomly failing job:

// Imports needed by the snippets below:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;

public static class RandomlyFailingComputeJob implements ComputeJob {
    private static final long serialVersionUID = -8351095134107406874L;

    private final String data;

    public RandomlyFailingComputeJob(String data) {
        Validate.notNull(data);
        this.data = data;
    }

    public void cancel() {
        // Nothing to cancel.
    }

    public Object execute() throws IgniteException {
        // Fail with probability 0.5 to simulate a flaky external tool.
        final double random = Math.random();
        if (random > 0.5) {
            throw new IgniteException();
        } else {
            return StringUtils.reverse(data);
        }
    }
}

And below is the task:

public static class RandomlyFailingComputeTask extends
        ComputeTaskSplitAdapter<String, String> {
    private static final long serialVersionUID = 6756691331287458885L;

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res,
            List<ComputeJobResult> rcvd) throws IgniteException {
        // Ask Ignite to fail the job over whenever it threw an exception.
        if (res.getException() != null) {
            return ComputeJobResultPolicy.FAILOVER;
        }
        return ComputeJobResultPolicy.WAIT;
    }

    @Override
    public String reduce(List<ComputeJobResult> results)
            throws IgniteException {
        final Collection<String> reducedResults = new ArrayList<String>(
                results.size());
        for (ComputeJobResult result : results) {
            reducedResults.add(result.<String>getData());
        }
        return StringUtils.join(reducedResults, ' ');
    }

    @Override
    protected Collection<? extends ComputeJob> split(int gridSize,
            String arg) throws IgniteException {
        // One job per whitespace-separated word.
        final String[] args = StringUtils.split(arg, ' ');
        final Collection<ComputeJob> computeJobs = new ArrayList<ComputeJob>(
                args.length);
        for (String data : args) {
            computeJobs.add(new RandomlyFailingComputeJob(data));
        }
        return computeJobs;
    }
}

Test code:

    final Ignite ignite = Ignition.start();
    final String original = "The quick brown fox jumps over the lazy dog";

    // The task itself already reduces to the final space-joined string.
    final String reversed = ignite.compute().execute(
            new RandomlyFailingComputeTask(), original);

As you can see, every failed job should be failed over. Since the probability of failure is not 1, I expect the task to terminate successfully at some point.

With the failure threshold of 0.5 and a total of 3 nodes this hardly ever happens. I'm getting an exception like class org.apache.ignite.cluster.ClusterTopologyException: Failed to failover a job to another node (failover SPI returned null). After some debugging I've found out that this is because I eventually run out of nodes: once a job has failed on every node, there is nowhere left to fail over to.
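
Back-of-the-envelope, assuming each failover attempt lands on a node the job hasn't tried yet: a job exhausts all 3 nodes with probability 0.5^3 = 0.125, so all 9 jobs of the test sentence succeed with probability (1 - 0.125)^9, roughly 0.30. In other words, about two runs out of three fail with the exception above.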

I understand that I can write my own FailoverSpi to handle this, but that just doesn't feel right.

First, it seems like overkill. Second, the SPI is a global thing, and I'd like to decide per job whether it should be retried or failed over. This may, for instance, depend on the exit code of the third-party tool I'm invoking. So configuring failover through the global SPI isn't right.

I'd appreciate any pointers.

Many thanks and best wishes,

Alexey

vkulichenko

Re: How to retry failed job on any node with Apache Ignite

Alexey,

I see your point, and it really looks like your use case should be an option of AlwaysFailoverSpi (which is the default one). But currently it doesn't fail over a job once it has already tried all nodes for that job. So you will have to implement your own failover SPI; it should be pretty simple - just pick a random node from the topology each time a job needs to fail over, as in the sketch below.
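
Something like this minimal sketch should work (untested; the class name is a placeholder):

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.failover.FailoverContext;
import org.apache.ignite.spi.failover.FailoverSpi;

public class RandomNodeFailoverSpi extends IgniteSpiAdapter implements FailoverSpi {
    @Override
    public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) {
        // Unlike AlwaysFailoverSpi, don't exclude nodes that have already
        // tried this job: any node in the topology is a candidate.
        return top.get(ThreadLocalRandom.current().nextInt(top.size()));
    }

    @Override
    public void spiStart(String gridName) throws IgniteSpiException {
        // Nothing to initialize.
    }

    @Override
    public void spiStop() throws IgniteSpiException {
        // Nothing to clean up.
    }
}

You would then plug it in through the configuration:

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setFailoverSpi(new RandomNodeFailoverSpi());
Ignite ignite = Ignition.start(cfg);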

As for the global nature of the SPI, you're right, but its failover() method takes a FailoverContext, which has information about the failed job (task name, attributes, exception, etc.), so you can make the decision based on this information.
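
For example, failover() from the sketch above could look at the failed job's exception (the "RETRYABLE:" message prefix is an assumed convention; a real implementation might check the exception type or a task attribute instead):

@Override
public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) {
    // ctx.getJobResult() carries the failed job's exception.
    IgniteException err = ctx.getJobResult().getException();

    // Retry anywhere, including the node that just failed, but only for
    // failures the job marked as transient via the assumed prefix.
    if (err != null && String.valueOf(err.getMessage()).startsWith("RETRYABLE:"))
        return top.get(ThreadLocalRandom.current().nextInt(top.size()));

    // Returning null means the job can't be failed over; the task fails.
    return null;
}

(This needs import org.apache.ignite.IgniteException; in addition to the imports above.)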

Hope this helps.

Thanks!

Aleksei Valikov

Re: How to retry failed job on any node with Apache Ignite

Hi,

thanks for the quick response. So it seems I didn't miss anything obvious.

Thank you!

Best wishes,
Alexey

vkulichenko

Re: How to retry failed job on any node with Apache Ignite

Alexey,

I duplicated the response on Stack Overflow as well.

Thanks!
