Simple Example
The Dempsy Examples repository has several versions of the WordCount example. You can find the final version of this tutorial's code in the userguide-wordcount sub-project.
In order to understand how to use Dempsy you need to understand how a Dempsy application is structured, and the best way to see this is through a simple example. The "Hello World" of BigData applications seems to be the "word counter." If you're familiar with Hadoop then you probably started with the WordCount example. In this simple example let's suppose we have a source of words from some text. In traditional batch-based BigData systems the source for these words would be a file, or the words might already be partitioned across some distributed storage. In a Dempsy application we're receiving these words in real time through a data stream. Maybe we're getting all of the live data from a large social media application and we want to calculate the histogram of word occurrences.
What would the Dempsy application that accomplishes this look like? Imagine that each word from this hypothetical live stream of text is broken into its own message. Each of these messages is routed to an instance of a class (let's call that instance a message processor) that has the responsibility to keep track of the count for a single word. That is, there is one instance of the word-counting message processor per distinct word. For example, every time the word "Google" flows through the stream it's routed to the same message processor (the one dedicated to the word "Google"). Every time the word "is" is encountered, it's routed to another message processor instance. And likewise with the word "Skynet."
How easy would it be to write the code for that message processor class? This way of looking at the problem makes the code fairly simple. It could be as simple as this:
```java
class WordCount {
    private long count = 0;

    public void countWord(String word) {
        count++;
    }
}
```

Notice, we write the message processor in such a way that we assume each instance is responsible for a single word and that in the larger application there will be many instances, each operating on its piece of the stream. These instances can be (and usually are) spread out over a large number of machines.
Of course, what's missing? How does each word get to its respective message processor? How are the WordCount instances instantiated, deleted, provided their word message? Where are they instantiated? What about synchronization? What about sending out messages? All of these are the primary responsibility of Dempsy.
At this point we have a nice little piece of POJO functionality completely unmoored from infrastructural concerns. Let's look at how Dempsy handles some of these concerns. As mentioned, one of Dempsy's primary responsibilities is, given a message, find the message processor responsible for handling that message. Now that we have a POJO that accomplishes some business functionality we need to tell Dempsy how they are to be addressed, that is, how Dempsy is to find which message processor is responsible for which messages.
The way Dempsy does this is through the use of a message key. Each message that flows through Dempsy needs to have a message key. Dempsy is (optionally) annotation driven, so classes that represent messages need to identify a means of obtaining the message's MessageKey through the use of an annotation. An important concept to grasp here is that a MessageKey is essentially the address of an individual message processor instance.
In our example, each word is a message.
```java
import java.io.Serializable;

import net.dempsy.lifecycle.annotation.MessageKey;
import net.dempsy.lifecycle.annotation.MessageType;

@MessageType
public class Word implements Serializable {
    private final String wordText;

    public Word(final String data) {
        this.wordText = data;
    }

    // The returned value is the address of the message processor for this word.
    @MessageKey
    public String getWordText() {
        return this.wordText;
    }
}
```

So when Dempsy receives a message of type Word, it retrieves the key using the annotated method getWordText(). That key will become the address of a message processor somewhere on the system. Dempsy will find the message processor instance (in this case an instance of the class WordCount) within a cluster of nodes responsible for running the WordCount message processing. In the case that the instance doesn't already exist, Dempsy will clone() a WordCount instance prototype.
Note: the annotation @MessageType is used to tell Dempsy that the class name is meant to be used as a "message type" in the system. Please see the user guide for a more detailed description.
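To make the "find the instance for this key, or clone the prototype" step concrete, here is a minimal single-JVM sketch in plain Java. It does not use Dempsy at all: `CloneOnDemandSketch`, `Counter`, `deliver`, `countFor` and `instanceCount` are hypothetical names invented for illustration, and a real Dempsy container additionally deals with distribution across nodes, threading and instance lifecycle.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-JVM illustration of "find the processor for a key, or
// clone the prototype if none exists yet". This is NOT Dempsy's implementation.
class CloneOnDemandSketch {

    // Stand-in for the WordCount message processor.
    static class Counter implements Cloneable {
        long count = 0;

        void countWord(final String word) {
            count++;
        }

        @Override
        public Counter clone() {
            try {
                return (Counter) super.clone();
            } catch (final CloneNotSupportedException e) {
                throw new AssertionError(e); // cannot happen: Counter is Cloneable
            }
        }
    }

    private final Counter prototype = new Counter();
    private final Map<String, Counter> instances = new HashMap<>();

    // Route a word to the instance addressed by its key, cloning on first use.
    void deliver(final String word) {
        instances.computeIfAbsent(word, k -> prototype.clone()).countWord(word);
    }

    long countFor(final String word) {
        final Counter c = instances.get(word);
        return c == null ? 0 : c.count;
    }

    int instanceCount() {
        return instances.size();
    }

    public static void main(final String[] args) {
        final CloneOnDemandSketch container = new CloneOnDemandSketch();
        for (final String w : "Google is Skynet is Google is".split(" "))
            container.deliver(w);
        System.out.println(container.countFor("is"));  // 3
        System.out.println(container.instanceCount()); // 3 distinct words
    }
}
```

Each distinct word ends up with its own counter instance, and the prototype itself never receives a message.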
If you're paying attention you might notice there are two gaps that need to be filled in the WordCount implementation. First, how does Dempsy know that WordCount handles the Word message? Second, how is a WordCount prototype cloned? (Notice that the existing WordCount class cannot yet be cloned.)
This requires us to revisit the WordCount implementation. We need to do several things to satisfy Dempsy:
- We need to identify the WordCount class as an Mp, which is done with a class annotation.
- We need to identify the WordCount.countWord() call as the point where the Mp handles the message by annotating it with the @MessageHandler annotation.
- We need to make sure WordCount is Cloneable.
This would be accomplished by the following:
```java
import net.dempsy.lifecycle.annotation.MessageHandler;
import net.dempsy.lifecycle.annotation.Mp;

@Mp
public class WordCount implements Cloneable {
    private long count = 0;

    // Called once for every Word routed to this instance.
    @MessageHandler
    public void countWord(final Word word) {
        count++;
    }

    // Dempsy clones a prototype instance to create the processor for a new key.
    @Override
    public Object clone() throws CloneNotSupportedException {
        return super.clone();
    }
}
```

The framework now has enough information that it can understand:
- How to create instances of the WordCount message processor given an already existing instance it will use as a prototype.
- That instances of WordCount handle messages of type Word using the WordCount.countWord() method.
- That the key for a given Word message, which represents the "address" of a unique WordCount instance to which the Word message should be routed, is provided by a call on the message instance, Word.getWordText().
Note: Currently the default implementation for the Serializer is Kryo. This at least requires messages to have a default constructor defined (though Kryo supports private default constructor invocations under the right circumstances; see the Kryo documentation for more information). Also, Dempsy can be configured with the standard Java Serializer (though I'm not sure why anyone would ever want to do this).
It is critical that the Object that Dempsy obtains as the message key (in the example that would be the result of the call to Word.getWordText()) has the appropriate identity semantics. In all cases that means there needs to be a non-default equals() and hashCode() method. The reason for this is partially very obvious: a "unique" message key corresponds to an instance of a message processor so it's important to get the understanding of "unique" correct. The default Object behavior is not adequate. Think of Dempsy as using the key as if it were a key in a HashMap that contained all of the current message processor instances. The default implementation of Object.equals and Object.hashCode wouldn't work given multiple instantiations of the same Word.
But this is not all. Given that instances of a message processor are distributed across many nodes, the default routing behavior of Dempsy uses the hashCode() as a means of determining which node a particular message processor is running on. Therefore, while strictly speaking most Java applications would work (though very poorly) if, for example, the hashCode() method were implemented to simply return 1, this would cause ALL message processors to be instantiated on the same node of a cluster.
In the example, the MessageKey is a java.lang.String, which has appropriate identity semantics. Note: The message key is not restricted to the String type; it can be any type that is hashable.
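Both requirements on the key can be seen in a small plain-Java sketch. Everything here is hypothetical and for illustration only: `WordKey`, `ConstantHashKey`, `nodeFor` and `nodesUsed` are invented names, and choosing a node as "hash modulo node count" is an assumed simplification, not Dempsy's actual routing strategy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustration only: why a message key needs value-based equals()/hashCode(),
// and why a degenerate hashCode() defeats hash-based distribution.
class KeySemanticsSketch {

    // A custom key with value semantics: two keys for the same text are equal.
    static final class WordKey {
        private final String text;

        WordKey(final String text) {
            this.text = text;
        }

        @Override
        public boolean equals(final Object o) {
            return o instanceof WordKey && text.equals(((WordKey) o).text);
        }

        @Override
        public int hashCode() {
            return text.hashCode();
        }
    }

    // Correct equals(), but every key hashes to the same value.
    static final class ConstantHashKey {
        private final String text;

        ConstantHashKey(final String text) {
            this.text = text;
        }

        @Override
        public boolean equals(final Object o) {
            return o instanceof ConstantHashKey && text.equals(((ConstantHashKey) o).text);
        }

        @Override
        public int hashCode() {
            return 1;
        }
    }

    // Assumed, simplified routing rule: pick a node from the key's hash.
    static int nodeFor(final Object key, final int nodeCount) {
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    // Count how many distinct nodes a set of keys would be spread over.
    static int nodesUsed(final Iterable<?> keys, final int nodeCount) {
        final Set<Integer> nodes = new HashSet<>();
        for (final Object key : keys)
            nodes.add(nodeFor(key, nodeCount));
        return nodes.size();
    }

    public static void main(final String[] args) {
        // Separate key instances with the same text address the same processor.
        System.out.println(new WordKey("Google").equals(new WordKey("Google"))); // true

        System.out.println(nodesUsed(Arrays.asList(
            new WordKey("a"), new WordKey("b"), new WordKey("c"), new WordKey("d")), 4)); // 4
        System.out.println(nodesUsed(Arrays.asList(
            new ConstantHashKey("a"), new ConstantHashKey("b"),
            new ConstantHashKey("c"), new ConstantHashKey("d")), 4)); // 1
    }
}
```

With the constant hash, lookups by key still work, but every key lands on the same node, which is the failure mode described above.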
So where do Word messages come from and how do they get to Dempsy in order to be routed to the appropriate WordCount message processor? Dempsy provides an interface that needs to be implemented by the application developer in order to adapt sources of stream data to the Dempsy message bus. An Adaptor implementation:
- will be given a handle to the Dempsy message bus through an interface called a Dispatcher.
- will need to obtain data from an external source and use the Dispatcher to send that data onto Dempsy.
The API for an Adaptor is very simple so we will extend the Word Count example with the following class:
```java
...
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.dempsy.messages.Adaptor;
import net.dempsy.messages.Dispatcher;

public class WordAdaptor implements Adaptor {
    private Dispatcher dempsy;
    private AtomicBoolean running = new AtomicBoolean(false);

    /**
     * This method is called by the framework to provide a handle to the
     * Dempsy message bus. It's called prior to start()
     */
    @Override
    public void setDispatcher(final Dispatcher dispatcher) {
        this.dempsy = dispatcher;
    }

    @Override
    public void start() {
        // ... set up the source for the words.
        running.set(true);
        while (running.get()) {
            // obtain data from an external source
            final String wordString = getNextWordFromSource();
            if (wordString == null)
                running.set(false);
            else {
                try {
                    dempsy.dispatchAnnotated(new Word(wordString));
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e); // This will stop the flow of Words from this adaptor.
                                                   // Optimally you'd like to recover and keep going.
                }
            }
        }
    }

    @Override
    public void stop() {
        running.set(false);
    }

    private String getNextWordFromSource() {
        ... // get the next word to put on the processing stream
    }
}
```

When the WordAdaptor is registered with Dempsy, it will be provided a handle to a Dispatcher. Then Adaptor.start() will be called. The application developer is responsible for creating Dempsy-compliant messages using data from an external source. As described above, a message must be serializable by whatever serialization technique is chosen (Kryo by default) and must identify a means of obtaining its MessageKey.
Notice the lifecycle. start() is called by the framework and, in this example, does not return while there are still words to process. If it ever does return, it will not be called again without restarting the node that the Adaptor was instantiated in. Note: It's very important that you manage this. You are allowed to return from the start() method whenever you want, either because the Adaptor is finished (if such a case exists) or because you decided to do the work in another thread (or many other threads), but Dempsy will not re-invoke the start() method.
Dempsy will invoke the stop() method to shut down the Adaptor when the node shuts down. Well-behaved Adaptors must return from start() at this time, if they had not done so previously. Not doing so will hang the VM on exit since, by default, the Adaptor is run in a non-daemon thread (though this is a configurable option for ill-behaved Adaptors).
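The start()/stop() contract can be sketched without any Dempsy types. In the following hypothetical example, a `Consumer<String>` stands in for the Dispatcher and a `BlockingQueue` stands in for the external source; `StoppableSourceSketch` and `runAndStop` are invented names. The one design choice worth noting is that the loop polls with a timeout, so the stop request is noticed even when no data is arriving.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Illustration only: start() blocks while working and returns promptly once
// stop() has been called. None of these types come from Dempsy.
class StoppableSourceSketch {
    private final BlockingQueue<String> source;
    private final Consumer<String> sink; // stand-in for the Dispatcher
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    StoppableSourceSketch(final BlockingQueue<String> source, final Consumer<String> sink) {
        this.source = source;
        this.sink = sink;
    }

    // Blocks until stop() is called or the calling thread is interrupted.
    void start() {
        while (!stopRequested.get()) {
            try {
                // Poll with a timeout so the stop flag is rechecked regularly.
                final String word = source.poll(50, TimeUnit.MILLISECONDS);
                if (word != null)
                    sink.accept(word);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        stopRequested.set(true);
    }

    // Demo helper: run start() on its own thread, feed it words, then stop it.
    // Returns the delivered words; throws if start() fails to return.
    static List<String> runAndStop(final String... words) {
        final BlockingQueue<String> source = new LinkedBlockingQueue<>();
        final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch allDelivered = new CountDownLatch(words.length);
        final StoppableSourceSketch adaptor = new StoppableSourceSketch(source, w -> {
            delivered.add(w);
            allDelivered.countDown();
        });
        final Thread worker = new Thread(adaptor::start);
        worker.start();
        try {
            for (final String w : words)
                source.add(w);
            allDelivered.await(5, TimeUnit.SECONDS);
            adaptor.stop();    // what the framework does on node shutdown
            worker.join(5000); // start() should return shortly afterwards
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (worker.isAlive())
            throw new IllegalStateException("start() did not return after stop()");
        return delivered;
    }

    public static void main(final String[] args) {
        System.out.println(runAndStop("hello", "world")); // [hello, world]
    }
}
```

A source that blocks indefinitely inside start() with no way to observe the stop request is exactly the ill-behaved case that would keep a non-daemon thread, and therefore the VM, alive.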
The examples that follow will show how to do the configuration by hand which means the translation to any DI container should be obvious to the users of those containers. We will also include Spring examples.
At this point we should begin to have an understanding of what a Dempsy application is. It's a series of instances of message processors across a number of compute nodes, being routed messages based on their keys, and being supplied message data by Adaptors. The configuration of an application is simply a formalization of these specifics with specific infrastructure selections. To define (configure) the Word Count application we've been walking through, we simply need to lay out the specifics. Doing this programmatically we would have:
```java
import java.util.concurrent.ArrayBlockingQueue;

import net.dempsy.NodeManager;
import net.dempsy.cluster.ClusterInfoException;
import net.dempsy.cluster.local.LocalClusterSessionFactory;
import net.dempsy.config.Cluster;
import net.dempsy.config.Node;
import net.dempsy.lifecycle.annotation.MessageProcessor;
import net.dempsy.monitoring.dummy.DummyNodeStatsCollector;
import net.dempsy.transport.blockingqueue.BlockingQueueReceiver;

public class Main {
    public static void main(final String[] args) throws ClusterInfoException {
        @SuppressWarnings("resource")
        final NodeManager nodeManager = new NodeManager() // 1
            .node(new Node("word-count").setClusters(
                new Cluster("adaptor")
                    .adaptor(new WordAdaptor()),
                new Cluster("counter")
                    .mp(new MessageProcessor<WordCount>(new WordCount()))
                    .routing("net.dempsy.router.managed"))
                .receiver(new BlockingQueueReceiver(new ArrayBlockingQueue<>(100000)))
                .nodeStatsCollector(new DummyNodeStatsCollector())) // this will be defaulted in 0.9.1
            .collaborator(new LocalClusterSessionFactory().createSession());

        nodeManager.start();

        System.out.println("Exiting Main");
    }
}
```

Notice what we are NOT doing here. We are NOT defining the topology of the Dempsy application. The order the Clusters are listed in makes no difference. In the above case we've decided to run both our Adaptor and our message processor on the same Node.
The above example has all "in process" infrastructure chosen. The LocalClusterSessionFactory and the BlockingQueueReceiver will not span processes. This can be remedied by selecting the ZookeeperSessionFactory and the NioReceiver instead.
Note also, once the distributed infrastructure is chosen (ZookeeperSessionFactory, NioReceiver) we can run the Adaptor on one set of nodes and the WordCount message processor on another. In that case you'd have two different Main classes (or if you're configuring with Spring, two different Spring contexts): one that starts the Node with just the Adaptor and one that starts it with just the WordCount message processor. The topology is implicit in the business class definitions (Word, WordCount).
The application, called "word-count," consists of two clusters, "adaptor" and "counter" (the Spring configuration further down names the second cluster "mp"). The first contains our Adaptor which, as we have seen, sources Word messages. The second contains a message processor whose prototype is an instance of WordCount.
[Figure: Fig. 1 Message Processor Lifecycle]
Although messages coming from the WordAdaptor flow to the WordCount message processor, the order in the definition doesn't actually matter. Dempsy determines where messages are sent based on the type of the message and the type of object that the MessageHandler on the MessageProcessor takes. In the case of our example, when the WordAdaptor adaptor produces a message of type Word, Dempsy knows that message can be handled by the WordCount message processor because the method WordCount.countWord() (which is annotated with the @MessageHandler annotation) takes the type Word. If there are other message processors that also have handlers that take a Word the messages will be routed to the appropriate message processor within those clusters also.
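The idea of matching a message to processors by handler parameter type can be illustrated with a few lines of reflection. This is a hedged sketch, not how Dempsy is implemented: `@Handles` is a hypothetical stand-in for @MessageHandler, and `TypeRoutingSketch`, `WordCounter`, `WordLogger`, `SentenceParser` and `handlersFor` are invented for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Illustration only: discovering which processor classes can handle a message
// by inspecting the parameter type of an annotated handler method.
class TypeRoutingSketch {

    // Hypothetical stand-in for Dempsy's @MessageHandler.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    static class Word {}
    static class Sentence {}

    static class WordCounter {
        @Handles
        public void countWord(final Word word) {}
    }

    static class WordLogger {
        @Handles
        public void log(final Word word) {}
    }

    static class SentenceParser {
        @Handles
        public void parse(final Sentence sentence) {}
    }

    // Names of the processor classes that have a handler accepting the message.
    static List<String> handlersFor(final Object message, final Class<?>... processors) {
        final List<String> result = new ArrayList<>();
        for (final Class<?> processor : processors) {
            for (final Method m : processor.getDeclaredMethods()) {
                if (m.isAnnotationPresent(Handles.class)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(message)) {
                    result.add(processor.getSimpleName());
                    break; // one matching handler per processor is enough
                }
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Which processors match depends on handler types, not on list position.
        System.out.println(handlersFor(new Word(),
            WordCounter.class, SentenceParser.class, WordLogger.class));
        // prints [WordCounter, WordLogger]
    }
}
```

A Word matches both processors whose handlers accept a Word and skips the one that accepts a Sentence, wherever each appears in the list.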
What do we do with the ApplicationDefinition? That depends on the Dependency Injection framework you're using. If using either Spring or Guice you don't need to do much else to run your application. If you're using a different dependency injection container then you'll need to obtain a reference to the Dempsy object and give it the ApplicationDefinition, but this is a more advanced topic for a later section. Moving forward we will show you how the Spring implementation works.
The above application definition could be defined using Spring as follows:
```xml
<beans>
  <bean class="com.nokia.dempsy.config.ApplicationDefinition">
    <constructor-arg value="word-count" />
    <property name="clusterDefinitions">
      <list>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="adaptor"/>
          <property name="adaptor">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" />
          </property>
        </bean>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="mp"/>
          <property name="messageProcessorPrototype">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>
```

By default there are two modes in which you can run the Dempsy application. It can be run entirely within a local Java VM, or it can be run distributed across a set of machines. For the purposes of this tutorial we will demonstrate how to run it in a local Java VM. This would be easy to set up in an IDE like Eclipse.
There are several "main" implementations provided depending on what mode you're running a Dempsy application in, as well as which DI framework you're using. As a matter of fact the only place any particular DI container is assumed is in these supplied "main" applications so adding other currently unsupported DI containers is straightforward.
To run your application using the Spring container on the command line you would use:
```
java -Dapplication=WordCount.xml -cp [classpath] com.nokia.dempsy.spring.RunAppInVm
```

Your classpath will need to contain all of the main Dempsy artifacts plus the Dempsy Spring library. See the section on the codebase structure for more information.
Next section: Terminology