CEP with Esper

Posted on Monday, April 19, 2010


In the last post we looked at the basics of Complex Event Processing (CEP). In this post we try it out with the community edition of Esper, an open source engine for event stream processing (ESP) and event correlation (CEP).

Let us assume a simple scenario. Say you want to drive from Foster City to Redwood City on US 101. Two meters are installed: one at the point where you join US 101 from Foster City (A) and the other at the exit for Redwood City (B). For the sake of simplicity, let us assume that an event is triggered every 5 seconds, reporting the average time a car takes to travel from A to B. Now assume that you want to make this trip and would like to take a quick peek at the traffic before you commit a time to your date. Luckily, there is a hosted system on the web which lets you find out the average time to complete the trip on the basis of data collected over the last 30 seconds. This is the sliding window we are interested in, kept short so that the answer stays as close to real time as possible.

Esper provides a rich Event Processing Language (EPL) to express filtering, aggregation, and joins, possibly over sliding windows of multiple event streams.

picture courtesy: EsperTech

In our case let us see what a traffic event would look like


package org.inphina.cep;

public class TrafficEvent {
 private String roadName;
 private double timeSpentToCoverTheDistance;

 public TrafficEvent(String roadName, double timeSpentToCoverTheDistance) {
 // Both fields must be set here, otherwise getRoadName() returns null
 this.roadName = roadName;
 this.timeSpentToCoverTheDistance = timeSpentToCoverTheDistance;
 }

 public String getRoadName() {
 return roadName;
 }

 public void setRoadName(String roadName) {
 this.roadName = roadName;
 }

 public double getTimeSpentToCoverTheDistance() {
 return timeSpentToCoverTheDistance;
 }

 public void setTimeSpentToCoverTheDistance(double timeSpentToCoverTheDistance) {
 this.timeSpentToCoverTheDistance = timeSpentToCoverTheDistance;
 }

}

Here, for now we are interested in the time taken to complete the trip from A to B, which is the property timeSpentToCoverTheDistance.

Also, since we are interested in a sliding window of 30 s, we need to gather the events from the last 30 s and run a computation on them. Hence, our statement looks like this


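private static final String EVENT_QUERY = "select avg(timeSpentToCoverTheDistance) from org.inphina.cep.TrafficEvent.win:time(30 sec)";
 protected final Log logger = LogFactory.getLog(getClass());

 public static void main(String[] args) throws InterruptedException {
 EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
 EPStatement statement = epService.getEPAdministrator().createEPL(EVENT_QUERY);
 // Attach the listener described below so that it is notified whenever the average changes
 statement.addListener(new TrafficEventListener());
 // ... the traffic events are then sent through epService.getEPRuntime()
 }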

This statement is a continuous query. All the event data is passed through this statement.
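
To make the idea of a 30 second sliding average concrete, here is a minimal plain Java sketch of the bookkeeping such a window needs. It is only an illustration under our own assumptions: the class SlidingAverageSketch and its methods are hypothetical names, it does not use Esper, and it is not how Esper implements win:time internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for "average over the last N milliseconds"; not Esper code. */
final class SlidingAverageSketch {
    private static final class Sample {
        final long timestampMillis;
        final double value;

        Sample(long timestampMillis, double value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private double sum;

    SlidingAverageSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records a reading taken at nowMillis and returns the updated average. */
    Double add(long nowMillis, double value) {
        samples.addLast(new Sample(nowMillis, value));
        sum += value;
        return average(nowMillis);
    }

    /** Drops readings that are windowMillis old or older, then averages the rest (null if none remain). */
    Double average(long nowMillis) {
        while (!samples.isEmpty()
                && nowMillis - samples.peekFirst().timestampMillis >= windowMillis) {
            sum -= samples.removeFirst().value;
        }
        return samples.isEmpty() ? null : sum / samples.size();
    }

    public static void main(String[] args) {
        SlidingAverageSketch window = new SlidingAverageSketch(30_000);
        System.out.println(window.add(0, 1));       // one reading in the window
        System.out.println(window.add(5_000, 2));   // two readings in the window
        System.out.println(window.average(30_000)); // the first reading has expired
    }
}
```

The point of the sketch is that the result can change for two reasons: a new reading arrives, or an old one ages out. A continuous query has to react to both.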

We have also added a listener which is invoked whenever the result set of the statement changes, either because a new event arrives or because older data expires from the sliding window. The listener in our case looks at the average commute time and lets the consumer know whether it is a good time to travel or not.


public class TrafficEventListener implements UpdateListener {
 private static final int TRAFFIC_5SECONDS = 5;
 private static final int TRAFFIC_3SECONDS = 3;
 private static final String AVG_TRAFFIC_TIME_IS = "Avg traffic time is = ";
 private static final String HEAVY_TRAFFIC_MESSAGE = "Peak Traffic, Rather stay where you are! ";
 private static final String MODERATE_TRAFFIC_MESSAGE = "Slow Traffic, but still tolerable! ";
 private static final String EASY_TRAFFIC_MESSAGE = "You would fly! ";
 protected final Log logger = LogFactory.getLog(getClass());

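 public void update(EventBean[] newEvents, EventBean[] oldEvents) {
 long timeStart = System.currentTimeMillis();
 logger.info("In the event handler " + timeStart/1000);

 // The average is null when there is no result row or when the window holds no events
 Double average = (newEvents == null || newEvents.length == 0) ? null : (Double) newEvents[0].get("avg(timeSpentToCoverTheDistance)");
 if (average == null) {
 logger.info("No traffic data in the last 30 seconds");
 return;
 }

 showTrafficStatus(average);
 logger.info("avg=" + average);
 }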

 private void showTrafficStatus(Double averageTime) {
 if (averageTime < TRAFFIC_3SECONDS){
 displayMessage(EASY_TRAFFIC_MESSAGE, averageTime);
 }else if (averageTime >= TRAFFIC_3SECONDS && averageTime <= TRAFFIC_5SECONDS){
 displayMessage(MODERATE_TRAFFIC_MESSAGE, averageTime);
 }else{
 displayMessage(HEAVY_TRAFFIC_MESSAGE, averageTime);
 }

 }

 private void displayMessage(String message, Double averageTime){
 logger.info(message + AVG_TRAFFIC_TIME_IS + averageTime);
 }
}
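
The threshold logic is easier to reason about, and to unit test without an engine, when it is pulled out into a pure function. The following is a small sketch of that idea. The class TrafficStatusSketch, its Status enum and the NO_DATA case are our own hypothetical names, added on the assumption that the average can be missing once the window is empty. The limits of 3 and 5 mirror the constants in the listener.

```java
/** Hypothetical refactoring of the listener's thresholds into a pure, null-safe function. */
final class TrafficStatusSketch {
    enum Status { NO_DATA, EASY, MODERATE, HEAVY }

    private static final double EASY_LIMIT = 3;     // mirrors TRAFFIC_3SECONDS
    private static final double MODERATE_LIMIT = 5; // mirrors TRAFFIC_5SECONDS

    /** Maps an average travel time to a status; null means no readings were available. */
    static Status classify(Double averageTime) {
        if (averageTime == null) {
            return Status.NO_DATA;
        }
        if (averageTime < EASY_LIMIT) {
            return Status.EASY;
        }
        if (averageTime <= MODERATE_LIMIT) {
            return Status.MODERATE;
        }
        return Status.HEAVY;
    }

    public static void main(String[] args) {
        System.out.println(classify(2.5));  // below the first limit
        System.out.println(classify(5.0));  // the upper limit is still moderate
        System.out.println(classify(5.5));  // above both limits
        System.out.println(classify(null)); // no data at all
    }
}
```

A listener could then call classify and choose its message from the returned status, which keeps the Esper specific code down to reading the value out of the EventBean.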

All right, now we have the infrastructure ready. Let us fire events. The traffic events in our case are fired like this


private void generateTrafficEvents(EPServiceProvider epService) throws InterruptedException {
 long timeStart = System.currentTimeMillis();
 logger.info("Start " + timeStart / 1000);

 TrafficEvent event = new TrafficEvent("US 101", 1);
 epService.getEPRuntime().sendEvent(event);
 Thread.sleep(5000);

 TrafficEvent event1 = new TrafficEvent("US 101", 2);
 epService.getEPRuntime().sendEvent(event1);
 Thread.sleep(5000);// 5s

 TrafficEvent event2 = new TrafficEvent("US 101", 3);
 epService.getEPRuntime().sendEvent(event2);
 Thread.sleep(5000);// 10s

 TrafficEvent event3 = new TrafficEvent("US 101", 4);
 epService.getEPRuntime().sendEvent(event3);
 Thread.sleep(5000);// 15s

 TrafficEvent event4 = new TrafficEvent("US 101", 5);
 epService.getEPRuntime().sendEvent(event4);
 Thread.sleep(5000);// 20s

 TrafficEvent event5 = new TrafficEvent("US 101", 6);
 epService.getEPRuntime().sendEvent(event5);
 Thread.sleep(40000);// 25s, then pause 40 s so that every event sent so far expires from the 30 s window

 TrafficEvent event6 = new TrafficEvent("US 101", 7);
 epService.getEPRuntime().sendEvent(event6);
 Thread.sleep(5000);// 65s

 TrafficEvent event7 = new TrafficEvent("US 101", 8);
 epService.getEPRuntime().sendEvent(event7);
 Thread.sleep(5000);// 70s

 TrafficEvent event8 = new TrafficEvent("US 101", 9);
 epService.getEPRuntime().sendEvent(event8);

 logger.info("thats all");
 }
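
Because of the long pause in the middle, it helps to work out when each event is actually sent. The short sketch below does that arithmetic. The class TrafficScheduleSketch is a hypothetical helper of ours, and the pauses are simply copied from the Thread.sleep calls in the method above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that derives the send time of each event from the pauses between them. */
final class TrafficScheduleSketch {
    // Pause after each event in seconds; the ninth and last event is not followed by a pause
    static final int[] PAUSES_SECONDS = {5, 5, 5, 5, 5, 40, 5, 5};

    /** Returns the second, counted from the start, at which each of the nine events is sent. */
    static List<Integer> sendOffsets() {
        List<Integer> offsets = new ArrayList<>();
        int elapsed = 0;
        offsets.add(elapsed); // the first event goes out immediately
        for (int pause : PAUSES_SECONDS) {
            elapsed += pause;
            offsets.add(elapsed);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(sendOffsets()); // [0, 5, 10, 15, 20, 25, 65, 70, 75]
    }
}
```

The first six events arrive within 25 seconds, so all of them sit inside the 30 second window together. The last three arrive only after the window has had time to empty.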

For the purpose of demonstration, the event listener logs a message whenever it is invoked. We could customize it to show the message only when someone logs into the application and wants to know whether “now” is a good time to travel or not. For the events above, this is what the logs look like. Note that the entries from 11:37:31 to 11:37:51 are not caused by new events: they appear as older events leave the 30 second window during the 40 second pause.


11:37:01,256  INFO Client:30 - Start 1271657221
11:37:01,265  INFO TrafficEventListener:20 - In the event handler 1271657221
11:37:01,265  INFO TrafficEventListener:40 - You would fly! Avg traffic time is = 1.0
11:37:01,266  INFO TrafficEventListener:25 - avg=1.0
11:37:06,267  INFO TrafficEventListener:20 - In the event handler 1271657226
11:37:06,267  INFO TrafficEventListener:40 - You would fly! Avg traffic time is = 1.5
11:37:06,268  INFO TrafficEventListener:25 - avg=1.5
11:37:11,270  INFO TrafficEventListener:20 - In the event handler 1271657231
11:37:11,271  INFO TrafficEventListener:40 - You would fly! Avg traffic time is = 2.0
11:37:11,271  INFO TrafficEventListener:25 - avg=2.0
11:37:16,272  INFO TrafficEventListener:20 - In the event handler 1271657236
11:37:16,273  INFO TrafficEventListener:40 - You would fly! Avg traffic time is = 2.5
11:37:16,274  INFO TrafficEventListener:25 - avg=2.5
11:37:21,275  INFO TrafficEventListener:20 - In the event handler 1271657241
11:37:21,276  INFO TrafficEventListener:40 - Slow Traffic, but still tolerable! Avg traffic time is = 3.0
11:37:21,276  INFO TrafficEventListener:25 - avg=3.0
11:37:26,277  INFO TrafficEventListener:20 - In the event handler 1271657246
11:37:26,278  INFO TrafficEventListener:40 - Slow Traffic, but still tolerable! Avg traffic time is = 3.5
11:37:26,279  INFO TrafficEventListener:25 - avg=3.5
11:37:31,226  INFO TrafficEventListener:20 - In the event handler 1271657251
11:37:31,227  INFO TrafficEventListener:40 - Slow Traffic, but still tolerable! Avg traffic time is = 4.0
11:37:31,235  INFO TrafficEventListener:25 - avg=4.0
11:37:36,224  INFO TrafficEventListener:20 - In the event handler 1271657256
11:37:36,225  INFO TrafficEventListener:40 - Slow Traffic, but still tolerable! Avg traffic time is = 4.5
11:37:36,225  INFO TrafficEventListener:25 - avg=4.5
11:37:41,224  INFO TrafficEventListener:20 - In the event handler 1271657261
11:37:41,225  INFO TrafficEventListener:40 - Slow Traffic, but still tolerable! Avg traffic time is = 5.0
11:37:41,225  INFO TrafficEventListener:25 - avg=5.0
11:37:46,224  INFO TrafficEventListener:20 - In the event handler 1271657266
11:37:46,225  INFO TrafficEventListener:40 - Peak Traffic, Rather stay where you are! Avg traffic time is = 5.5
11:37:46,225  INFO TrafficEventListener:25 - avg=5.5
11:37:51,224  INFO TrafficEventListener:20 - In the event handler 1271657271
11:37:51,225  INFO TrafficEventListener:40 - Peak Traffic, Rather stay where you are! Avg traffic time is = 6.0
11:37:51,225  INFO TrafficEventListener:25 - avg=6.0
11:38:06,281  INFO TrafficEventListener:20 - In the event handler 1271657286
11:38:06,281  INFO TrafficEventListener:40 - Peak Traffic, Rather stay where you are! Avg traffic time is = 7.0
11:38:06,282  INFO TrafficEventListener:25 - avg=7.0
11:38:11,283  INFO TrafficEventListener:20 - In the event handler 1271657291
11:38:11,284  INFO TrafficEventListener:40 - Peak Traffic, Rather stay where you are! Avg traffic time is = 7.5
11:38:11,285  INFO TrafficEventListener:25 - avg=7.5
11:38:16,286  INFO TrafficEventListener:20 - In the event handler 1271657296
11:38:16,287  INFO TrafficEventListener:40 - Peak Traffic, Rather stay where you are! Avg traffic time is = 8.0
11:38:16,288  INFO TrafficEventListener:25 - avg=8.0
11:38:16,289  INFO Client:67 - thats all
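
The averages in the log can be checked by hand. As a rough cross-check, the sketch below replays the same schedule in plain Java and computes, for any second since the start, the average of the readings that are less than 30 seconds old. Everything in it is our own assumption for illustration: the class WindowReplaySketch is hypothetical, time is sampled in whole seconds, and the expiry rule (an event leaves the window exactly 30 seconds after it was sent) is inferred from the log rather than taken from Esper.

```java
/** Hypothetical replay of the logged scenario; it does not use or model Esper internals. */
final class WindowReplaySketch {
    static final int WINDOW_SECONDS = 30;
    // Second at which each event was sent, and the travel time it carried
    static final int[] SEND_AT = {0, 5, 10, 15, 20, 25, 65, 70, 75};
    static final double[] VALUES = {1, 2, 3, 4, 5, 6, 7, 8, 9};

    /** Average of the readings already sent at 'now' and less than 30 s old; null if there are none. */
    static Double averageAt(int now) {
        double sum = 0;
        int count = 0;
        for (int i = 0; i < SEND_AT.length; i++) {
            int age = now - SEND_AT[i];
            if (age >= 0 && age < WINDOW_SECONDS) {
                sum += VALUES[i];
                count++;
            }
        }
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        // Sample the window every 5 seconds, from the first event to the last
        for (int now = 0; now <= 75; now += 5) {
            System.out.println(now + "s avg=" + averageAt(now));
        }
    }
}
```

At 30 seconds the first reading drops out and the average of 2 to 6 is 4.0, which matches the 11:37:31 entry. At 55 and 60 seconds the window is empty and the sketch yields null, which corresponds to the gap in the log before the next event arrives at 11:38:06.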

Again, the events could be triggered at irregular intervals from multiple places. Esper makes all the event data pass through the statement. This is in line with what we mentioned in the last post: CEP is a database inverted. Instead of running queries on data which is already stored in the database, data is passed through the statements as and when it arrives.


Thus, a CEP framework like Esper can easily be plugged into an application and made to process large amounts of incoming events, giving a real time push for action. The benchmarks favor Esper as a performant CEP engine.

Esper exceeds over 500 000 event/s on a dual CPU 2GHz Intel based hardware, with engine latency below 3 microseconds average (below 10us with more than 99% predictability) on a VWAP (volume weighted average) benchmark with 1000 statements registered in the system - this tops at 70 Mbit/s at 85% CPU usage. Esper also demonstrates linear scalability from 100 000 to 500 000 event/s on this hardware, with consistent results across different statements.

Posted in: Architecture, Java