Output
As mentioned in the Message Processor Lifecycle section, you can schedule calls on MessageProcessor instances so that they don't have to be purely message driven. You mark the methods to be called with the @Output annotation. The schedule for @Output calls is configured as part of the ClusterDefinition.
This is best illustrated by expanding the Simple Example. Suppose, in our example, we wanted to periodically output the word counts. We would add the method:
...
@Output
public void outputResults()
{
   System.out.println(myWord.getWordText() + ":" + count);
}
...
The above example assumes we've saved the Word message that was first passed to us in the field myWord. Now we need to actually schedule the output. This is done in the application configuration. For example, the Spring-based application definition shown earlier would be extended as follows:
...
<bean class="com.nokia.dempsy.config.ClusterDefinition">
   <constructor-arg value="mp"/>
   <property name="messageProcessorPrototype">
      <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
   </property>
   <property name="outputExecuter">
      <bean class="com.nokia.dempsy.output.RelativeOutputSchedule">
         <constructor-arg index="0" type="long" value="30"/>
         <constructor-arg index="1" type="java.lang.String" value="SECONDS" />
      </bean>
   </property>
</bean>
...
Notice that the cluster definition for the WordCount MessageProcessor now contains an outputExecuter, which causes the WordCount.outputResults() method to be called on every WordCount MessageProcessor instance every 30 seconds. In this example the method simply writes the word and its count so far to standard output.
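To make the scheduling idea concrete, here is a minimal, self-contained sketch of what an output pass amounts to: on a timer, every method carrying @Output is invoked on every message processor instance. It is not Dempsy's implementation. The local Output annotation, the toy WordCount class, and the outputPass and schedule helpers are hypothetical stand-ins, a ScheduledExecutorService plays the role of the RelativeOutputSchedule, and the code assumes Java 8 or later.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class OutputPassSketch
{
   // Hypothetical local stand-in for Dempsy's @Output, so the sketch compiles without Dempsy.
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface Output {}

   // Toy message processor: one instance per word, in the spirit of WordCount.
   static class WordCount
   {
      private final String wordText;
      private long count;

      WordCount(String wordText) { this.wordText = wordText; }

      void handle() { count++; }

      @Output
      public String outputResults() { return wordText + ":" + count; }
   }

   // One output pass: invoke every @Output method on every instance and collect the return values.
   static List<Object> outputPass(List<?> instances)
   {
      List<Object> results = new ArrayList<>();
      for (Object mp : instances)
         for (Method m : mp.getClass().getMethods())
            if (m.isAnnotationPresent(Output.class))
            {
               try { results.add(m.invoke(mp)); }
               catch (ReflectiveOperationException e) { throw new RuntimeException("@Output failed on " + mp, e); }
            }
      return results;
   }

   // Rough analogue of a relative schedule: one pass every 'period' units on a single thread.
   static ScheduledExecutorService schedule(List<?> instances, long period, TimeUnit unit)
   {
      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
      executor.scheduleAtFixedRate(() -> System.out.println(outputPass(instances)), period, period, unit);
      return executor;
   }

   public static void main(String[] args) throws InterruptedException
   {
      WordCount hello = new WordCount("hello");
      hello.handle();
      hello.handle();
      List<WordCount> instances = Arrays.asList(hello, new WordCount("world"));

      // The configuration above uses 30 SECONDS; a short period keeps this demo quick.
      ScheduledExecutorService executor = schedule(instances, 100, TimeUnit.MILLISECONDS);
      Thread.sleep(250);
      executor.shutdown();
   }
}
```

Collecting the return values in outputPass mirrors the fact that Dempsy forwards whatever an @Output method returns, which the rest of this page builds on.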
So far the pipeline in this example has two stages, but the first stage (the Adaptor) only provides the data, so there is only one stage of processing. Dempsy is meant to be used in a pipeline of message processor clusters, so let's extend our example with another stage. What if we want to rank the words we're counting by usage? We will define a new message processor prototype that receives CountedWord messages and keeps the top 10.
Dempsy forwards any value returned from a @MessageHandler or @Output invocation. So first we must update the existing output method on the message processor as follows:
@Output
public CountedWord outputResults()
{
   return new CountedWord(myWord, count);
}
Now when output is invoked, the CountedWord is returned rather than printed. Dempsy will forward that message on as long as it's a valid Dempsy message (again, a valid Dempsy message is one that is Serializable and provides a valid message key through a method annotated with @MessageKey). For this example we could have:
import java.io.Serializable;
import com.nokia.dempsy.annotations.MessageKey;
public class CountedWord implements Serializable
{
   private static final long serialVersionUID = 1L;
   private long count;
   private String wordText;
   public CountedWord(Word message, long count)
   {
      wordText = message.getWordText();
      this.count = count;
   }
   public long getCount() { return count; }
   public String getWordText() { return wordText; }
   // Constant key: every CountedWord is routed to the same WordRank instance
   @MessageKey
   public Integer getKey() { return 1; }
   @Override
   public String toString() { return wordText + ":" + count; }
}
A few things to notice. First, the MessageKey is always the same. The message design is part of the application, and in this case the next stage of message processors will contain only a single instance, by design. That follows from the fact that only a single message key is ever created from a CountedWord message: an Integer with the value 1.
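The following sketch, again independent of Dempsy, shows why a constant key yields a single downstream instance: each message is checked for the two requirements above (Serializable, plus a @MessageKey method), and messages are grouped into one "instance" per distinct key. The local MessageKey annotation and the keyOf and route helpers are hypothetical, and real routing across a distributed cluster involves more than this grouping.

```java
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyRoutingSketch
{
   // Hypothetical local stand-in for Dempsy's @MessageKey.
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface MessageKey {}

   // Trimmed-down CountedWord from the text: Serializable, with a constant key.
   static class CountedWord implements Serializable
   {
      private static final long serialVersionUID = 1L;
      private final String wordText;
      private final long count;

      CountedWord(String wordText, long count) { this.wordText = wordText; this.count = count; }

      @MessageKey
      public Integer getKey() { return 1; }

      @Override
      public String toString() { return wordText + ":" + count; }
   }

   // Checks the two requirements from the text, then extracts the message key.
   static Object keyOf(Object message)
   {
      if (!(message instanceof Serializable))
         throw new IllegalArgumentException(message + " is not Serializable");
      for (Method m : message.getClass().getMethods())
         if (m.isAnnotationPresent(MessageKey.class))
         {
            try { return m.invoke(message); }
            catch (ReflectiveOperationException e) { throw new IllegalArgumentException("Cannot get key from " + message, e); }
         }
      throw new IllegalArgumentException(message + " has no @MessageKey method");
   }

   // Groups messages into one "instance" per distinct key; an instance here is just its received messages.
   static Map<Object, List<Object>> route(Object... messages)
   {
      Map<Object, List<Object>> instances = new HashMap<>();
      for (Object message : messages)
         instances.computeIfAbsent(keyOf(message), k -> new ArrayList<>()).add(message);
      return instances;
   }

   public static void main(String[] args)
   {
      Map<Object, List<Object>> instances =
         route(new CountedWord("the", 12), new CountedWord("dempsy", 3), new CountedWord("cron", 7));
      // Every CountedWord has key 1, so all three messages end up at a single instance.
      System.out.println(instances.size() + " instance(s): " + instances);
   }
}
```

If getKey() returned the word text instead, the same routing would create one instance per distinct word, which is how the WordCount stage fans out.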
Also note that Dempsy error messages are meant to be detailed. The Dempsy creators' philosophy is that error messages should point exactly to the problem; messages that don't indicate exactly what's wrong are useless to developers. In keeping with that spirit, Dempsy uses toString() on objects supplied to it when building error messages. For those messages to be as clear as possible, you should provide non-default toString() implementations for all messages, message processor classes, and adaptor implementations. The example above follows that rule of thumb.
Now, what will CountedWords be routed to? A WordRank message processor, of course. For example:
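import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeSet;
import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.annotations.Output;
@MessageProcessor
public class WordRank implements Cloneable
{
   // Orders by count; ties are broken by word so distinct words with equal counts are all kept
   private static final Comparator<CountedWord> COMPARATOR = new Comparator<CountedWord>()
   {
      @Override
      public int compare(CountedWord o1, CountedWord o2)
      {
         long o1c = o1.getCount();
         long o2c = o2.getCount();
         if (o1c != o2c)
            return o1c < o2c ? -1 : 1;
         return o1.getWordText().compareTo(o2.getWordText());
      }
   };
   private TreeSet<CountedWord> topTen = new TreeSet<CountedWord>(COMPARATOR);
   @MessageHandler
   public void handleCount(CountedWord countedWord)
   {
      // Each WordCount instance reports its running total on every output pass,
      // so replace any earlier count for this word (the set holds at most ~100 entries)
      for (Iterator<CountedWord> iter = topTen.iterator(); iter.hasNext();)
      {
         if (iter.next().getWordText().equals(countedWord.getWordText()))
            iter.remove();
      }
      topTen.add(countedWord);
      if (topTen.size() > 100)   // trim only occasionally to keep the handler fast
         trim();
   }
   @Output
   public void outputResults()
   {
      trim();
      for (Iterator<CountedWord> iter = topTen.descendingIterator(); iter.hasNext();)
      {
         CountedWord cur = iter.next();
         System.out.println(cur.getWordText() + ":" + cur.getCount());
      }
   }
   @Override
   public Object clone() throws CloneNotSupportedException
   {
      // super.clone() is shallow, so give each clone its own set rather than sharing the prototype's
      WordRank copy = (WordRank)super.clone();
      copy.topTen = new TreeSet<CountedWord>(COMPARATOR);
      return copy;
   }
   // Keeps the 10 highest counts, or fewer if fewer words have been seen so far
   private void trim()
   {
      TreeSet<CountedWord> newTopTen = new TreeSet<CountedWord>(COMPARATOR);
      Iterator<CountedWord> iter = topTen.descendingIterator();
      for (int i = 0; i < 10 && iter.hasNext(); i++)
         newTopTen.add(iter.next());
      topTen = newTopTen;
   }
}
Important things to note here: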
- The WordRank message processor doesn't do any synchronization. There's no concern, for example, that the topTen TreeSet might change in the handleCount() method while it is being manipulated in the outputResults() method. Since Dempsy invokes both methods, it will not invoke them concurrently.
- The handleCount() method is written to run as quickly as possible. If the message processor is busy when another message comes in, that message will be lost, so message handling needs to be kept short. In this case we only trim the set periodically. The outputResults() method is kept short as well, because an @Output method that is still running when messages arrive for the message handler will cause those messages to be lost.
- WordRank includes an @Output method so we can periodically view the top ten.
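The two guarantees described above (a processor's methods are never invoked concurrently, and a message arriving while the processor is busy is lost) can be modelled with one lock per instance. This is a sketch under that assumption, not Dempsy's actual mechanism: deliver uses tryLock and drops the message if the lock is held, while output waits for the lock (also an assumption), so the two never overlap. The BusyDropSketch class and its methods are hypothetical, and the code assumes Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

class BusyDropSketch
{
   // One lock per message processor instance, held while any of its methods runs.
   private final ReentrantLock lock = new ReentrantLock();
   private final AtomicLong dropped = new AtomicLong();

   // Message delivery never waits: if the instance is busy, the message is counted and dropped.
   boolean deliver(Runnable messageHandler)
   {
      if (!lock.tryLock())
      {
         dropped.incrementAndGet();
         return false;
      }
      try { messageHandler.run(); return true; }
      finally { lock.unlock(); }
   }

   // Output invocation waits for the same lock, so it never overlaps a message handler.
   void output(Runnable outputMethod)
   {
      lock.lock();
      try { outputMethod.run(); }
      finally { lock.unlock(); }
   }

   long dropped() { return dropped.get(); }

   // Scenario from the text: a message arrives while a slow @Output method is still running.
   static boolean deliverDuringOutput(BusyDropSketch mp) throws InterruptedException
   {
      CountDownLatch outputStarted = new CountDownLatch(1);
      CountDownLatch releaseOutput = new CountDownLatch(1);
      Thread outputThread = new Thread(() -> mp.output(() -> {
         outputStarted.countDown();
         try { releaseOutput.await(); }
         catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      }));
      outputThread.start();
      outputStarted.await();                      // the output method now holds the lock
      boolean delivered = mp.deliver(() -> {});   // this message arrives while the instance is busy
      releaseOutput.countDown();
      outputThread.join();
      return delivered;
   }

   public static void main(String[] args) throws InterruptedException
   {
      BusyDropSketch mp = new BusyDropSketch();
      boolean duringOutput = deliverDuringOutput(mp);
      boolean afterOutput = mp.deliver(() -> {});
      System.out.println("delivered during output: " + duringOutput
            + ", after output: " + afterOutput + ", dropped: " + mp.dropped());
   }
}
```

Because the container serializes access, the processor code itself (like WordRank) stays free of locks; the cost is that a slow handler or output method directly turns into dropped messages.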
Of course, now that our application has grown, we need to expand our ApplicationDefinition by adding a new ClusterDefinition. Again, the order of the ClusterDefinitions within the ApplicationDefinition doesn't matter.
...
<bean class="com.nokia.dempsy.config.ClusterDefinition">
   <constructor-arg value="word-rank"/>
   <property name="messageProcessorPrototype">
      <bean class="com.nokia.dempsy.example.userguide.wordcount.WordRank"/>
   </property>
   <property name="outputExecuter">
      <bean class="com.nokia.dempsy.output.RelativeOutputSchedule">
         <constructor-arg index="0" type="long" value="30"/>
         <constructor-arg index="1" type="java.lang.String" value="SECONDS" />
      </bean>
   </property>
</bean>
...
Out of the box, Dempsy supports the RelativeOutputSchedule illustrated above and also a CronOutputSchedule. With a CronOutputSchedule you can use the clock to determine when to initiate an @Output invocation pass. For example, to run the WordRank output pass every five minutes, configure it as follows:
...
<property name="outputExecuter">
   <bean class="com.nokia.dempsy.output.CronOutputSchedule">
      <constructor-arg value="0 0/5 * * * ?"/>
   </bean>
</property>
...
Dempsy's CronOutputSchedule uses the Quartz Scheduler, so the constructor argument is a Quartz cron expression, whose format is described in the Quartz documentation. The format is similar to the standard Unix cron format, except that the fields start with seconds rather than minutes.
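As an aid to reading expressions like "0 0/5 * * * ?", the sketch below labels each field of a Quartz-style cron expression and expands the simple forms *, N, and N/step. It is a hypothetical helper, not part of Dempsy or Quartz, and it handles only a small subset of the syntax (no ranges, lists, L, W, or #), so use Quartz itself to validate real expressions. In Quartz, ? means "no specific value" and appears in the day-of-month or day-of-week field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CronFieldsSketch
{
   // Quartz field order: seconds come first (unlike Unix cron); the seventh field, year, is optional.
   static final String[] FIELD_NAMES =
      { "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year" };

   // Labels each whitespace-separated field of a Quartz-style cron expression.
   static Map<String, String> label(String expression)
   {
      String[] parts = expression.trim().split("\\s+");
      if (parts.length < 6 || parts.length > 7)
         throw new IllegalArgumentException("expected 6 or 7 fields in '" + expression + "' but found " + parts.length);
      Map<String, String> fields = new LinkedHashMap<>();
      for (int i = 0; i < parts.length; i++)
         fields.put(FIELD_NAMES[i], parts[i]);
      return fields;
   }

   // Expands only "*", "N" and "N/step" (or "*/step") over [min, max]; Quartz accepts much more.
   static List<Integer> expand(String field, int min, int max)
   {
      List<Integer> values = new ArrayList<>();
      if (!field.equals("*") && !field.contains("/"))
      {
         values.add(Integer.parseInt(field));   // a single fixed value
         return values;
      }
      int start = min;
      int step = 1;
      if (field.contains("/"))
      {
         String[] p = field.split("/");
         start = p[0].equals("*") ? min : Integer.parseInt(p[0]);
         step = Integer.parseInt(p[1]);
         if (step <= 0)
            throw new IllegalArgumentException("step must be positive in '" + field + "'");
      }
      for (int v = start; v <= max; v += step)
         values.add(v);
      return values;
   }

   public static void main(String[] args)
   {
      Map<String, String> fields = label("0 0/5 * * * ?");
      System.out.println(fields);
      // 0/5 in the minutes field: start at minute 0, then every 5 minutes up to 55.
      System.out.println("minutes: " + expand(fields.get("minutes"), 0, 59));
   }
}
```

Read together, the fields say: at second 0 of minutes 0, 5, 10, ..., 55, of every hour, on any day, which is why the WordRank output pass above runs every five minutes.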
By default, an output pass runs in a single thread. Although that thread is independent of message processing, sometimes a single thread isn't enough. If the output pass does a lot of I/O, it may help to increase the number of threads used for calling the @Output methods. Both the RelativeOutputSchedule and the CronOutputSchedule let you set the number of threads dedicated to the output pass. In either case, you set the concurrency property on the schedule bean. For example:
...
<property name="outputExecuter">
   <bean class="com.nokia.dempsy.output.CronOutputSchedule">
      <constructor-arg value="0 0/5 * * * ?"/>
      <property name="concurrency" value="8"/>
   </bean>
</property>
...
Next section: Pre-Instantiation