26 June 2012
Infinispan Event push over Tomcat Websocket via HornetQ (part 1)
Continuous query (CQ) is a powerful tool for registering for and receiving updates on changes to cached data. For example, suppose you have a cache of equity trades and you would like to give users the ability to register for and receive only the changes affecting google.com. The established big players in data grid technology, such as Oracle Coherence and GemFire, offer variations on this functionality, and it is a common usage pattern that will hopefully become part of the JSR 347 standardisation of data grids.
I wanted to take a look at what Infinispan has to offer in this area and combine it with the latest Tomcat distribution, which contains support for WebSockets as defined in RFC 6455. This combination of technologies permits real-time push of cache events over a WebSocket back to the client browser. It wasn't plain sailing, so I'll go through some of the issues I encountered, how they were solved, and some (rudimentary) code to demonstrate how you might implement this architecture. A working example, with the source code available for download, accompanies this post.
The last two Tomcat releases (7.0.27 and 7.0.28) both include WebSocket functionality. You'll want to skip 7.0.27, though, as its WebSocket timeouts are coupled to the HTTP connector connectionTimeout property defined in server.xml. In 7.0.28 this link is removed and the WebSocket has an infinite timeout, so that is the version used in this example.
Does Infinispan have CQ support? Well, there is a project in "incubation" status available for download here and some documentation here. Great news - this is going to be straightforward then! So I grabbed a copy, built it, and wrote some code, but then I hit a problem. Internally, Infinispan maintains a "ComponentRegistry" - a singleton store of the core components required to run the cache. When one cache component requires a reference to a component from the registry, an @Inject annotation is used to execute the logic that supplies this reference. From what I could see, the component registry is implemented slightly differently in the latest 5.1.x releases of Infinispan compared to the 4.x code base: component dependencies are now expressed in a serialized .dat file, where previous releases used reflection exclusively - and this has broken the CQ project.

All is not lost though! Let's take a look at Infinispan's Event Listener Framework. Instances of org.infinispan.Cache implement the Listenable interface, which allows classes marked with the @Listener annotation to receive events from the cache. The full set of events is comprehensively covered in the docs, so I won't go into it in much detail here. For this example we'll just be using the @CacheEntryModified event to receive updates when cache entries change.
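Here's a minimal sketch of such a listener. The class name and the cache's key/value types (ticker symbol to price) are my own illustrative choices, not part of any Infinispan API:

```java
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;

// Hypothetical listener for a cache of equity trades keyed by symbol.
@Listener
public class TradeListener {

    // Infinispan invokes this twice per modification: once before the
    // write (isPre() == true) and once after it has been applied.
    @CacheEntryModified
    public void onModified(CacheEntryModifiedEvent<String, Double> event) {
        if (event.isPre()) {
            return; // only react to the committed value
        }
        System.out.printf("Trade updated: %s -> %s%n", event.getKey(), event.getValue());
    }
}
```

Registering it is a one-liner: `cache.addListener(new TradeListener());`.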
So what are the drawbacks?
Well, there isn't any support for events over the Hot Rod client-server architecture yet - you can track the status of this task here, and it looks like this functionality will be available in the 5.2.0 release. So for this example I'm going to stick with embedded mode.
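Embedded mode just means bootstrapping the cache manager inside the application itself. A minimal sketch, assuming a configuration file on the classpath (the file name and cache name here are illustrative):

```java
import java.io.IOException;
import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;

public class EmbeddedCacheBootstrap {
    public static void main(String[] args) throws IOException {
        // Assumption: trades-infinispan.xml declares a clustered,
        // distributed cache named "trades".
        DefaultCacheManager manager = new DefaultCacheManager("trades-infinispan.xml");
        Cache<String, Double> trades = manager.getCache("trades");

        // Register the listener from above; modifications to entries
        // owned by this node will now fire events.
        trades.addListener(new TradeListener());

        trades.put("GOOG", 571.48);
    }
}
```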
The other issue is that events only fire on nodes that own the data. That's fine if your cache is replicated, but as we've already discussed in this post, replicated caches don't scale well.
So how do we capture events from a distributed cache and still ensure that all clients receive all the updates? We can use a JMS topic to capture events, with clients consuming from this destination.
HornetQ is the default JBoss messaging stack; it's super quick and also completely free! Check out this post for some more background. To implement this architecture we'll turn the listener we created above into a JMS producer as well: we'll send events to a topic and embed a consumer in the application to receive from it. The flow now looks like this: the cache fires a modification event, the listener publishes it to the topic, and every application instance consumes it from there.
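As a sketch, the listener might publish each event to the topic like this. The constructor injection is my own assumption - in practice the ConnectionFactory and Topic would be looked up from HornetQ's JNDI context:

```java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;

// Hypothetical listener that republishes cache events to a JMS topic.
@Listener
public class TradePublishingListener {

    private final Session session;
    private final MessageProducer producer;

    public TradePublishingListener(ConnectionFactory factory, Topic topic) throws JMSException {
        Connection connection = factory.createConnection();
        // Note: a JMS Session is single-threaded; a production version
        // would need one session per thread or external synchronization,
        // since Infinispan may fire events concurrently.
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(topic);
    }

    @CacheEntryModified
    public void onModified(CacheEntryModifiedEvent<String, Double> event) {
        if (event.isPre()) {
            return;
        }
        try {
            producer.send(session.createTextMessage(event.getKey() + "=" + event.getValue()));
        } catch (JMSException e) {
            e.printStackTrace(); // real code would log and recover
        }
    }
}
```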
So all the application instances now receive updates from the cache; all we need to do is push these back to the client using Tomcat's WebSocket implementation.
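As a taste of what part 2 covers, here's a rough sketch of the server side using Tomcat's (pre-standard) org.apache.catalina.websocket API as shipped in 7.0.28. The servlet path and class names are illustrative, and the embedded topic consumer would call broadcast() for each message it receives:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.annotation.WebServlet;
import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import org.apache.catalina.websocket.WsOutbound;

@WebServlet("/trades")
public class TradePushServlet extends WebSocketServlet {

    // Connected clients; the JMS topic consumer pushes to all of these.
    private final Queue<TradeConnection> connections = new ConcurrentLinkedQueue<TradeConnection>();

    @Override
    protected StreamInbound createWebSocketInbound(String subProtocol) {
        return new TradeConnection();
    }

    // Called by the embedded topic consumer whenever a cache event arrives.
    public void broadcast(String message) {
        for (TradeConnection connection : connections) {
            try {
                connection.getWsOutbound().writeTextMessage(CharBuffer.wrap(message));
            } catch (IOException e) {
                connections.remove(connection); // drop dead connections
            }
        }
    }

    private final class TradeConnection extends MessageInbound {

        @Override
        protected void onOpen(WsOutbound outbound) {
            connections.add(this);
        }

        @Override
        protected void onClose(int status) {
            connections.remove(this);
        }

        @Override
        protected void onTextMessage(CharBuffer message) {
            // Inbound messages are ignored; this is a push-only channel.
        }

        @Override
        protected void onBinaryMessage(ByteBuffer message) {
            // Binary frames are not expected from the browser here.
        }
    }
}
```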
OK, so that's it for part 1; in part 2 we'll look more closely at some of the code.
See Part 2 here