19 September 2012

Do Coherence Aggregators Failover?

Coherence provides three possibilities for clients to execute code over entries in the data grid.

  • Entry Processors
  • Invocation Service
  • Aggregators

Entry Processors

The behaviour of Coherence's Entry Processors is well documented: they provide a lock-free programming model and guarantee that any cache entry mutations are applied as a single atomic operation.  An Entry Processor is created by a client and executed against a key, a set of keys or a filter.  In a partitioned cache execution takes place in parallel, and should the primary node for a key fail while executing a processor, the task is automatically migrated to a back-up.  An Entry Processor provides a "once and only once" guarantee and offers a powerful strategy for performing multiple updates concurrently.

Invocation Service

The InvocationService provides the ability to run code on the grid.  A client issues a Coherence Invocable task (based on an instance of java.lang.Runnable) to any number of cluster members.  Invocables can be issued synchronously using the query method or asynchronously using the execute method.  Notifications on the state of an asynchronous execution can be collated through the registration of an InvocationObserver.  There's no guarantee that a task completes: if the node running the task fails then that's the end of it, so Invocables are best suited to idempotent, repeatable operations.

Aggregators

Aggregators are MapReduce for Coherence.  Like Entry Processors, Aggregators operate against an instance of an InvocableMap and can be run against a set of keys or a filter.  You can write your own or choose from the standard out-of-the-box Coherence implementations such as Count, Average, Min and Max.  Execution takes place in parallel, but what happens when the primary key owner fails?  You would hope that the aggregator fails over to a back-up and returns the correct result.  I couldn't find this documented anywhere, and the only clue to the behaviour is that Aggregators share InvocableMap with Entry Processors, so you'd hope they share the same failover properties.
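As a rough illustration of the two-phase shape described above, here is a minimal plain-JDK sketch.  It does not use Coherence at all: the class TwoPhaseCountSketch, the key % members ownership rule and the thread pool standing in for storage members are all assumptions made for the example.  Only the method names aggregate and aggregateResults are borrowed from the aggregator used in the test further down.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK sketch of a two-phase count; none of these types are Coherence classes.
class TwoPhaseCountSketch {

    // Phase 1: each "member" counts only the entries it owns.
    static int aggregate(Collection<Integer> ownedKeys) {
        return ownedKeys.size();
    }

    // Phase 2: the caller combines the partial counts into one result.
    static int aggregateResults(Collection<Integer> partialCounts) {
        int total = 0;
        for (int partial : partialCounts) {
            total += partial;
        }
        return total;
    }

    // Assigns keys 0..numberOfEntries-1 to members by key % members (an assumption,
    // not how Coherence partitions data), runs phase 1 in parallel, then phase 2.
    static int countInParallel(int numberOfEntries, int members) {
        List<List<Integer>> owned = new ArrayList<List<Integer>>();
        for (int m = 0; m < members; m++) {
            owned.add(new ArrayList<Integer>());
        }
        for (int key = 0; key < numberOfEntries; key++) {
            owned.get(key % members).add(key);
        }

        ExecutorService pool = Executors.newFixedThreadPool(members);
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (final List<Integer> keys : owned) {
                tasks.add(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return aggregate(keys);
                    }
                });
            }
            List<Integer> partials = new ArrayList<Integer>();
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                partials.add(future.get());
            }
            return aggregateResults(partials);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(100, 3)); // prints 100
    }
}
```

The point of the split is that each partial count only needs local data, so the question of failover becomes: what happens to a partial result that never arrives?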

The best way to functionally test any multi-node Coherence configuration is with Little Grid.  This is a fantastic utility which lets you start up any number of Coherence cluster members (storage enabled, storage disabled, proxy services, Extend clients etc) all inside a single JVM.  If you are using Coherence then it should definitely be part of your unit testing and continuous integration suites.  There are some great examples to get you started and we're going to extend the failover example to answer our question.

The JUnit test below is shamelessly based on the Entry Processor failover test available from the Little Grid examples section.  For our purposes it's just been modified to:

  • Create a 3 storage node cluster
  • Populate a partitioned cache with 100 records before each test
  • Issue a sub-classed version of com.tangosol.util.aggregator.Count (just so we can log what's happening) against all cache entries
  • Select a storage node at random and
    • In the first test simulate a non-graceful cluster member exit (stop)
    • In the second test simulate the graceful exit of a cluster member (shutdown)
  • Check the Aggregator counts all 100 records

The Aggregator has a sleep programmed into it so that the member stop and shutdown events can be triggered at the right time.

package uk.co.c2b2.coherence.failover;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.littlegrid.ClusterMemberGroup;
import org.littlegrid.ClusterMemberGroupUtils;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.NamedCache;
import com.tangosol.util.aggregator.Count;
import com.tangosol.util.filter.AlwaysFilter;


public class AggregatorFailoverTest {
    private static final int SECONDS_DELAY_BEFORE_DEFERRED_ACTION = 2;
    private static final int SECONDS_TO_SLEEP_IN_AGGREGATOR = SECONDS_DELAY_BEFORE_DEFERRED_ACTION + 3;

    private ClusterMemberGroup memberGroup;
    private NamedCache cache;

    private int numberOfEntries = 100;
    private Random randomNumber = new Random(System.currentTimeMillis());

    @Before
    public void beforeTest() {
        memberGroup = ClusterMemberGroupUtils.newBuilder().setStorageEnabledCount(3).setCacheConfiguration("cache-config.xml").setLogLevel(3)
                .setFastStartJoinTimeoutMilliseconds(1000).buildAndConfigureForStorageDisabledClient();

        cache = CacheFactory.getCache("test");

        Map cacheEntries = new HashMap() {{
            for (int i = 0; i < numberOfEntries; i++) {
                put(i, String.valueOf(i));
            }
        }};
        
        cache.putAll(cacheEntries);
    }

    @After
    public void afterTest() {
        ClusterMemberGroupUtils.shutdownCacheFactoryThenClusterMemberGroups(memberGroup);
    }
    

    @Test
    public void deferredStopAndAggregatorFailover() {

        assertTrue(cache.size() == numberOfEntries);

        final int idOfMemberToStop = getRandomClusterMemberId();

        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                /* stop - leave the cluster immediately */
                memberGroup.stopMember(idOfMemberToStop);
            }
        };

        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.schedule(runnable, SECONDS_DELAY_BEFORE_DEFERRED_ACTION, TimeUnit.SECONDS);

        int recordCount = (Integer) cache.aggregate(AlwaysFilter.INSTANCE, new SleepyCount());
        assertTrue(recordCount == numberOfEntries);
    }

    @Test
    public void deferredShutdownAndAggregatorFailover() throws InterruptedException {

        assertTrue(cache.size() == numberOfEntries);

        final int idOfMemberToShutdown = getRandomClusterMemberId();

        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                /* shutdown - graceful exit when all work on this member has completed*/
                memberGroup.shutdownMember(idOfMemberToShutdown);
            }
        };

        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.schedule(runnable, SECONDS_DELAY_BEFORE_DEFERRED_ACTION, TimeUnit.SECONDS);

        int recordCount = (Integer) cache.aggregate(AlwaysFilter.INSTANCE, new SleepyCount());
        assertTrue(recordCount == numberOfEntries);
    }

    private int getRandomClusterMemberId() {
        int key = randomNumber.nextInt(numberOfEntries);
        final int idOfMemberToShutdown = ((DistributedCacheService) cache.getCacheService()).getKeyOwner(key).getId();
        return idOfMemberToShutdown;
    }

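    /**
     * A Count aggregator that logs which member is running it and then sleeps,
     * giving the test time to stop or shut down a member mid-aggregation.
     */
    public static class SleepyCount extends Count {

        private static final long serialVersionUID = 6957327006461184666L;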

        public SleepyCount() {
            super();
        }

        public Object aggregate(Set entries) {

            int memberId = CacheFactory.ensureCluster().getLocalMember().getId();

            CacheFactory.log("Member : " + memberId + " is aggregating " + entries.size() + " keys");
            CacheFactory.log("Member : " + memberId + " sleeping for " + SECONDS_TO_SLEEP_IN_AGGREGATOR + " seconds");

            try {
                TimeUnit.SECONDS.sleep(SECONDS_TO_SLEEP_IN_AGGREGATOR);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }

            Object result = super.aggregate(entries);

            CacheFactory.log("Member : " + memberId + " Finished sleeping - survived! ID of member that completed processing: " + memberId);

            return result;
        }

        public Object aggregateResults(Collection results) {
            int memberId = CacheFactory.ensureCluster().getLocalMember().getId();
            CacheFactory.log("Member : " + memberId + " is aggregating results " + Arrays.deepToString(results.toArray()));
            return super.aggregateResults(results);
        }

    }

}

And here's the cache config:

<cache-config>
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>*</cache-name>
            <scheme-name>distributed-scheme</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>

    <caching-schemes>
        <distributed-scheme>
            <scheme-name>distributed-scheme</scheme-name>
            <backing-map-scheme>
                <local-scheme>
                </local-scheme>
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>
    </caching-schemes>
</cache-config>

Test 1 - Member Stop

So let's take a look at the console output and see what happens during a non-graceful exit.  First we see the Aggregator start to run on each storage-enabled member; it displays the number of keys to process on that node and puts itself to sleep for 5 seconds:

Member : 3 is aggregating 34 keys
Member : 1 is aggregating 32 keys
Member : 1 sleeping for 5 seconds
Member : 3 sleeping for 5 seconds
Member : 2 is aggregating 34 keys
Member : 2 sleeping for 5 seconds

Then a cluster member is selected at random and stopped:

Sep 19, 2012 8:18:16 PM org.littlegrid.impl.DefaultClusterMemberGroup stopMember
INFO: About to stop cluster member with id '1'

Next the surviving Aggregators report back, telling us they are still alive!  These Aggregators will have successfully counted the records on nodes 2 and 3 (34 + 34 == 68 entries):

Member : 3 Finished sleeping - survived! ID of member that completed processing: 3
Member : 2 Finished sleeping - survived! ID of member that completed processing: 2

Coherence recovers any redundancy lost due to the departure of member 1

Restored from backup 43 partitions
Restored from backup 43 partitions

And now for the magic: the aggregation work owned by the departed member migrates with the re-balanced keys and runs again on the surviving members.  Of course, it still has a sleep programmed into it...

Member : 2 is aggregating 18 keys
Member : 2 sleeping for 5 seconds
Member : 3 is aggregating 14 keys
Member : 3 sleeping for 5 seconds
Member : 2 Finished sleeping - survived! ID of member that completed processing: 2
Member : 3 Finished sleeping - survived! ID of member that completed processing: 3

And then finally the results are totalled (34 + 34 + 18 + 14 == 100):

Member : 4 is aggregating results [34, 34, 18, 14]
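The arithmetic behind that final line can be modelled in a few lines of plain Java.  This is only a sketch of what the log shows, not Coherence code: the class FailoverCountSketch and its methods are hypothetical, and the member ids and counts are simply copied from the console output above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK model of the logged run; the names here are illustrative only.
class FailoverCountSketch {

    // ownedCounts: member id -> entries owned when the aggregation started.
    // failedMember: the member that left before returning its partial count.
    // takenOver: surviving member id -> entries it re-aggregates after recovery.
    static List<Integer> partialCounts(Map<Integer, Integer> ownedCounts,
                                       int failedMember,
                                       Map<Integer, Integer> takenOver) {
        int redistributed = 0;
        for (int count : takenOver.values()) {
            redistributed += count;
        }
        Integer lost = ownedCounts.get(failedMember);
        if (lost == null || lost != redistributed) {
            throw new IllegalArgumentException(
                    "takenOver must cover exactly the failed member's entries");
        }

        List<Integer> partials = new ArrayList<Integer>();
        // First pass: only the survivors return a partial count.
        for (Map.Entry<Integer, Integer> entry : ownedCounts.entrySet()) {
            if (entry.getKey() != failedMember) {
                partials.add(entry.getValue());
            }
        }
        // Second pass: the restored entries are counted again by their new owners.
        partials.addAll(takenOver.values());
        return partials;
    }

    static int total(List<Integer> partials) {
        int sum = 0;
        for (int partial : partials) {
            sum += partial;
        }
        return sum;
    }

    // The run from the log: member 1 (32 entries) is stopped,
    // members 2 and 3 pick up 18 and 14 of its entries.
    static List<Integer> fromLog() {
        Map<Integer, Integer> owned = new LinkedHashMap<Integer, Integer>();
        owned.put(3, 34);
        owned.put(1, 32);
        owned.put(2, 34);
        Map<Integer, Integer> takenOver = new LinkedHashMap<Integer, Integer>();
        takenOver.put(2, 18);
        takenOver.put(3, 14);
        return partialCounts(owned, 1, takenOver);
    }

    public static void main(String[] args) {
        List<Integer> partials = fromLog();
        System.out.println(partials + " -> " + total(partials)); // [34, 34, 18, 14] -> 100
    }
}
```

The key observation is that the 32 entries owned by the stopped member are not lost from the count; they reappear as two extra partial results from the members that now own them.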

Test 2 - Member Shutdown

Again the Aggregator runs on each storage-enabled member; it displays the number of keys to process on that node and puts itself to sleep for 5 seconds:

Member : 1 is aggregating 32 keys
Member : 3 is aggregating 34 keys
Member : 3 sleeping for 5 seconds
Member : 2 is aggregating 34 keys
Member : 2 sleeping for 5 seconds
Member : 1 sleeping for 5 seconds

And a member is chosen at random to shut down:

Sep 19, 2012 9:52:43 PM org.littlegrid.impl.DefaultClusterMemberGroup shutdownMember
INFO: About to shutdown cluster member '2'

This time though all members survive and the aggregation completes before member 2 is allowed to leave the cluster:

Member : 3 Finished sleeping - survived! ID of member that completed processing: 3
Member : 2 Finished sleeping - survived! ID of member that completed processing: 2
Member : 1 Finished sleeping - survived! ID of member that completed processing: 1
Member : 4 is aggregating results [32, 34, 34]

The End

So that's it: Aggregators do fail over, so you can use them for guaranteed read-only operations, and you can use Little Grid unit tests to functionally test your projects.

Mark Addy

10 September 2012

Book Review: HBase Administration Cookbook

Publisher: Packt Publishing
Language : English
Paperback : 332 pages [ 235mm x 191mm ]
Release Date : August 2012
ISBN : 1849517142
ISBN 13 : 9781849517140
Author(s) : Yifeng Jiang
Available at : Packt Publishing Website, Amazon



HBase is an Apache project designed to enable realtime access to very large datasets. In order to enable this it leverages components from the Hadoop projects: HDFS (a distributed, massive-scale, fault-tolerant file system modelled on GFS), ZooKeeper and Hadoop MapReduce.


The newly published book from Packt Publishing is designed to bring an administrator up to speed with the creation of an HBase cluster and help them with a wide range of tasks. It presents issues and solutions as recipes, which are grouped into 9 high-level chapters. This unfortunately leads to a little repetition in some of the technical detail, but it helps a reader who arrives knowing their issue from a high-level perspective: they can easily find a matching recipe in a section that makes sense rather than having to dig around. The recipes are good, although they shy away from stating best practice, which is probably for the best as the technology in question is maturing rapidly.

The book can also be read sequentially; the recipes are well written with clear step by step instructions on achieving a highly available, monitored HBase setup (optionally with Hive) in either a local environment or on a cloud based provider such as EC2. Note there are considerations for running on a cloud and the book provides pragmatic usable solutions that you can implement yourself.

The book addresses such topics as maintenance and security, troubleshooting and tuning of HBase (and implicitly Hadoop, MapReduce and the underlying hardware). Recovery of Hadoop clusters is covered to an extent I would expect is practical in a published book, but your mileage may vary with whether these recipes are pertinent to your infrastructure.

The book presents Hadoop concepts simply enough for a first timer to come into this topic with no prior knowledge of any Hadoop components and be able to perform the exercises. However, it does get into advanced topics later on, and some of the things that are covered (such as manual region server splitting and key management) left me with the impression that people unfamiliar with Hadoop would need either real-world experience of such matters or alternative sources of information to leave them with a better understanding of what they were doing and why. After reading this book they would have a good foundation knowledge of what a cluster should look like and should have been introduced to enough concepts to find out more by themselves.

Overall I consider this book to be well written; the author has a demonstrable knowledge of System Administration and practical experience with installing and managing Hadoop and HBase installations. Well done Yifeng.

Nick Wright

3 September 2012

Out Of Memory Exception First Response


In any business, it’s a wise decision to not only plan for outages in production but to expect them. When problems do occur, particularly for production systems, time taken to resolve the issue is always key, whether that means time taken to restore the affected system to a stable, consistent and usable state or a root cause analysis to determine the causes and preventative measures to put in place.

When opening a support ticket, then, it is very important to reduce the amount of time wasted by waiting for support to tell you what data they need to diagnose the problem.

This post will provide a basic guide to some useful first steps following a Java OutOfMemory exception which should help get things moving quickly.

What data should I collect?


In short: as much as possible! In an ideal world, all possible logs would be available for such problems but, since these errors occur mainly in production systems, running your server with maximum tracing on is not an option! Here is a list of things which are very useful to provide if you can:

Heapdumps and Java cores

  • The JVM should be set to generate heapdumps and javacores following an OOM event. 

Verbose GC output

  • You will find verbose GC output logged in the native_stderr.log file, but it can be configured to log to a different file without much trouble.

SystemOut.log, SystemErr.log

  • If any recurring errors contributed to the OOM, it is likely they will have been logged in some way. The data contained in these will very likely be minimal, but it’s worth providing them to support anyway. 

Any other server logs

 

How to prepare for a reoccurrence 

 

Make sure the JVM is set to provide a heapdump on OutOfMemory errors.

Turn on VerboseGC if it’s not on already.

  • Verbose GC output is very lightweight and will make a negligible difference to performance in almost all scenarios. Most of the work to produce the output is done by the garbage collector anyway, so setting -verbose:gc just saves this data to a log file.
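One way to confirm that a running server was actually started with these options is to ask the JVM for its own start-up arguments. The sketch below uses the standard java.lang.management API; the class name OomReadinessCheck and the helper hasArgument are made up for this example. It assumes -XX:+HeapDumpOnOutOfMemoryError is the flag of interest, which is the HotSpot spelling; IBM JVMs configure dumps through -Xdump options instead, so the second check would not apply there. It also only sees explicit arguments, not behaviour that a JVM enables by default.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper: reports whether the JVM was started with the options discussed above.
class OomReadinessCheck {

    // True if any JVM argument starts with the given prefix.
    static boolean hasArgument(List<String> jvmArgs, String prefix) {
        for (String arg : jvmArgs) {
            if (arg.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Arguments passed to the JVM itself (not the application's own arguments).
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();

        System.out.println("verbose GC requested: "
                + hasArgument(jvmArgs, "-verbose:gc"));
        // HotSpot-specific flag; an assumption that will not match on IBM JVMs.
        System.out.println("heap dump on OOM requested: "
                + hasArgument(jvmArgs, "-XX:+HeapDumpOnOutOfMemoryError"));
    }
}
```

Running it with, for example, java -verbose:gc OomReadinessCheck should report the first option as present.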

Monitoring

  • The JDK provides a useful tool called VisualVM which allows live monitoring of the heap. See below for more details.
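If a graphical tool is not an option on a production host, similar heap figures can be read from code through the platform MXBeans in java.lang.management. The following is a minimal sketch, not a replacement for VisualVM; the class HeapSnapshot and its methods are hypothetical names for this example, and how often to poll or where to log the output is left open.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper that summarises current heap usage and GC activity.
class HeapSnapshot {

    // Bytes of heap currently in use, as reported by the JVM.
    static long usedHeapBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    // One-line summary suitable for periodic logging.
    static String describe() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        StringBuilder summary = new StringBuilder();
        summary.append("heap used=").append(heap.getUsed())
               .append(" committed=").append(heap.getCommitted())
               // getMax() returns -1 when the maximum is undefined.
               .append(" max=").append(heap.getMax());
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            summary.append(" | ").append(gc.getName())
                   .append(" collections=").append(gc.getCollectionCount())
                   .append(" timeMs=").append(gc.getCollectionTime());
        }
        return summary.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

A used figure that keeps climbing across full collections is the kind of pattern worth passing on to support alongside the logs.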

 

Supply as much information about how the affected system is set up as possible 

 

Have you deployed any new applications to the server?

  • The problem could lie in the application code, or it may have uncovered an underlying issue on the server. If it’s possible to roll back these updates they may, in some cases, add some stability.

Any increased load to the system?

  • This could be due to a marketing campaign, or seasonal changes to your business rather than a technical change.

Any updates to the system? What is the current software level? (Including minor version?)

  • Are there any fixes or patches related to memory or performance that you are missing?

What network topology is implemented?

  • This is, perhaps, of less importance but it is still useful to know what, if any, load balancing is available to the problem server.

Does the system handle large message sizes or large volumes of messages/transactions?

  • This sort of information is helpful in the diagnosis of heap exhaustion.

 

Use freely available tools to perform a basic analysis of the problem while waiting 

 

If you find yourself at a loose end after support has been engaged, there are a lot of tools available for you to investigate the problem yourself. You may even spot the answer before they do! Going into much detail about these is outside the scope of this article, but links are provided to more comprehensive articles on those you find interesting.

  • Memory Analyzer can give reports and summary tables of heapdumps (HPROF or IBM formatted). It can be used as an Eclipse plugin or through the IBM Support Assistant (ISA).
  • IBM GCMV. This is a very useful dynamic GC visualiser. It provides graphs showing heap usage over time and can quickly show usage patterns. It will also generate a report giving recommendations for best practices based on your JVM’s behaviour.
  • IBM Pattern Modelling and Analysis Tool (PMAT) for the garbage collector is a more static tool with an emphasis squarely on improving performance, rather than fixing a problem.
  • IBM TMDA for javacores is a very useful tool which can work together with a tool like Memory Analyzer to tie in hung threads to objects taking up a large percentage of the heap. Deadlocks can often be uncovered like this, though they are never easy to spot!
  • Heap Analyzer is a very good stand-alone alternative to Memory Analyzer.
  • VisualVM is a part of the JDK from 1.6 update 7 onwards. More information on how to use it on local and remote VMs is available in the comprehensive documentation.

This is such a huge topic that one blog post could never cover everything in as much depth as it warrants. Hopefully though, despite that, you will feel more prepared the next time you get an OutOfMemory error!

Mike Croft