19 September 2012

Do Coherence Aggregators Failover?

Coherence gives clients three ways to execute code against entries in the data grid:

  • Entry Processors
  • Invocation Service
  • Aggregators

Entry Processors

The behaviour of Coherence's Entry Processors is well documented: they provide a lock-free programming model and guarantee that any cache entry mutations are applied as a single atomic operation.  An entry processor is created by a client and executed against a key, a set of keys or a filter.  In a partitioned cache, execution takes place in parallel, and should the primary node for a key fail while executing a processor, the task is automatically migrated to a backup.  An Entry Processor provides a "once and only once" guarantee and offers a powerful strategy for performing multiple updates concurrently.
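Coherence's own API needs a running cluster, so as a purely local analogy (plain Java, not the EntryProcessor interface) the per-entry atomic update can be sketched with ConcurrentHashMap.compute. The hypothetical invokeAll helper loosely mirrors InvocableMap.invokeAll(Collection, EntryProcessor): each key's mutation runs atomically, so concurrent callers need no explicit locks. It says nothing about partitioning or backup failover, which is exactly what Coherence adds on top.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
 * Local analogy only, not the Coherence EntryProcessor API: each key's update
 * is applied atomically by ConcurrentHashMap.compute, so concurrent callers
 * need no explicit locks and never see a half-applied mutation of an entry.
 */
class EntryProcessorAnalogy {

    // Hypothetical helper: apply the same "processor" to every key in the set,
    // one atomic read-modify-write per key.
    static <K, V> void invokeAll(ConcurrentHashMap<K, V> map, Set<K> keys,
                                 BiFunction<K, V, V> processor) {
        for (K key : keys) {
            // compute() runs the function atomically for this key; returning
            // null would remove the entry, as a processor might.
            map.compute(key, processor);
        }
    }
}
```

For example, four threads that each add 1 to the same ten entries a thousand times leave every entry at exactly 4000, with no locking in the calling code.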

Invocation Service

The InvocationService provides the ability to run code on the grid.  A client issues a Coherence Invocable task (the Invocable interface extends java.lang.Runnable) to any number of cluster members.  Invocables can be issued synchronously using the query method or asynchronously using the execute method, and notifications on the state of an asynchronous execution can be collated by registering an InvocationObserver.  There's no guarantee that tasks complete: if the node running a task fails, then that's the end of it, so Invocables are best suited to idempotent, repeatable operations.
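The point about idempotence can be illustrated with a plain-Java sketch (not the InvocationService API; the class and method names are hypothetical). If a caller can't tell whether a task ran, its only recovery is to issue it again, which is harmless for a task that sets a value but not for one that increments a counter.

```java
import java.util.Map;

/**
 * Plain-Java illustration, not the Coherence InvocationService API: when a
 * caller cannot tell whether a task ran, its only recovery is to run it again,
 * which is safe for idempotent tasks and unsafe for the rest.
 */
class IdempotencyDemo {

    // Idempotent: running it once or many times leaves the same state.
    static Runnable setStatus(Map<String, String> store, String key, String value) {
        return () -> store.put(key, value);
    }

    // Not idempotent: every extra run changes the state again.
    static Runnable incrementCounter(Map<String, Integer> store, String key) {
        return () -> store.merge(key, 1, Integer::sum);
    }

    // Simulates a caller that never learns whether the task completed and
    // therefore re-issues it the given number of times.
    static void runWithRetries(Runnable task, int attempts) {
        for (int i = 0; i < attempts; i++) {
            task.run();
        }
    }
}
```

Issued three times, setStatus still leaves "done", whereas incrementCounter ends at 3 instead of the intended 1.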

Aggregators

Aggregators are MapReduce for Coherence.  Like Entry Processors, Aggregators operate against an InvocableMap and can be run against a set of keys or a filter.  You can write your own or choose from the standard implementations Coherence provides out of the box, such as Count, Average, Min, Max etc.  Execution takes place in parallel, but what happens when the primary owner of a key fails?  Well, you would hope that the aggregator fails over to a backup and still returns the correct result.  This isn't documented (or at least I couldn't find it), and the only clue to the behaviour is that Aggregators share InvocableMap with Entry Processors, so you'd hope they also share the same failover properties.
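To make the MapReduce comparison concrete, here is a simplified plain-Java model of the two phases, loosely named after the aggregate(Set) and aggregateResults(Collection) methods that the test below overrides: each member counts the entries it owns and the caller adds up the partial results. The modulo-based owner assignment is a hypothetical stand-in for Coherence's partitioning, not how Coherence actually places keys.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Simplified plain-Java model, not Coherence code, of the two phases of a
 * parallel Count: each storage member counts the entries it owns, then the
 * caller combines the partial results.
 */
class TwoPhaseCountModel {

    // Phase 1 ("map"): runs on one member against only the keys it owns.
    static int aggregate(Set<Integer> ownedKeys) {
        return ownedKeys.size();
    }

    // Phase 2 ("reduce"): runs on the caller over one partial result per member.
    static int aggregateResults(Collection<Integer> partials) {
        int total = 0;
        for (int partial : partials) {
            total += partial;
        }
        return total;
    }

    // Hypothetical ownership rule: key k lives on member (k % members) + 1.
    static Map<Integer, Set<Integer>> assignOwners(int entries, int members) {
        Map<Integer, Set<Integer>> byOwner = new HashMap<>();
        for (int key = 0; key < entries; key++) {
            byOwner.computeIfAbsent(key % members + 1, m -> new HashSet<>()).add(key);
        }
        return byOwner;
    }

    // Runs phase 1 once per member, then phase 2 over the partial counts.
    static int count(int entries, int members) {
        List<Integer> partials = new ArrayList<>();
        for (Set<Integer> owned : assignOwners(entries, members).values()) {
            partials.add(aggregate(owned));
        }
        return aggregateResults(partials);
    }
}
```

With 100 entries over 3 members this rule yields partial counts of 34, 33 and 33, which aggregateResults sums to 100.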

The best way to functionally test any multi-node Coherence configuration is with Little Grid.  This is a fantastic utility that lets you start any number of Coherence cluster members (storage enabled, storage disabled, proxy services, Extend clients etc.) inside a single JVM.  If you are using Coherence, it should definitely be part of your unit testing and continuous integration suites.  There are some great examples to get you started, and we're going to extend the failover example to answer our question.

The JUnit test below is shamelessly based on the Entry Processor failover test available from the Little Grid examples section.  For our purposes it's just been modified to:

  • Create a cluster of 3 storage-enabled nodes
  • Populate a partitioned cache with 100 records before each test
  • Issue a sub-classed version of com.tangosol.util.aggregator.Count (just so we can log what's happening) against all cache entries
  • Select a storage node at random and
    • In the first test simulate a non-graceful cluster member exit (stop)
    • In the second test simulate the graceful exit of a cluster member (shutdown)
  • Check that the Aggregator counts all 100 records

The Aggregator has a sleep programmed into it so that the member stop and shutdown events fire while the aggregation is still in progress: the member is removed after 2 seconds, while each aggregation sleeps for 5.

package uk.co.c2b2.coherence.failover;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.littlegrid.ClusterMemberGroup;
import org.littlegrid.ClusterMemberGroupUtils;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.NamedCache;
import com.tangosol.util.aggregator.Count;
import com.tangosol.util.filter.AlwaysFilter;


public class AggregatorFailoverTest {
    private static final int SECONDS_DELAY_BEFORE_DEFERRED_ACTION = 2;
    private static final int SECONDS_TO_SLEEP_IN_AGGREGATOR = SECONDS_DELAY_BEFORE_DEFERRED_ACTION + 3;

    private ClusterMemberGroup memberGroup;
    private NamedCache cache;

    private int numberOfEntries = 100;
    private Random randomNumber = new Random(System.currentTimeMillis());

    @Before
    public void beforeTest() {
        memberGroup = ClusterMemberGroupUtils.newBuilder().setStorageEnabledCount(3).setCacheConfiguration("cache-config.xml").setLogLevel(3)
                .setFastStartJoinTimeoutMilliseconds(1000).buildAndConfigureForStorageDisabledClient();

        cache = CacheFactory.getCache("test");

        Map cacheEntries = new HashMap() {{
            for (int i = 0; i < numberOfEntries; i++) {
                put(i, String.valueOf(i));
            }
        }};
        
        cache.putAll(cacheEntries);
    }

    @After
    public void afterTest() {
        ClusterMemberGroupUtils.shutdownCacheFactoryThenClusterMemberGroups(memberGroup);
    }
    

    @Test
    public void deferredStopAndAggregatorFailover() {

        assertTrue(cache.size() == numberOfEntries);

        final int idOfMemberToStop = getRandomClusterMemberId();

        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                /* stop - leave the cluster immediately */
                memberGroup.stopMember(idOfMemberToStop);
            }
        };

        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.schedule(runnable, SECONDS_DELAY_BEFORE_DEFERRED_ACTION, TimeUnit.SECONDS);

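        int recordCount = (Integer) cache.aggregate(AlwaysFilter.INSTANCE, new SleepyCount());
        // release the scheduler thread; the deferred stop has already run by now
        executorService.shutdown();
        assertTrue(recordCount == numberOfEntries);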
    }

    @Test
    public void deferredShutdownAndAggregatorFailover() throws InterruptedException {

        assertTrue(cache.size() == numberOfEntries);

        final int idOfMemberToShutdown = getRandomClusterMemberId();

        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                /* shutdown - graceful exit when all work on this member has completed*/
                memberGroup.shutdownMember(idOfMemberToShutdown);
            }
        };

        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.schedule(runnable, SECONDS_DELAY_BEFORE_DEFERRED_ACTION, TimeUnit.SECONDS);

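        int recordCount = (Integer) cache.aggregate(AlwaysFilter.INSTANCE, new SleepyCount());
        // release the scheduler thread; the deferred shutdown has already run by now
        executorService.shutdown();
        assertTrue(recordCount == numberOfEntries);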
    }

    private int getRandomClusterMemberId() {
        // Pick a random key and return the id of the storage member that owns it
        int key = randomNumber.nextInt(numberOfEntries);
        return ((DistributedCacheService) cache.getCacheService()).getKeyOwner(key).getId();
    }

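    /**
     * Count that logs which member is aggregating and sleeps before counting,
     * giving the test time to stop or shut down a member mid-aggregation.
     */
    public static class SleepyCount extends Count {

        private static final long serialVersionUID = 6957327006461184666L;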

        public SleepyCount() {
            super();
        }

        public Object aggregate(Set entries) {

            int memberId = CacheFactory.ensureCluster().getLocalMember().getId();

            CacheFactory.log("Member : " + memberId + " is aggregating " + entries.size() + " keys");
            CacheFactory.log("Member : " + memberId + " sleeping for " + SECONDS_TO_SLEEP_IN_AGGREGATOR + " seconds");

            try {
                TimeUnit.SECONDS.sleep(SECONDS_TO_SLEEP_IN_AGGREGATOR);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }

            Object result = super.aggregate(entries);

            CacheFactory.log("Member : " + memberId + " Finished sleeping - survived! ID of member that completed processing: " + memberId);

            return result;
        }

        public Object aggregateResults(Collection results) {
            int memberId = CacheFactory.ensureCluster().getLocalMember().getId();
            CacheFactory.log("Member : " + memberId + " is aggregating results " + Arrays.deepToString(results.toArray()));
            return super.aggregateResults(results);
        }

    }

}

And here's the cache config:

<cache-config>
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>*</cache-name>
            <scheme-name>distributed-scheme</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>

    <caching-schemes>
        <distributed-scheme>
            <scheme-name>distributed-scheme</scheme-name>
            <backing-map-scheme>
                <local-scheme>
                </local-scheme>
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>
    </caching-schemes>
</cache-config>

Test 1 - Member Stop

So let's take a look at the console output and see what happens during a non-graceful exit.  First we see the Aggregator start to run on each storage-enabled member; it displays the number of keys to process on that node and puts itself to sleep for 5 seconds:

Member : 3 is aggregating 34 keys
Member : 1 is aggregating 32 keys
Member : 1 sleeping for 5 seconds
Member : 3 sleeping for 5 seconds
Member : 2 is aggregating 34 keys
Member : 2 sleeping for 5 seconds

Then a cluster member is selected at random and stopped:

Sep 19, 2012 8:18:16 PM org.littlegrid.impl.DefaultClusterMemberGroup stopMember
INFO: About to stop cluster member with id '1'

Next, the surviving Aggregators report back, telling us they are still alive!  These Aggregators will have successfully counted the records on members 2 and 3 (34 + 34 == 68 entries).

Member : 3 Finished sleeping - survived! ID of member that completed processing: 3
Member : 2 Finished sleeping - survived! ID of member that completed processing: 2

Coherence recovers from the departure of member 1 by restoring the partitions it owned from their backups on the surviving members:

Restored from backup 43 partitions
Restored from backup 43 partitions

And now for the magic: the aggregation that was running on the departed member is re-executed against the re-balanced keys, with members 2 and 3 each processing their share of member 1's 32 keys.  Of course, it still has a sleep programmed into it...

Member : 2 is aggregating 18 keys
Member : 2 sleeping for 5 seconds
Member : 3 is aggregating 14 keys
Member : 3 sleeping for 5 seconds
Member : 2 Finished sleeping - survived! ID of member that completed processing: 2
Member : 3 Finished sleeping - survived! ID of member that completed processing: 3

And finally the results are totalled (34 + 34 + 18 + 14 == 100) on member 4, the storage-disabled test client that issued the aggregation:

Member : 4 is aggregating results [34, 34, 18, 14]
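The behaviour in the log can be approximated with a toy model. This is an assumption about the mechanics, not Coherence's internals: keys hash to 257 partitions (Coherence's default partition count), the failed member's partial result never arrives, and its partitions are re-homed round-robin on the survivors, which then re-aggregate just those partitions. The hypothetical ownership layout means the partial counts differ from the 34/34/18/14 split above, but the total is the same and no key is counted twice.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (an assumption, not Coherence internals) of a partition-based
 * count surviving a member failure: the failed member's partial result is
 * lost, its partitions are re-homed on the survivors, and only those
 * partitions are aggregated again, so every key is counted exactly once.
 */
class AggregatorFailoverModel {

    // Returns the partial counts the caller receives, in arrival order.
    static List<Integer> run(int entries, int partitions, int members, int failedMember) {
        // Keys hash to partitions by key % partitions.
        int[] keysInPartition = new int[partitions];
        for (int key = 0; key < entries; key++) {
            keysInPartition[key % partitions]++;
        }

        // Hypothetical layout: partition p is first owned by member (p % members) + 1.
        int[] owner = new int[partitions];
        for (int p = 0; p < partitions; p++) {
            owner[p] = p % members + 1;
        }

        List<Integer> survivors = new ArrayList<>();
        for (int m = 1; m <= members; m++) {
            if (m != failedMember) {
                survivors.add(m);
            }
        }

        List<Integer> partials = new ArrayList<>();
        // First pass: only surviving members ever report a result.
        for (int m : survivors) {
            partials.add(countOwnedBy(m, owner, keysInPartition));
        }

        // Failover: hand the failed member's partitions to survivors in turn
        // (0 marks a partition that did not move).
        int[] newOwner = new int[partitions];
        int next = 0;
        for (int p = 0; p < partitions; p++) {
            if (owner[p] == failedMember) {
                newOwner[p] = survivors.get(next++ % survivors.size());
            }
        }

        // Second pass: each survivor re-aggregates just the partitions it inherited.
        for (int m : survivors) {
            partials.add(countOwnedBy(m, newOwner, keysInPartition));
        }
        return partials;
    }

    // Sums the keys held in every partition assigned to the given member.
    static int countOwnedBy(int member, int[] owner, int[] keysInPartition) {
        int count = 0;
        for (int p = 0; p < owner.length; p++) {
            if (owner[p] == member) {
                count += keysInPartition[p];
            }
        }
        return count;
    }

    // The caller's final reduce step.
    static int total(List<Integer> partials) {
        int sum = 0;
        for (int partial : partials) {
            sum += partial;
        }
        return sum;
    }
}
```

With 100 keys, 3 members and member 1 failing, run(100, 257, 3, 1) returns [33, 33, 17, 17]: the 34 keys member 1 owned in this model are recounted exactly once, split between members 2 and 3.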

Test 2 - Member Shutdown

Again the Aggregator runs on each storage-enabled member, displays the number of keys to process on that node and puts itself to sleep for 5 seconds:

Member : 1 is aggregating 32 keys
Member : 3 is aggregating 34 keys
Member : 3 sleeping for 5 seconds
Member : 2 is aggregating 34 keys
Member : 2 sleeping for 5 seconds
Member : 1 sleeping for 5 seconds

And a member is chosen at random to shut down:

Sep 19, 2012 9:52:43 PM org.littlegrid.impl.DefaultClusterMemberGroup shutdownMember
INFO: About to shutdown cluster member '2'

This time, though, all members survive and the aggregation completes before member 2 is allowed to leave the cluster:

Member : 3 Finished sleeping - survived! ID of member that completed processing: 3
Member : 2 Finished sleeping - survived! ID of member that completed processing: 2
Member : 1 Finished sleeping - survived! ID of member that completed processing: 1
Member : 4 is aggregating results [32, 34, 34]
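The difference between the two tests has a familiar plain-Java analogy (not Coherence code): ExecutorService.shutdown() stops accepting new work but lets an in-flight task finish, much like the graceful member shutdown, while shutdownNow() interrupts it, much like the abrupt stop. The runTask helper below is hypothetical; it runs one sleepy task on a single-thread "member" and reports whether the task completed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Plain-Java analogy, not Coherence code: shutdown() behaves like the graceful
 * member shutdown (in-flight work finishes), shutdownNow() like the abrupt stop
 * (in-flight work is interrupted and lost).
 */
class GracefulShutdownAnalogy {

    // Runs one slow task on a single-thread "member", then shuts the member
    // down and reports whether the task completed its work.
    static boolean runTask(long taskMillis, boolean graceful) {
        AtomicBoolean finished = new AtomicBoolean(false);
        ExecutorService member = Executors.newSingleThreadExecutor();
        member.submit(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(taskMillis);
                finished.set(true);
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): the result is never produced.
                Thread.currentThread().interrupt();
            }
        });

        if (graceful) {
            member.shutdown();     // refuse new work, let the running task finish
        } else {
            member.shutdownNow();  // interrupt the running task, drop queued ones
        }

        try {
            member.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }
}
```

Unlike a Coherence member stop, the abrupt case here simply loses the work; it is Coherence's partition failover that re-runs the lost aggregation elsewhere.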

The End

So that's it: you can use Aggregators for guaranteed read-only operations, and use Little Grid unit tests to functionally test your projects.

Mark Addy
