(see Part 1 here)
In-play Match Betting
For this example we have a selection of sporting events, well just some football and cricket matches that we will publish odds for. In the background a thread randomly updates odds for one of the outcomes (home win, away win or draw) and one of the matches. Using the @CacheEntryModified event we send these back to the connected browser sessions via the websocket connection.
Servlet Code
So lets get started with some code and the web container. Its just got one servlet defined as follows:
@WebServlet(urlPatterns = "/inplay", loadOnStartup = 1)
public class InfinispanServlet extends WebSocketServlet {
As you can see we are using servlet 3.0 annotations, the other important thing to note is we are extending org.apache.catalina.websocket.WebSocketServlet which gives us the ability to process websocket connections.
This servlet gets loaded when the container starts and the init() lifecycle method performs the following tasks
- Create an Infinispan CacheManager instance
- Grab a reference to the distributed cache we are going to use
- Creates a listener (jmsMessageProducer) for events propagated from the cache
- Creates a consumer (jmsMessageConsumer) of events from the JMS topic
- Load the default set of events / matches we have odds for into the cache
- Register our listener on the cache
public void init() throws UnavailableException {
try {
super.init();
manager = new DefaultCacheManager("infinispan-config.xml");
cache = manager.getCache("dist-cache");
jmsMessageProducer = JMSMessageProducer.getInstance();
jmsMessageConsumer = JMSMessageConsumer.getInstance();
/* loads all matches */
for (Match match : Match.selection.getData()) {
cache.put(match.getKey(), match);
}
cache.addListener(jmsMessageProducer);
} catch (Exception e) {
throw new UnavailableException(e.getMessage());
}
}
The corresponding servlet lifecycle destroy method cleans up these resources when the container stops.
The infinispan configuration is very basic, we have enabled JMX monitoring and are using distributed mode, two data owners with asynchronous replication. The configuration file is bundled inside the war and looks like this:
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1 http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
xmlns="urn:infinispan:config:5.1">
<global>
<globalJmxStatistics enabled="true" jmxDomain="org.infinispan"
cacheManagerName="MyCacheManager" />
<transport>
<properties>
<property name="configurationFile" value="jgroups-udp.xml" />
</properties>
</transport>
</global>
<default>
<jmxStatistics enabled="true" />
</default>
<namedCache name="dist-cache">
<clustering mode="distributed">
<hash numOwners="2" />
<async />
</clustering>
</namedCache>
</infinispan>
When you extend org.apache.catalina.websocket.WebSocketServlet one of the methods you must implement is createWebSocketInbound, i.e. what do we do when a new websocket connection is created? In our example for every connection we receive we want to commence a push of events from the cache to that connection. To handle this we add the new connection to a pool (set) inside our local JMS message consumer. This consumer is listening against the JMS topic that the cache listener pushes events onto:
@Override
protected StreamInbound createWebSocketInbound(String subProtocol) {
WebsocketMessageInbound connection = new WebsocketMessageInbound();
jmsMessageConsumer.addConnection(connection);
return connection;
}
We have to provide our own implementation of the Websocket connection and this is also contained in the servlet as a private class:
final class WebsocketMessageInbound extends MessageInbound {
@Override
protected void onBinaryMessage(ByteBuffer message) throws IOException {
// getWsOutbound().writeBinaryMessage(message);
}
@Override
protected void onTextMessage(CharBuffer message) throws IOException {
// getWsOutbound().writeTextMessage(message);
}
public void onClose(int status) {
jmsMessageConsumer.removeConnection(this);
}
public void onOpen(WsOutbound outbound) {
try {
Set<Entry<String,Match>> entrySet = cache.entrySet();
List<Match> matchList = new ArrayList<Match>();
for (Entry<String, Match> entry : entrySet) {
matchList.add(entry.getValue());
}
MatchSelection selection = new MatchSelection(matchList);
outbound.writeTextMessage(CharBuffer.wrap(JsonHelper.toJSonString(selection)));
} catch (Exception e) {
e.printStackTrace();
}
}
void broadcast(String message) {
try {
CharBuffer buffer = CharBuffer.wrap(message);
getWsOutbound().writeTextMessage(buffer);
} catch (IOException ioe) {
System.out.println("ouch " + ioe.getMessage());
}
}
}
In this example we are ignoring events received from the client (these are handled by the onBinaryMessage and onTextMessage methods), we are just pushing read-only data back down the websocket.
- When a client opens a connection (onOpen) we extract all the in-play match information from the cache and send them the latest view of the data.
- When a client closes their connection (onClose) we remove that connection from the pool.
You'll also notice that the initial data set sent to the client during onOpen is in JSON format - when we take a look at the java script later in this post you'll see why we are using this format for all messages.
Messaging Code and Configuration
We've already seen the servlet initializing a jmsMessageProducer instance (to listen for cache events and publish them onto our topic) and a jmsMessageConsumer instance (to receive messages from the topic and push them back over all their registered client connections).
First we should really set up our HornetQ server:
HornetQ Server
I downloaded the latest release (2.2.14.Final) from
here, extracted and ran the default "standalone" configuration. The only change to the config required is to define our JMS topic in the $HQ_HOME/config/standalone/non-clustered/hornetq-jms.xml file as follows:
<queue name="ExpiryQueue">
<entry name="/queue/ExpiryQueue"/>
</queue>
<topic name="InfinispanEventTopic">
<entry name="/topic/InfinispanEventTopic"/>
</topic>
</configuration>
JmsMessageProducer
The JmsMessageProducer is responsible for receiving cache events from Infinispan so therefore it has to be annotated with the @Listener interface:
@Listener
public class JMSMessageProducer {
Initialization of the JmsMessageProducer creates a connection to the topic using the native HornetQ JMS Api to connect to the host and port where the HornetQ server is located:
private void initialize() throws Exception {
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 5445);
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME, "localhost");
TransportConfiguration transportConfiguration = new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory",
connectionParams);
HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
connection = cf.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("InfinispanEventTopic");
producer = session.createProducer(topic);
connection.start();
}
To receive cache modification events and publish messages to our HornetQ topic the JmsMessageProducer defines a method annotated with @CacheEntryModified containing the logic to generate and publish the message:
@CacheEntryModified
public void logModifiedEvent(CacheEntryModifiedEvent<String, Match> event) throws IOException {
if (!event.isPre()) {
MatchUpdate matchUpdate = new MatchUpdate(event.getValue());
sendMessage(matchUpdate);
}
}
And finally the sendMessage method handles delivery:
public void sendMessage(MatchUpdate matchUpdate) throws JsonGenerationException, JsonMappingException, IOException {
try {
Message textMessage = session.createTextMessage(JsonHelper.toJSonString(matchUpdate));
textMessage.setStringProperty(org.hornetq.core.message.impl.MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(),
matchUpdate.getJmsDuplicateDetectionId());
producer.send(textMessage);
} catch (JMSException e) {
e.printStackTrace();
}
}
Hang on though, this means for every cache update two events (if we have numOwners=2) get fired, one at each node owning the data. In our current system two duplicate events will generate a JMS message delivered to the HornetQ topic running on the standalone broker and so therefore all the message listeners will receive the event twice!
Obviously this isn't great but HornetQ has an out of the box duplicate detection system we can make use of. One of its attractive features is its amazing performance in performance tests - over 8 million messages per second is very impressive. Native HornetQ bridges are also highly performant and this in part can be attributed to the mechanism HornetQ uses to provide ONCE and ONLY ONCE delivery semantics. The previous incarnation of HornetQ, JBoss Messaging relied on XA transations to provide guaranteed delivery over a JMS bridge but this came at a price, namely disk I/O. XA transactions require that the state of the transaction is persisted to disk at the prepare and commit phases to ensure that should a system outage occur at any point during this process sufficient information is retained to recover the transaction state and ensure that messages are delivered ONCE and ONLY ONCE.
Disk I/O is a huge overhead in a high throughput system so the HornetQ team devised an in-memory mechanism that caches a configurable number of received messages at the receiver end so that in failure scenarios message delivery can be replayed safely with any duplicates discarded by the receiver.
By default the JMS Message Id property is used as the unique identifier but this can be overridden as shown in the sample code above, allowing users to implement their own identifiers - in our case this will be the key to the in-play match cache entry and its associated version number.
JmsMessageConsumer
The consumers job is to receive messages from the topic and forward them back to the clients, the initialization logic is similar, again we use the native HornetQ JMS Api:
private void initialize() throws JMSException {
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 5445);
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME, "localhost");
TransportConfiguration transportConfiguration = new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory",
connectionParams);
HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
connection = cf.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("InfinispanEventTopic");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(listener);
connection.start();
}
We also maintain the set of connected clients:
private Set<WebsocketMessageInbound> connections = new CopyOnWriteArraySet<InfinispanServlet.WebsocketMessageInbound>();
public void addConnection(WebsocketMessageInbound connection) {
connections.add(connection);
}
public void removeConnection(WebsocketMessageInbound connection) {
connections.remove(connection);
}
And when we receive a message we broadcast it to the clients:
private final MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = ((TextMessage) message);
try {
for (WebsocketMessageInbound connection : connections) {
connection.broadcast(textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
};
So that's it for the messaging infrastructure, next we'll take a quick look at the client side java script
Client Java Script
Ok, so we've looked at the key components on the server side and also how the servlet container handles websocket traffic, now we'll look at the client side and run through the java script used to create a websocket connection and receive the cache event updates from the server.
This is the javascript used in the example to create the client websocket connection - the actual websocket is wrapped in a "FancyWebSocket" (see below) to handle a JSON message format exchange between client and server:
var WS = {};
WS.socket = null;
WS.connect = (function(host) {
if ('WebSocket' in window) {
WS.socket = new FancyWebSocket(host);
} else {
return;
}
});
WS.initialize = function() {
WS.connect('ws://' + window.location.host + '/infinispan/inplay');
};
WS.initialize();
As you have seen data sent from the server to the browser is in JSON format, this is no accident. Earlier in this post we saw the default handling of client messages in the servlet (onBinaryMessage and onTextMessage) and although we haven't implemented any client messaging back to the server it's worth considering how you would handle this anyway. If you clients send text instructions to the server then you'll probably need to parse the content to comprehend what the client wants to do, is the client trying to request a data item? update some data? query for some data? So we definitely need to parse this communication and JSON is an ideal format both the client and server can understand.
Fortunately for me, not being a javascript expert, handling of java script messages over websockets on the client side has been beautifully encapsulated in this
post back in 2010 - you'll find the code in ws_events_dispatcher.js if you take a look at our running example. So we'll borrow this for our example - its very powerful allowing the binding of "events" to functions.
var FancyWebSocket = function(url){
var conn = new WebSocket(url);
var callbacks = {};
this.bind = function(event_name, callback){
callbacks[event_name] = callbacks[event_name] || [];
callbacks[event_name].push(callback);
return this;// chainable
};
this.send = function(event_name, event_data){
var payload = JSON.stringify({event:event_name, data: event_data});
conn.send( payload ); // <= send JSON data to socket server
return this;
};
// dispatch to the right handlers
conn.onmessage = function(evt){
var json = JSON.parse(evt.data)
dispatch(json.event, json.data)
};
conn.onclose = function(){dispatch('close',null)}
conn.onopen = function(){dispatch('open',null)}
var dispatch = function(event_name, message){
var chain = callbacks[event_name];
if(typeof chain == 'undefined') return; // no callbacks for this event
for(var i = 0; i < chain.length; i++){
chain[i]( message )
}
}
};
Now we can bind our examples events and execute the functionality we want when JSON messages matching that event arrive at the client:
WS.socket.bind('update', function(matchData){
if (availableMatchListBuilt) {
updateMatchRow(matchData);
}
})
WS.socket.bind('start', function(matchData){
if (!availableMatchListBuilt) {
buildAvailableMatches(matchData);
availableMatchListBuilt = true;
}
})
- The update event corresponds to a change in the match odds
- The start event corresponds to the initial load of all the in-play matches
Hopefully this gives you an introduction to the fundamental nuts and bolts I used to get this example working, there's lots more scripting in the client and heavy usage of the jquery libraries to get the formatting and updates to work - the best way to view this will be to take a look at the
source code yourself.
Mark Addy