8. Complex events using Esper – Open Source SOA

Chapter 8. Complex events using Esper

The technologies we've covered so far have dealt with the topics of creating reusable services (SCA), and in turn, how they can be woven together to create complex business processes (BPM). In this chapter, we'll shift gears a bit and look at how we can tap into these services and orchestrations to provide deep insights into the operational aspects of your enterprise. This is accomplished through event stream processing (ESP), sometimes also known as complex event processing (CEP). According to the CEP Interest web site (a site devoted to covering this technology), CEP is defined as

...software technology that enables applications to monitor multiple streams of event data, analyze them in terms of key performance indicators that are expressed in event rules, and act upon opportunities and threats in real time, potentially by creating derived events, or forwarding raw events. [CEPInterest]

Simply put, CEP technology enables you to monitor the vital signs of your organization.

Monitoring business events is critical for most enterprises, especially given the focus on metrics and accountability. In our hyper-competitive and compliance-crazy environment, monitoring must be done in real time (or near real time). In years past, companies might monitor their performance once every few weeks, or even once a quarter; a company adhering to such a philosophy today would have a short shelf life. As Prahalad and Krishnan point out, "Competitiveness favors those who spot new trends and act on them expeditiously," and the "new competitive landscape requires continuous analysis of data for insight" [Prahalad].

SOA's emphasis on discrete, largely stand-alone services brings with it some difficulties not present in more monolithic application environments. The main question is how to manage or monitor a highly distributed environment. In the monolithic world, the entire application usually runs on a single machine or a clustered set of machines, and monitoring is fairly straightforward. In the SOA world, an application may use service components hosted on a multitude of machines (perhaps many of them virtual). If any single component goes down, it could wreak havoc. In other words, a SOA environment has far more potential points of failure (an often-overlooked downside to SOA).

ESP can play a significant role in risk mitigation through its ability to monitor, in real time, any deviations from the norm. Further, the operational insights provided by ESP can help organizations rapidly detect new opportunities or trends, thereby improving their competitive position.

In this chapter, we'll first review why events are important to the enterprise, and how they're constructed and consumed by an event stream processor. We'll then explore how to use Esper, the open source ESP solution selected for our Open SOA Platform. Finally, you'll see a framework for exposing the Esper engine through web services—an important requirement for integration within our SOA environment. Let's begin by examining the importance of ESP for monitoring the ongoing pulse of your enterprise.

Business events in the enterprise

The ability to derive instant insights into the operations of your enterprise is essential. Businesses must engage in continuous evolution to remain competitive. Events are an important ingredient in this process. How? Let's examine a few possible scenarios:

  • Purchasing patterns—Many companies, especially those in retail, must quickly identify changes in customer behavior. Buying trends, particularly those in the finicky youth and young adult markets, can change dramatically within a matter of weeks or even days. It's essential to identify any changes in purchasing habits quickly, as they drive product placement decisions, stocking requirements, and pricing. A sudden drop-off in a particular product line could also point to fulfillment issues or negative publicity arising from the dynamic communication channels now afforded by the web (blogs, emails, social networking sites, etc.).

  • Compliance—Many companies, particularly publicly traded ones, must abide by onerous compliance regulations. Some, like the Sarbanes-Oxley Act of 2002, touch nearly all public companies, but domain-specific regulations, such as liquidity requirements for financial firms or adverse-event reporting for pharmaceutical companies, are also common. Failure to comply, and to demonstrate ongoing controls, can result in significant fines or even criminal prosecution.

  • Fraud detection—The press is replete with stories of fraud and loss of data containing sensitive personal information (SPI). Although some of these incidents couldn't be detected by ESP (such as theft of backup tapes), in other scenarios comprehensive use of ESP could have more quickly identified possible breaches [Choicepoint]. Unusual activity that falls outside normal business patterns can help identify possible fraudulent behavior (you've probably encountered such controls firsthand if you've ever started buying unusual items on your credit card from a remote location only to find your account frozen until a call to your provider is placed).

These brief examples only scratch the surface of what's possible using ESP, and undoubtedly your organization has many other event-related activities that require close supervision. Before we progress further, let's dissect what constitutes events and ESP.

Understanding events

Up until now, we've discussed events at an abstract level within the context of the business or enterprise. Let's step back for a moment and consider, at its most basic level, what constitutes an event. Most would agree that an event is simply the fact of something having occurred. An event object, then, is a record of that activity, expressed in a form that an event processor can digest. An event processor is an application that performs operations on event objects, including creating, reading, transforming, aggregating, or removing them. Finally, an event stream is a sequence of event objects, which typically arrive in the order in which they were generated.

One of the main responsibilities of an event processor is to analyze incoming event streams, discard events that are of no importance, and flag the relevant ones. Figure 8.1 depicts how applications or systems generate outbound events using objects or containers to represent the event data. Each event object is then streamed to the processor, which performs a variety of functions on the inbound stream.

Historically, there have been two major impediments to embracing the "Power of Events" (to borrow the title of David Luckham's book [Luckham2002]):

  • The publishing of events

  • The consumption of a vast array of event data

Unless you've coded for it up front, creating events throughout the workflow of your application can present a major refactoring effort. Often, event publishing is an afterthought, and thus difficult to bolt on after the initial development (since it touches so much of the code).

Figure 8.1. The relationship between event objects, streams, and the processor

Assuming that you do begin publishing events, another challenge quickly arises: how do you manage the processing and consumption of those events? Even a modestly sized enterprise may find that it's producing thousands of event messages per minute. Storing them all in a database as they're received isn't prudent, both because of the storage demands and because of the CPU cycles needed to process the information after it has arrived. An ESP engine addresses this consumption challenge, but the first challenge, producing the events, remains.

Fortunately, embracing SOA, and its notion of discrete services, does make the task of event production much more tenable. Further, adoption of BPM can dramatically advance this goal. How, you might ask? Let's consider a jBPM business process (the topic of chapters 5, 6, and 7). As you may recall, a jBPM process consists of nodes and transitions, with nodes representing states where actions usually occur (a task, a callout to another system, etc.). Events can be generated as transitions occur from one node to another (indeed, there's the concept of events built into the fabric of jBPM). More importantly, events can indicate when a new process instance has been initiated, suspended, or completed. Extending jBPM to automatically generate such events is trivial, and I'll explain how that can be done.

Before we dive headfirst into ESP, let's briefly discuss its role with two related technologies: business activity monitoring (BAM) and Event-Driven Architecture (EDA).

BAM and ESP—what's the difference?

There is undoubtedly some confusion surrounding these two essentially complementary technologies. BAM is often considered, in broad terms, to encompass all aspects of monitoring, from data collection, to transformation and analysis, to presentation. I prefer a narrower interpretation, in which BAM is the presentation layer and data collection is the purview of systems such as ESP. In other words, ESP is a delivery channel for BAM. Why this distinction? In part because, as with web development, it's prudent to isolate the various tiers involved in the monitoring process. Event publishing, collection, rule processing and interpretation, and presentation should each be considered separate, loosely bound tiers. This allows changes to be made to the presentation layer, for instance, without affecting the collection and interpretation layers. Indeed, a multitude of presentations may be necessary, from iPhone or BlackBerry, to conventional web clients, to RSS feeds. Moving forward, then, we treat BAM as limited in scope to the display and presentation of information collected and analyzed via ESP.

Now let's conclude our theoretical discussion by briefly examining ESP's role within the context of an EDA. Some believe that CEP requires EDA, but as we'll see, this need not be the case.

Event-Driven Architecture and SOA

According to Wikipedia, EDA is defined as "a software architecture pattern promoting the production, detection, consumption of, and reaction to events" [WIKI]. EDA certainly shares a lot with CEP but is much broader in scope. In an EDA environment, communication between services is conducted asynchronously. Services are defined as event producers, consumers, or both. The result is a loosely coupled system or environment. Some have suggested that EDA is a competing architecture to SOA, but it's really just a particular manifestation of SOA. Within SOA, asynchronous forms of communication consistent with EDA are encouraged but, practically speaking, not always possible.

How does CEP fit with EDA? Events can be used in CEP purely for monitoring and analytics, regardless of whether your services communicate exclusively in an asynchronous, event-driven fashion.

Now that we've developed a clear understanding of ESP (what it is, what it's used for, and its place in SOA), let's move on to implementation using Esper, the open source ESP solution selected for the Open SOA Platform.

What is Esper?

Esper is the first and, to my knowledge, the only open source event stream processing application available. Fortunately, hubris hasn't taken hold: it continues to evolve at a rapid clip and offers exceptional functionality for such a young product. The Esper project was founded by Thomas Bernhardt while he was consulting for a large financial institution. Asked to evaluate rule engines for monitoring a trading system, he discovered that what was needed was a high-performance event correlation engine, and Esper was born. The basic components of Esper are shown in figure 8.2.

Figure 8.2. Core Esper components

Think of the EPServiceProvider as the engine instance, by which statements, events, and outputs (listeners or subscribers) are registered. You configure the engine programmatically or through an external XML file. Using the JDBC adapter, you can connect with an external database and cull information from it (with some restrictions, as we'll discuss later).

Recently, Esper graduated to a 3.0 release, which introduced several new features and improved the performance and reliability of the system. Although Bernhardt founded EsperTech to build a commercial business around Esper, the core product remains open source under a favorable GNU license. Esper also now has a .NET version (NEsper for .NET) that parallels the feature set of the Java release. EsperTech monetizes Esper by offering support and an enterprise high-availability version called EsperHA (we don't cover that product in this book).

Unlike many open source products, Esper has superb documentation. We'll therefore focus on using Esper in the context of our Open SOA Platform. This means we'll only lightly touch on Esper's extensive feature set, concentrating instead on how Esper can be used in tandem with our other Open SOA Platform products. Since we've only covered jBPM and SCA so far, we'll focus on integrating with those products; in future chapters, we'll extend the discussion to include how Esper can be used with enterprise decision management using Drools.

Let's first look at integrating Esper with jBPM, which will provide real-time monitoring of business processes built using jBPM. You'll find that this greatly enhances the value proposition of jBPM.

As you recall, in chapter 7 we discussed jBPM's logging capabilities, focusing on the built-in facilities jBPM provides for logging virtually all activities that pertain to the execution of a given process instance. These include obvious things such as when the process started and stopped, but also cover activities such as

  • Logging the transitions that occur between the nodes

  • Assigning process instance variables and their values

  • Obtaining action and actor-related information

...in other words, basically everything. I'll show you how to create your own custom logger to automatically transmit these events to an exposed Esper web service. No specific coding within the business process itself will be required, as the event generation will take place in a completely transparent fashion (see figure 8.3).

Figure 8.3. Emitting events from jBPM custom logger to Esper SCA service

In figure 8.3, you can see that jBPM emits the events from the jBPM custom logger. They are then sent to an Esper service running in an SCA container, which exposes Esper as an externally accessible service (through any of the available SCA bindings such as JMS or SOAP). In the SCA container, various Esper-related components are wired together to provide the underlying functionality. This includes the subscription objects used to receive the event notifications published by the Esper engine, which are based on the registered query statements. These notification and subscription objects, in turn, can publish their findings to a BAM solution, for example.

We'll use this case study throughout the chapter. In the sample code for this chapter, you'll see how the custom jBPM logger was implemented, but for now, it's sufficient to show you the event object on which we'll base many of our examples (I have kept it simple to keep the focus on Esper). Listing 8.1 shows the ProcessEvent event object.

Example 8.1. ProcessEvent event data transfer object

The ProcessEvent class in listing 8.1 can be used to capture when a jBPM process instance is initiated, suspended, or ended, as determined by the state property (1=Started, 2=Ended, 3=Suspended). As you can see, it's just a standard Java class, with no Esper-specific libraries. The case study we'll build on will use Esper to monitor a hypothetical process to capture metrics such as unusual delays in the time it takes to complete the process, the number of recently completed process instances, and the average number of running processes in a given time window. Let's begin by looking at the basics for setting up Esper.
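Because the body of listing 8.1 isn't reproduced here, the following is a minimal sketch of what such an event class might look like, assuming only the four properties and the state codes described above. The field types (int for processVersion and state, long for processInstanceId) and the all-arguments constructor are assumptions, not the book's actual listing.

```java
// Hypothetical sketch of a ProcessEvent-style event class. Property names and
// state codes follow the text; field types and the constructor are assumptions.
class ProcessEvent {
    // State codes as described: 1=Started, 2=Ended, 3=Suspended
    static final int STARTED = 1;
    static final int ENDED = 2;
    static final int SUSPENDED = 3;

    // Fields can stay private: Esper reads properties through public getters.
    private final String processName;
    private final int processVersion;
    private final long processInstanceId;
    private final int state;

    ProcessEvent(String processName, int processVersion, long processInstanceId, int state) {
        this.processName = processName;
        this.processVersion = processVersion;
        this.processInstanceId = processInstanceId;
        this.state = state;
    }

    // JavaBean-style getters expose processName, processVersion,
    // processInstanceId, and state as event properties.
    public String getProcessName() { return processName; }
    public int getProcessVersion() { return processVersion; }
    public long getProcessInstanceId() { return processInstanceId; }
    public int getState() { return state; }
}
```

Like the class the text describes, it imports nothing from Esper; it's an ordinary Java object.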

Getting started with Esper

Esper is a lightweight application. The project is housed at Codehaus (http://esper.codehaus.org/), and the 3.0 version, released in February 2009, provides the basis for the examples we'll cover. The total size of the download is a testimony to its lightweight nature—it's just around 15 MB. When installed, the core Esper JAR files reside in the main project directory, with a handful of additional third-party JARs located in the esper/lib subdirectory.

Note

An Ant target within the book's source code will automatically download the proper Esper libraries, so there is no need to separately install Esper to run the examples. See the README.txt file associated with this chapter's source.

Esper itself doesn't come with any application server, and instead can be considered more of an engine that can be embedded in other solutions. There's also no administrative interface and no built-in provisions for accessing Esper via web services (something we'll address in section 8.7).

There are four main aspects to setting up Esper to receive inbound events:

  • Creating event objects

  • Defining and registering query statements

  • Specifying listeners or subscriber objects to receive Esper results

  • Defining configuration options

Let's take a fairly high-level peek into each of these steps before proceeding to specific implementation details in section 8.5.

What are event objects?

In listing 8.1, we provided a simple illustration of a Java event object that can be used as a container for sending events to Esper. One caveat is that event classes must expose their data through standard JavaBean-style getter methods (we'll discuss some alternatives in a moment). In fact, Esper determines how to interface with the object by reflecting on its public methods, which is why the member variables in the Java class can remain private. The Esper event properties correspond to the member variables or fields within the ProcessEvent class, such as processName and processInstanceId, which are exposed using standard JavaBean getters. As an alternative to JavaBean-style classes, you can also use java.util.Map or org.w3c.dom.Node objects, although we won't illustrate these methods in this chapter (see the Esper reference documentation for more details).

Esper also supports far more complex properties than those illustrated in listing 8.1. These include nested properties, which contain references to other objects, and mapped event properties, which use a key value lookup. A mapped property refers to a getter method that takes a key, referenced in EPL as property('key'); it isn't to be confused with a Java Map object, although it can be implemented using one, as we'll illustrate in a moment.

Let's assume that we want to enrich the ProcessEvent object with a list of jBPM process variables and their associated values. You may recall that standard jBPM variables support many standard Java types such as String, Boolean, Double, and Integer. Since Esper interrogates the class methods to determine how to work with a given property, returning a generic Object type wouldn't be as convenient for Esper to work with as type-specific properties would. For example, since jBPM variables can be of any of those types, using getProperty('propertyName') would have to return an Object, and then we'd have to cast it within our Esper statements to the proper type. Instead, we can create separate methods for each of the property types. These methods would manage the casting. The following snippet shows how this can be implemented:

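import java.util.HashMap;
import java.util.Map;
...
// process variable names mapped to their values
private Map<String, Object> properties = new HashMap<String, Object>();
...
// one typed getter per variable type, so EPL statements need no casts;
// each is seen by Esper as a mapped property, e.g. stringProperty('key')
public String getStringProperty(String key) {
  return (String) this.properties.get(key);
}
public Long getLongProperty(String key) {
  return (Long) this.properties.get(key);
}
public Double getDoubleProperty(String key) {
  return (Double) this.properties.get(key);
}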
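To make the reflection point concrete, here is a simplified, self-contained illustration of how public getter methods can be discovered: no-argument getters surface as simple properties, and getters taking a single String key surface as mapped properties. This is not Esper's implementation; PropertyScanner and VariableEvent are hypothetical names, and boolean is...() getters as well as nested and indexed properties are omitted for brevity.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of JavaBean-style property discovery via reflection.
// Hypothetical helper, NOT Esper's implementation: no-arg getters are reported
// as "simple" properties, single-String-argument getters as "mapped" ones.
final class PropertyScanner {
    private PropertyScanner() {}

    static Map<String, String> describe(Class<?> type) {
        Map<String, String> props = new TreeMap<>();
        for (Method m : type.getMethods()) {          // public methods only
            String name = m.getName();
            // skip getClass() and anything that isn't a non-void getXxx method
            if (m.getDeclaringClass() == Object.class || !name.startsWith("get")
                    || name.length() == 3 || m.getReturnType() == void.class) {
                continue;
            }
            // getProcessName -> processName, getStringProperty -> stringProperty
            String prop = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            Class<?>[] params = m.getParameterTypes();
            if (params.length == 0) {
                props.put(prop, "simple");
            } else if (params.length == 1 && params[0] == String.class) {
                props.put(prop, "mapped");
            }
        }
        return props;
    }
}

// Hypothetical event: private fields, one simple and one mapped getter.
class VariableEvent {
    private final String processName = "SalesOrder";
    private final Map<String, Object> properties = new TreeMap<>();

    VariableEvent() { properties.put("customerType", "premium"); }

    public String getProcessName() { return processName; }
    public String getStringProperty(String key) { return (String) properties.get(key); }
}
```

For VariableEvent, the scanner reports processName as simple and stringProperty as mapped, which is the shape that lets an EPL filter such as stringProperty('customerType') work without casts.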

This will make more sense as we examine the Event Processing Language (EPL) in more depth, beginning in earnest in section 8.5. First, let's develop an understanding of how you create and register EPL statements.

Defining and registering query statements

EPL is an SQL-like language used for querying the inbound event streams. The founder of Esper likes to state that using EPL is almost like working with a database turned upside-down—instead of using the query language to search against existing records in a database, with Esper you register the queries within the Esper engine, and inbound streams of data are then applied against the defined queries [Bernhardt].
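The "upside-down database" idea can be sketched in a few lines of plain Java: queries are registered first, and each arriving event is pushed through them. ContinuousQueryEngine is a hypothetical toy in which Java predicates stand in for EPL statements; it does none of Esper's windowing, aggregation, or filter indexing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical toy: queries are stored, data flows through them.
class ContinuousQueryEngine<E> {
    private static final class Registration<T> {
        final Predicate<T> condition;
        final Consumer<T> onMatch;

        Registration(Predicate<T> condition, Consumer<T> onMatch) {
            this.condition = condition;
            this.onMatch = onMatch;
        }
    }

    private final List<Registration<E>> queries = new ArrayList<>();

    // Register a standing query before any data arrives.
    void register(Predicate<E> condition, Consumer<E> onMatch) {
        queries.add(new Registration<>(condition, onMatch));
    }

    // Apply one inbound event against every registered query.
    void send(E event) {
        for (Registration<E> q : queries) {
            if (q.condition.test(event)) {
                q.onMatch.accept(event);
            }
        }
    }
}
```

A database stores the data and runs each query once; here the query is stored and every event runs it again.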

Note

While we'll explore the language in some detail in section 8.5, a comprehensive look is beyond the scope of this book. In addition, I'll assume you have some familiarity with SQL, which, like XML, seems to be a fairly ubiquitous skill for most Java developers and architects.

At a general level, you use EPL to define patterns against which the incoming streams are analyzed. Unlike standard SQL, however, EPL offers time- and aggregation-specific extensions that help you perform queries such as "Return the average total price of all orders received within the last 30 minutes, continuously updated as orders appear" or "Return a list of orders for which the shipping vendor has not provided a shipment confirmation within 12 hours." You can be quite specific about the types of notifications you'd like to receive. When matching results are found, they can be published to a registered listener or subscriber class, or "pulled" from the Esper engine, as we discuss next.
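The first of those example queries relies on a time-based window. In Esper 3.x it might be written roughly as select avg(totalPrice) from OrderEvent.win:time(30 min), where OrderEvent is a hypothetical event type. The sketch below approximates in plain Java what such a window maintains: it keeps only observations newer than the window length and updates the average as each event arrives. Timestamps are passed in explicitly (an assumption made for testability) rather than read from a clock, and TimeWindowAverage is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a sliding time window with a running average.
class TimeWindowAverage {
    private static final class Entry {
        final long timestampMillis;
        final double value;

        Entry(long timestampMillis, double value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Entry> window = new ArrayDeque<>();
    private double sum;

    TimeWindowAverage(long windowMillis) { this.windowMillis = windowMillis; }

    // Add an observation and return the average over the current window.
    double onEvent(long timestampMillis, double value) {
        window.addLast(new Entry(timestampMillis, value));
        sum += value;
        // Evict observations that are at least windowMillis old.
        while (!window.isEmpty()
                && window.peekFirst().timestampMillis <= timestampMillis - windowMillis) {
            sum -= window.removeFirst().value;
        }
        return window.isEmpty() ? Double.NaN : sum / window.size();
    }
}
```

With a 30-minute window, orders of 100 at minute 0 and 200 at minute 10 average 150; when an order of 300 arrives at minute 35, the first order drops out and the average becomes 250.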

Specifying listeners or subscribers

In conventional database programming, a query is initiated on demand, in a pull-style fashion, triggered by some application rule or logic. In Esper, the model is quite different: data is streamed in continuously. In this respect, it resembles conventional GUI development, where listeners are defined for window widgets such as buttons. When an action is detected, the associated listener performs some function. In Esper, a listener is defined in a similar fashion and receives notification when an event of interest arrives. The addListener and removeListener methods of the EPStatement object attach a listener to, or detach it from, a specific EPL statement. The results are then delivered to the listener as Esper EventBean objects, which provide methods for accessing the details of the notification.

As an alternative approach, a subscriber object can be used. It works in a somewhat similar fashion: using the EPStatement.setSubscriber method, you specify an object whose class has an update method with a signature that matches the expected results. For example, if your query returned a count, you'd use an update method taking a single long parameter. This is the style we'll primarily use in this chapter's examples. Why? Because you create an update method whose signature matches the query output, the subscriber approach is more efficient than the listener approach, which must marshal the results into the more generic EventBean object. Because of this performance benefit, the Esper team recommends using subscribers. The only drawback is that only a single subscriber can be registered per statement, whereas multiple listeners can be attached.
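The difference between the two push styles can be modeled with a toy statement class. ToyStatement, CountSubscriber, and their methods are hypothetical and only mimic the behavior described above: any number of listeners receive a generic row, while a single subscriber receives typed arguments through an update method located by its parameter types (a long count here). Esper resolves and invokes subscriber methods internally; the reflective lookup below is only an approximation for illustration.

```java
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy statement modeling push delivery; all names here are hypothetical.
class ToyStatement {
    // Any number of listeners, each receiving a generic name -> value row.
    private final List<Consumer<Map<String, Object>>> listeners = new ArrayList<>();
    // At most one subscriber: setting another one replaces it.
    private Object subscriber;

    void addListener(Consumer<Map<String, Object>> listener) { listeners.add(listener); }

    void setSubscriber(Object subscriber) { this.subscriber = subscriber; }

    // Push one result: as a generic row to every listener, and as typed
    // arguments to the subscriber's matching update method.
    void deliver(Map<String, Object> row, Object... values) {
        for (Consumer<Map<String, Object>> listener : listeners) {
            listener.accept(row);
        }
        if (subscriber == null) {
            return;
        }
        try {
            findUpdate(subscriber.getClass(), values).invoke(subscriber, values);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot deliver to subscriber", e);
        }
    }

    // Find a public update(...) whose parameters accept the values, treating a
    // primitive parameter such as long as its wrapper type (Long).
    private static Method findUpdate(Class<?> type, Object[] values) throws NoSuchMethodException {
        for (Method m : type.getMethods()) {
            if (!m.getName().equals("update") || m.getParameterCount() != values.length) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            boolean matches = true;
            for (int i = 0; i < params.length && matches; i++) {
                Class<?> boxed = MethodType.methodType(params[i]).wrap().returnType();
                matches = values[i] != null && boxed.isInstance(values[i]);
            }
            if (matches) {
                return m;
            }
        }
        throw new NoSuchMethodException("no update method matches the selected values");
    }
}

// Subscriber for a count query: the update signature mirrors the output column.
class CountSubscriber {
    long lastCount = -1;

    public void update(long count) { lastCount = count; }
}
```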

Esper also provides what it refers to as a "pull API." This method enables you to retrieve query results on demand rather than having them pushed to a listener or subscriber. Using this approach, you obtain an iterator over the statement's current results. This solution may be appealing when you only infrequently need the information returned by Esper (both thread-safe and non-thread-safe iterators are available).
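In the Esper 3.x API, the pull methods live on EPStatement as iterator() and its thread-safe counterpart safeIterator() (check the reference documentation for your version). The toy class below, PullStatement, is a hypothetical name that only illustrates the pattern: results accumulate inside the statement, and the caller iterates over a snapshot whenever it chooses to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical pull-style statement: nothing is pushed to the caller.
class PullStatement<T> implements Iterable<T> {
    private final List<T> current = new ArrayList<>();

    // Called by the (toy) engine as matching events arrive.
    synchronized void onMatch(T result) { current.add(result); }

    // Iterate over a snapshot, so new matches can keep arriving while the
    // caller reads; loosely analogous to a thread-safe pull iterator.
    @Override
    public synchronized Iterator<T> iterator() {
        return Collections.unmodifiableList(new ArrayList<>(current)).iterator();
    }
}
```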

Configuration options

In general, Esper requires little (technically, no) configuration. In practice, though, you'll at least want to use the event alias feature to simplify your EPL statements. If you have an event object class of com.mycompany.esper.MyEventClass, setting up an alias saves you from specifying the full package name whenever you refer to that class in EPL statements. For example, instead of

select * from com.mycompany.esper.MyEventClass

you can use the shortcut

select * from MyEvent

The alias can be defined either programmatically, in the form of

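import com.espertech.esper.client.Configuration;
...
Configuration configuration = new Configuration();
// map the short alias to the fully qualified event class name
configuration.addEventTypeAlias("MyEvent",
  "com.mycompany.esper.MyEventClass");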

or, you can define the alias in an XML file as

<esper-configuration
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.espertech.com/schema/esper">
  <event-type alias="MyEvent" class="com.mycompany.esper.MyEventClass"/>
</esper-configuration>

You can load the XML file using the Configuration.configure method, which accepts a URL, File, or String parameter. A variety of other configuration options pertain to implementing caching behavior, declaring variables automatically, and configuring JDBC settings for pulling data from a remote database. We'll delve deeper into several of these as we work through our examples.

Now let's drill down into the details of the Event Processing Language (EPL). Using EPL, you define the rules for processing the inbound streams, which makes it the most important aspect of using the product.

EPL basics

I've briefly alluded to the fact that the EPL language shares many of the same constructs as the SQL used in popular relational databases like Oracle or MySQL. The decision to pattern EPL on SQL has obviously helped lessen the learning curve for users of Esper. The beauty of this approach is even more obvious when you consider that you can then mix native EPL statements with normal SQL via JDBC. This means you can pull information from event stream objects, which can be just standard JavaBean-compliant classes (with a few exceptions). You can also retrieve information directly from a relational database. Rather than attempt to cover all of the language constructs, we'll instead focus on the most commonly used statements and highlight areas where EPL differs significantly from standard SQL. We'll begin by looking at the EPL constructs related to querying.

Querying events

The select clause in Esper closely mirrors that of standard SQL. However, instead of specifying tables from which to pull data, you specify Esper event objects. Esper determines, via reflection (or a similar means for Map- or XML-based events), the properties, or "columns," that can be derived. In listing 8.1 we presented a simple JavaBean, ProcessEvent, that will be used to capture process instance events from jBPM. As you recall, it had four properties: processName, processVersion, processInstanceId, and state (we subsequently discussed adding properties, which we'll leave off for now to keep things simple). As with standard SQL, you can specify a wildcard character (*) in lieu of listing all of the event properties. So the following two statements are equivalent:

a) select s.processName, s.processVersion,
    s.processInstanceId, s.state
   from ProcessEvent as s
   // comment example
b) select * from ProcessEvent

Notice that in the first example we created an alias, s, for the ProcessEvent object in the from clause, and subsequently prefixed the properties with that alias (the as keyword could have been dropped, as is customary in many SQL dialects). An alias becomes necessary when joining two or more event streams that share a property name, to avoid ambiguity. Comments follow normal Java coding conventions when used inline within EPL statements.

One interesting deviation from standard SQL is EPL's ability to filter event streams in the from clause of the statement. For example, the state property of ProcessEvent has a value of 1 to indicate that a new process instance was created, or 2 to indicate that it has ended. If we're interested only in process start events, we could use this:

select * from ProcessEvent(state=1)

Indexed, mapped, and nested properties can be referenced as well, using an index, a key lookup, or dot notation, respectively. For instance, in section 8.4.1 we discussed introducing a properties variable to capture the process variables associated with a given jBPM process instance. I recommended using type-specific JavaBean methods such as getStringProperty, since different property types are possible. So let's assume a String property called customerType is used within a jBPM process to indicate whether a customer is premium or regular. The following EPL select would filter only those events where customerType is premium:

select * from ProcessEvent(stringProperty('customerType') = 'premium')

You can also combine multiple conditions using the and and or operators (e.g., stringProperty('customerType') = 'premium' and state=2). In addition, you can use filter ranges, such as

select *
  from ProcessEvent(doubleProperty('totalPrice') in [1000:5000])

This code would capture events where the jBPM process variable totalPrice is between 1000 and 5000, inclusive, since square brackets denote a closed range (complete examples are provided in the JUnit Esper tests that accompany this chapter). For simple cases like these, a filter is functionally equivalent to adding a conventional where clause to your EPL statement. For example, the following two are equivalent:

a) select * from ProcessEvent(state=1)
// is equivalent to
b) select * from ProcessEvent where state = 1
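For readers who think in Java, the filters above can be read as ordinary predicates over the event's getters. The following is a hypothetical plain-Java rendering of three of the filter expressions discussed, including the inclusive [1000:5000] range; FilterableEvent and Filters are illustrative names, not part of Esper, and Esper evaluates its filters far more efficiently than a list of predicates. This shows only the semantics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for ProcessEvent with typed mapped-property getters.
class FilterableEvent {
    private final int state;
    private final Map<String, Object> properties = new HashMap<>();

    FilterableEvent(int state) { this.state = state; }

    // Convenience for building test events: store one process variable.
    FilterableEvent with(String key, Object value) {
        properties.put(key, value);
        return this;
    }

    public int getState() { return state; }
    public String getStringProperty(String key) { return (String) properties.get(key); }
    public Double getDoubleProperty(String key) { return (Double) properties.get(key); }
}

// Plain-Java equivalents of the EPL filter expressions.
final class Filters {
    private Filters() {}

    // ProcessEvent(state=1)
    static final Predicate<FilterableEvent> STARTED = e -> e.getState() == 1;

    // ProcessEvent(stringProperty('customerType') = 'premium' and state=2)
    static final Predicate<FilterableEvent> PREMIUM_ENDED =
        e -> "premium".equals(e.getStringProperty("customerType")) && e.getState() == 2;

    // ProcessEvent(doubleProperty('totalPrice') in [1000:5000]), both ends inclusive
    static final Predicate<FilterableEvent> PRICE_RANGE = e -> {
        Double price = e.getDoubleProperty("totalPrice");
        return price != null && price >= 1000 && price <= 5000;
    };
}
```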

Since we've been moving through these examples at a fairly brisk pace, let's create a complete example so that you get an idea of what's involved in using Esper. In this example, we'll use a subscriber object to receive the result output from Esper, and demonstrate the functionality using a simple JUnit test case. The steps involved in this example are

  1. Create the event object.

  2. Create a statement registration and subscription receiver class.

  3. Create a JUnit test class.

We've already demonstrated an event object class called ProcessEvent (shown in listing 8.1), which represents jBPM create, suspend, and end process instance events. We'll use that for this example, so we can move on to discussing the remaining two steps.

CREATING A STATEMENT REGISTRATION AND SUBSCRIPTION RECEIVER CLASS

To keep things simple, we'll create a single class for registering our EPL statements used by the Esper engine. We'll use an inner class to capture the subscription events that result from any matching hits. Listing 8.2 contains the code for this class, called ProcessStartEnd (the code is included in the source code accompanying the book).

Example 8.2. ProcessStartEnd statement registration and subscriber class


The next step is to create the JUnit test that will invoke the register method of ProcessStartEnd and then create and publish some simulated inbound events.

CREATING A JUNIT TEST CLASS

Our JUnit test class (listing 8.3) will instantiate an instance of ProcessStartEnd, invoke its register method to set up the query statement, and then publish some events to the Esper engine to simulate those arriving from a jBPM instance.

Example 8.3. JUnit test case which simulates inbound events

When run, the JUnit class publishes a total of six events to the Esper engine, and the subscriber prints the following notifications:

*** New Event Arrived ***
   processName: SalesOrder
   processInstanceId: 2

*** New Event Arrived ***
   processName: PurchaseOrder
   processInstanceId: 3

Programmatically confirming that these notifications were fired requires a bit more effort, as the Esper runtime itself (EPRuntime) doesn't provide such metrics. Obtaining a direct handle to the subscriber instance from within the JUnit test class isn't easy either (it would require significant refactoring). One straightforward solution is to create an Esper variable and use it to count the results produced by Esper. This is the topic of our next section.

This exercise demonstrated how to register EPL statements in Esper, how to set up a subscriber class (in our example, an inner class) to receive output for events that match the EPL criteria, and how to initiate the Esper engine and test using JUnit.

Note

Esper supports many of the standard query-related SQL keywords and features such as group by, having, order by, and subqueries. You can see the official reference documentation for more thorough coverage of these topics.

Using variables

An Esper variable is a single runtime value that can be referenced using the EPRuntime. Why would you consider using one? One reason is the one we've already cited: we want to capture some information from a subscriber or listener object and make it accessible to other classes through the EPRuntime. Perhaps a more common use is to reference a variable directly in an EPL statement, which gives you greater runtime flexibility without requiring any code changes. We'll illustrate both uses.

VARIABLE EXAMPLE FOR A RUNNING COUNT OF SUBSCRIBER CALLS

You can create or assign variables in one of four ways:

  • Using EPL statements such as

    create variable string varname = <somevalue or null>
  • Using an on...set clause, which assigns a new value to an existing variable whenever a matching event arrives, such as

    on ProcessEvent set varname = <somevalue>
  • Through the API using EPStatementObjectModel's setCreateVariable

  • Through configuration using the Configuration.addVariable method

Picking up from the example in the previous section, we'll use the Configuration method for creating our counter variable, which we'll then use to enhance our JUnit test created in listing 8.3.

To create the counter variable and increment it for each call made, the subscriber inner class, ExampleSelect (listing 8.2), needs a handle to the EPServiceProvider. This is because we're using EPServiceProvider to retrieve the EPAdministrator, through which the Configuration.addVariable method can be called. Thus, we modified the inner class to include a constructor that receives an EPServiceProvider instance (you may recall the JUnit test class instantiated this upon startup and passed it to ExampleSelect's parent object, ProcessStartEnd). Listing 8.4 shows the updated inner class from the original ProcessStartEnd shown in listing 8.2.

Example 8.4. Updated inner class with update that increments variable counter

The ExampleSelect constructor now receives the EPServiceProvider, so ProcessStartEnd creates the inner class as follows:

ExampleSelect exampleSelect = this.new ExampleSelect(epService);

We can now enhance the JUnit test class (listing 8.3) by using the counter variable

// JUnit expects (message, expected, actual)
assertEquals("Counter should be 3", 3,
  epService.getEPRuntime().getVariableValue("counter"));

In this brief example, we demonstrated how to create a variable, set it, and then use the value within a JUnit test to verify that the anticipated result was achieved. This greatly improved the precision of the JUnit test we first created in listing 8.3. Let's now look at how a variable can be used within EPL statements.

USING A VARIABLE IN EPL STATEMENT(S)

In this example, instead of hard-coding the ProcessEvent(state=2) as we did in the example shown in listing 8.2, we'll reference a variable as the expression value. The variable created will be called EVENT_STARTED; we're using uppercase to follow the naming convention typically used in Java for static final (i.e., constant) fields. The following is a modified setUp method from the JUnit test where the Esper configuration values are defined. It now includes the Configuration.addVariable method to create this new variable:

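public void setUp() {
  Configuration configuration = new Configuration();
  // Map the alias used in EPL statements to the event class
  configuration.addEventTypeAlias("ProcessEvent",
    ProcessEvent.class.getName());
  // Engine-wide variable: name, type, and initial value
  configuration.addVariable("EVENT_STARTED", Integer.class, 1);
  epService = EPServiceProviderManager.getProvider("EsperTest",
    configuration);
  // statements is the test's ProcessStartEnd instance; register() adds the EPL
  statements.register(epService);
}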

The addVariable call creates the EVENT_STARTED variable, identifies it as of type Integer, and assigns it a value of 1. Using the variable with an EPL statement is just a matter of referencing it:

select * from ProcessEvent where state=EVENT_STARTED

As you can see, using variables is a straightforward process and provides flexibility you may find beneficial in your Esper usage. We've demonstrated how variables can be used to store and retrieve values within the Esper engine context, and also how they can be used directly in EPL statements to provide for more dynamic statement definitions.
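The runtime flexibility comes from the statement reading the variable's current value each time it evaluates an event. The following plain-Java sketch models that behavior with a map standing in for the engine's variable store and a predicate standing in for the EPL filter; it does not use the Esper API, and the class and field names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical model (not Esper code) of an engine variable referenced by
//   select * from ProcessEvent where state=EVENT_STARTED
class VariableFilterDemo {
    // Stand-in for the engine's variable store
    static final Map<String, Object> variables = new ConcurrentHashMap<>();

    // The "statement": it looks up EVENT_STARTED on every evaluation,
    // so it never needs to be re-registered when the value changes.
    static final Predicate<Integer> statement =
        state -> state.equals(variables.get("EVENT_STARTED"));

    public static void main(String[] args) {
        variables.put("EVENT_STARTED", 1);      // initial value, as in setUp()
        System.out.println(statement.test(1));  // true
        variables.put("EVENT_STARTED", 2);      // changed at runtime
        System.out.println(statement.test(1));  // false
    }
}
```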

Up until now, we've created and registered some simple EPL statements that apply filtering to inbound events. You might be thinking, "This is great, but can I do the same thing using my own Java logic?" It's true—we haven't yet touched on the correlation and analytical capabilities that are essential for an ESP engine (such as the ability to detect unusual event patterns that have occurred within a certain period of time). This is where the true power of Esper is apparent, and the ability is provided through what Esper calls views.

Understanding views

Views represent one of the most powerful, and most used, features of Esper. Typically, they take the form of a time window, which essentially is a time interval that extends into the past. A view can also be non-time related and instead be tied to the last number of events generated. The concept can best be illustrated by some examples. Let's first consider the scenario in which you're capturing jBPM events from a sales order process and you want to calculate the average price for the last N orders placed. In this case, a length window would be used, resulting in a query such as this:

select avg(doubleProperty('totalPrice'))
  from ProcessEvent(state=2 and processName='SalesOrder').win:length(3)
  output snapshot every 3 seconds

In this example, the average price is computed based on the last three orders placed. Views are defined using a specific namespace, which prefaces the view function requested (for example, win:length(3)). Notice also the output snapshot every 3 seconds clause. It stabilizes the rate at which results are returned, streaming them back at a regular interval: in this case, an output result is generated every 3 seconds. In the absence of the output clause, results would be returned only when a new qualifying event arrives (if no qualifying event arrives, no output would be generated).
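To make the length-window semantics concrete, here is a plain-Java model of what avg over win:length(3) computes: a sliding window that keeps only the three most recent qualifying order prices. It is an illustration of the behavior, not Esper's implementation, and the class names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical plain-Java model of avg(...) over win:length(n).
// It is NOT the Esper API; it only mimics the window semantics.
class LengthWindowAverage {
    private final int size;
    private final Deque<Double> window = new ArrayDeque<>();

    LengthWindowAverage(int size) {
        this.size = size;
    }

    // Called for each qualifying event (e.g., a completed SalesOrder).
    // Returns the average of at most the last `size` prices.
    double onEvent(double totalPrice) {
        window.addLast(totalPrice);
        if (window.size() > size) {
            window.removeFirst(); // the oldest event leaves the window
        }
        double sum = 0.0;
        for (double p : window) {
            sum += p;
        }
        return sum / window.size();
    }
}

class LengthWindowDemo {
    public static void main(String[] args) {
        LengthWindowAverage avg = new LengthWindowAverage(3);
        double[] prices = {100.0, 200.0, 300.0, 400.0};
        for (double p : prices) {
            System.out.println("Average of last 3 orders: " + avg.onEvent(p));
        }
    }
}
```

The model reports a value on every arrival, which corresponds to running the query without the output clause; output snapshot every 3 seconds would instead sample the current value on a timer.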

Let's consider another example. We want to retrieve the total number of orders placed within the last 10 seconds. This query can be defined as

private static final String EXAMPLE_VIEW2 = "select size " +
  "from ProcessEvent(state=2 and processName='SalesOrder')" +
  ".win:time(10 sec).std:size() output snapshot every 3 seconds";

Notice how we combine different view types through chaining. In this case, win:time(10 sec) and std:size() are chained together to indicate that we want a count of all events within the last 10 seconds. Quite a number of view functions are available, so I encourage you to consult the reference documentation for more details. This ability to provide analytics based on snapshots in time or on the volume of events is a powerful feature.
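The chained win:time(10 sec).std:size() view can be modeled in plain Java as a queue of arrival times that is trimmed before each count. This sketch passes timestamps explicitly so its behavior is deterministic (Esper uses its own engine clock), and it treats an event as expired once it is exactly 10 seconds old, which is a modeling choice. The class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of win:time(10 sec).std:size(): counts the events
// that arrived within the last windowMillis milliseconds.
// Assumes timestamps and queries never move backward in time.
class TimeWindowCounter {
    private final long windowMillis;
    private final Deque<Long> arrivals = new ArrayDeque<>();

    TimeWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onEvent(long timestampMillis) {
        arrivals.addLast(timestampMillis);
    }

    // The "size" value a snapshot would report at time nowMillis.
    long sizeAt(long nowMillis) {
        // Drop events that are windowMillis old or older.
        while (!arrivals.isEmpty() && arrivals.peekFirst() <= nowMillis - windowMillis) {
            arrivals.removeFirst();
        }
        return arrivals.size();
    }

    public static void main(String[] args) {
        TimeWindowCounter counter = new TimeWindowCounter(10_000L);
        counter.onEvent(0L);
        counter.onEvent(2_000L);
        counter.onEvent(9_000L);
        System.out.println(counter.sizeAt(9_000L));   // 3
        counter.onEvent(12_000L);
        System.out.println(counter.sizeAt(12_000L));  // 2: events at 0 and 2,000 ms expired
    }
}
```

An output snapshot every 3 seconds clause would correspond to calling sizeAt on a 3-second timer.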

One final thought before we move on to named windows: you can use an anonymous inner class as well for specifying the subscriber class. This makes your code a bit more concise at the cost of reusability. Here's an example of this approach, which uses an anonymous inner class for the subscriber of the EXAMPLE_VIEW2 statement we discussed a moment ago:

EPStatement eps =
  epService.getEPAdministrator().createEPL(EXAMPLE_VIEW2);
Object exampleView2 = new Object() {
  // One parameter per selected column: the size value produced by std:size()
  public void update(Long size) {
    System.out.println("Orders placed in last 10 seconds: " + size);
  }
};
eps.setSubscriber(exampleView2);

A generic Java object is used as the basis for the inner class, and specifies the single update method whose parameters must match the select criteria of the EPL statement it's associated with. You can also specify an updateRStream method to capture events being removed from a stream, as well as start and end methods to capture the beginning and ending of an event delivery.

When working with select statements as we did in these view examples, the generated output is consumed by a listener or subscriber object. However, there will be times when you want to use the output as another input stream, or aggregate results for consumption by other queries. This can be accomplished by using the insert into clause to create an altogether new stream, or by populating the results into a holding area (what Esper refers to as a named window).

Imagine, for example, that you have many different jBPM processes that are firing off events to Esper. Maybe a group of the jBPM processes pertains to HR-related activities, such as a new hire or termination process. Ideally, you'd like to keep a running total of all HR-related process instance activity within a certain period of time, in addition to the normal event processing you're performing. As each process instance event arrives, a summary entry could be temporarily stored for use by other select statements. This is an example of where inserts, our next topic, come in handy.

Creating new event streams with named windows

EPL has an insert into clause that's analogous to SQL, with the exception that in Esper, you're actually creating a new derived event stream and not inserting data into a relational database table. Building on our sales order jBPM process example, let's assume we want to create a new derived stream that contains orders from only premium customers (obviously, this is a bit contrived, as we could easily modify any select statement to include any where filter). The designation of a premium or regular customer will arrive in the ProcessEvent event as a jBPM property called custType. Given that, we can create our new event stream by using an EPL such as

insert into PremiumOrders
 select * from ProcessEvent(state=2 and
 processName='SalesOrder' and stringProperty('custType') = 'premium')

This creates the new stream PremiumOrders, which is derived using the same event object structure as ProcessEvent, because the wildcard (*) was used for the column definition. This new stream can now be treated like any other event stream, and queried upon using statements such as select * from PremiumOrders.

Why would you want to dynamically create new streams using this approach? One possible reason is that it can help simplify downstream EPL statements, as you saw when we created PremiumOrders (that is, it doesn't carry all the baggage associated with the filter clauses). For instance, if our jBPM engine has a multitude of different business processes running in it and they're all generating ProcessEvents (listing 8.1), creating the right filters can result in rather long EPL statements. Instead, carving those event streams into more specific ones tailored to the process in which you're interested can help avoid mistakes arising from highly complex EPL statements. Decomposing complex things into more manageable pieces is always beneficial (at least when there's no penalty for doing so).
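The decomposition idea can be pictured in plain Java: one component owns the premium-order filter and forwards matching events, unchanged, to consumers of the derived stream, which never repeat the filter. This is a sketch of the concept only; OrderEvent is a hypothetical, trimmed-down stand-in for ProcessEvent, and none of these classes are part of Esper.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for ProcessEvent with only the fields this sketch needs.
class OrderEvent {
    final String processName;
    final int state;
    final Map<String, Object> properties = new HashMap<>();

    OrderEvent(String processName, int state, String custType) {
        this.processName = processName;
        this.state = state;
        properties.put("custType", custType);
    }

    String stringProperty(String key) {
        return (String) properties.get(key);
    }
}

// Models: insert into PremiumOrders select * from ProcessEvent(state=2 and
//   processName='SalesOrder' and stringProperty('custType') = 'premium')
class PremiumOrdersRouter {
    private final List<Consumer<OrderEvent>> premiumOrders = new ArrayList<>();

    // Roughly what registering "select * from PremiumOrders" amounts to
    void subscribe(Consumer<OrderEvent> consumer) {
        premiumOrders.add(consumer);
    }

    void onProcessEvent(OrderEvent e) {
        boolean matches = e.state == 2
            && "SalesOrder".equals(e.processName)
            && "premium".equals(e.stringProperty("custType"));
        if (matches) {
            // select * keeps the original event structure
            for (Consumer<OrderEvent> c : premiumOrders) {
                c.accept(e);
            }
        }
    }

    public static void main(String[] args) {
        PremiumOrdersRouter router = new PremiumOrdersRouter();
        router.subscribe(e -> System.out.println("Premium order: " + e.processName));
        router.onProcessEvent(new OrderEvent("SalesOrder", 2, "premium")); // forwarded
        router.onProcessEvent(new OrderEvent("SalesOrder", 2, "regular")); // dropped
    }
}
```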

One of the more intriguing capabilities introduced in the 2.0 release of Esper is the concept of named windows. Essentially, a named window represents a shared view on a stream that you can insert, delete, or query against. The process for creating it is straightforward: you use a create window clause and specify the column and property structure to be associated with it. When creating a named window, you use view functions to manage the retention policy of the events inserted into the window (with some notable exceptions, which we'll discuss in a moment). Then you can issue inserts against the named window or delete events contained within it. You can also query against it (the primary reason for creating it, after all). So, to recap, you first define your named window using the create window clause, and then use insert clauses to populate it with data.

To demonstrate the capabilities of named windows, let's create one that captures orders over $100. You may recall that retrieving the price of an order was somewhat tedious, as the value was stored in a jBPM process instance property. When creating this window, we'll simplify the query so that the named window contains only the columns we're interested in sharing. Later, we'll create a statement for querying against this window. The first step is to create the named window:

create window HighPricedOrders.win:time(1 hour) as
  select processName, processInstanceId, doubleProperty('totalPrice') as
  price from ProcessEvent

This statement creates the named window called HighPricedOrders, as well as three property columns: processName, processInstanceId, and price (an alias was used to create this property). The view specification win:time(1 hour) instructs the window to actively remove any events that are older than one hour. The ability for multiple statements to then access the events stored in this window is what distinguishes a named window from a normal event stream.

Note

These examples can all be run using the JUnit test cases provided with the source code for this chapter.

Now that the named window has been created, you can add events to it with an insert statement. In our scenario, we use this:

insert into HighPricedOrders(processName, processInstanceId, price)
  select processName, processInstanceId, doubleProperty('totalPrice')
  from ProcessEvent(state=2 and processName='SalesOrder' and
   doubleProperty('totalPrice') > 100)

As you can see, this statement will insert events when a process instance is completed (state=2). The statement has a conditional that matches only events where the process name is SalesOrder and where the totalPrice process variable is greater than 100. Notice that we listed the target columns explicitly; this is consistent with SQL standards. Any matching events will be preserved in the named window for a one-hour period. Lastly, we can query against the named window just as we would against any standard event stream:

select processName, processInstanceId, price from HighPricedOrders

Since this is a named window, we can also create multiple queries against it. For instance, to get an overall average order amount, we could add this query:

select avg(price) from HighPricedOrders

This query will generate a notification event with the average price returned every time a new event is entered into the HighPricedOrders named window. Using named windows, you can create buckets of events that can be used by multiple other event streams. These buckets will retain their event entries until they fall outside the view's retention criteria. In your own organization, can you envision scenarios where this capability would be useful?
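The following plain-Java sketch models the HighPricedOrders scenario: one retained set of rows with a one-hour retention policy, populated by the insert filter and read by both queries. It is not the Esper API. Esper pushes a new avg result whenever the window changes, whereas this sketch computes results on request, and all class names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// One row of the hypothetical HighPricedOrders window.
class HighPricedOrder {
    final String processName;
    final long processInstanceId;
    final double price;
    final long insertedAtMillis; // used only for the retention policy

    HighPricedOrder(String processName, long processInstanceId,
                    double price, long insertedAtMillis) {
        this.processName = processName;
        this.processInstanceId = processInstanceId;
        this.price = price;
        this.insertedAtMillis = insertedAtMillis;
    }
}

// Model of a named window declared with win:time(1 hour), shared by two queries.
class HighPricedOrdersWindow {
    static final long RETENTION_MILLIS = 60L * 60L * 1000L; // 1 hour
    private final Deque<HighPricedOrder> rows = new ArrayDeque<>();

    // insert into HighPricedOrders ... where state=2, SalesOrder, price > 100
    void onProcessEvent(String processName, int state, long id,
                        double price, long nowMillis) {
        if (state == 2 && "SalesOrder".equals(processName) && price > 100) {
            rows.addLast(new HighPricedOrder(processName, id, price, nowMillis));
        }
    }

    // Rows leave once they are an hour old (assumes the clock only moves forward).
    private void expire(long nowMillis) {
        while (!rows.isEmpty()
                && rows.peekFirst().insertedAtMillis <= nowMillis - RETENTION_MILLIS) {
            rows.removeFirst();
        }
    }

    // select processName, processInstanceId, price from HighPricedOrders
    List<HighPricedOrder> selectAll(long nowMillis) {
        expire(nowMillis);
        return new ArrayList<>(rows);
    }

    // select avg(price) from HighPricedOrders (null when the window is empty)
    Double averagePrice(long nowMillis) {
        expire(nowMillis);
        if (rows.isEmpty()) {
            return null;
        }
        double sum = 0.0;
        for (HighPricedOrder row : rows) {
            sum += row.price;
        }
        return sum / rows.size();
    }

    public static void main(String[] args) {
        HighPricedOrdersWindow window = new HighPricedOrdersWindow();
        window.onProcessEvent("SalesOrder", 2, 1L, 150.0, 0L);
        window.onProcessEvent("SalesOrder", 2, 2L, 50.0, 1_000L); // not inserted
        window.onProcessEvent("SalesOrder", 2, 3L, 250.0, 2_000L);
        System.out.println("Rows: " + window.selectAll(3_000L).size());
        System.out.println("Average: " + window.averagePrice(3_000L));
    }
}
```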

As a final note on this topic, you can optionally create your named window to never expire any events using the view win:keepall(). You should then periodically purge the events through an on event or on pattern statement that includes a delete from clause. We'll discuss the use of on pattern statements in the next section.

Advanced Esper

The capabilities of Esper we've discussed so far give you a good taste of the product and how it can be used in conjunction with products like jBPM to analyze events emitted from services such as BPM. Up to this point, however, we've kept things simple and used only basic SQL-style constructs for creating EPL statements. Although we've covered concepts such as views and named windows, we haven't used advanced correlation capabilities, such as analyzing whether event A was followed by event B, or detecting unexpected latency between receiving those events. By the time you've finished reading this section, you'll know how to answer such questions and be well prepared to roll out Esper in your enterprise. You'll also learn how to retrieve data from a relational database via JDBC and create custom functions to add new EPL extensions. Let's begin by looking at EPL functions.

Extending with functions

We've already used one of the built-in EPL functions, avg, which returns the average of the values provided in the expression enclosed within parentheses. There are several such built-in functions, with many similar to those available in SQL, such as sum, count, min, and max. There are also several evaluation-based functions, such as case, cast, and coalesce. Rather than cover all of these, many of which you may be already familiar with (or can learn more about in the reference documentation), I'll cover some that are unique to Esper as it relates to ESP—namely, prev, instanceof, and Java static methods (standard Java libraries or user-defined functions).

THE PREV FUNCTION

The previous function (prev) returns the value of a previous event. It accepts two parameters:

  • The nth previous event in the order defined in the time window

  • The property name or column whose value you're evaluating

At first glance, it may be unclear why or how to use this function. One of the most interesting ways of using prev is in conjunction with a sorted window. The sorted window is one of the available view functions (see an overview in section 8.5.3). The sort function preserves a specified number of events (parameter 3), sorted by a specific property (parameter 1), in ascending or descending order (parameter 2). The event window therefore never exceeds the size specified as the third parameter. By using the prev function with a sorted window, you can selectively return, for example, the two highest-priced orders among the (up to) ten retained in the window for the ProcessEvents associated with a SalesOrder process (continuing our jBPM sales order example). We could achieve this using the EPL

select max(doubleProperty('totalPrice')) as price1,
prev(1,doubleProperty('totalPrice')) as price2
  from ProcessEvent(state=2 and processName='SalesOrder')
  .ext:sort(doubleProperty('totalPrice'), true, 10)

The first column property value, assigned an alias of price1, uses the max function to return the maximum totalPrice for the up to ten events that may exist in this view (the third parameter of the ext:sort view specification sets the maximum number of events preserved at any given time). The second column property, given an alias of price2, uses the prev function to return the next highest price value (which is the first previous event to the max function). Table 8.1 shows a series of orders as they arrive, and the corresponding max and prev function values given the preceding EPL.

Table 8.1. Example of the prev function

Order price (ProcessEvent)   price1 value   price2 value
$125.12                      $125.12        null
$50.45                       $125.12        $50.45
$1200.87                     $1200.87       $125.12
$73.34                       $1200.87       $125.12
$250.23                      $1200.87       $250.23
$12.05                       $1200.87       $250.23
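The values in table 8.1 can be reproduced with a small plain-Java model of the sorted window: it keeps at most ten prices in descending order, so max corresponds to the first entry and prev(1, ...) to the second. This illustrates the semantics described above rather than Esper's implementation, and the class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of
//   select max(price) as price1, prev(1, price) as price2
//   from ... .ext:sort(price, true, 10)
// Index 0 holds the highest price; prev(1, ...) reads index 1.
class SortedWindow {
    private final int capacity;
    private final List<Double> prices = new ArrayList<>();

    SortedWindow(int capacity) {
        this.capacity = capacity;
    }

    void onEvent(double price) {
        prices.add(price);
        prices.sort(Collections.reverseOrder()); // descending order
        if (prices.size() > capacity) {
            prices.remove(prices.size() - 1);    // lowest-ranked event leaves
        }
    }

    // max(price): the first entry of the descending window
    Double price1() {
        return prices.isEmpty() ? null : prices.get(0);
    }

    // prev(1, price): null until a second event is in the window
    Double price2() {
        return prices.size() > 1 ? prices.get(1) : null;
    }
}

class PrevFunctionDemo {
    public static void main(String[] args) {
        SortedWindow window = new SortedWindow(10);
        double[] orders = {125.12, 50.45, 1200.87, 73.34, 250.23, 12.05};
        for (double price : orders) {
            window.onEvent(price);
            System.out.println(price + " -> price1=" + window.price1()
                + ", price2=" + window.price2());
        }
    }
}
```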

As you can see, the price1 property, which represents the max value, always produces the highest sale price. In addition, the prev function works as anticipated for the price2 property and returns the next highest sale price. Next, let's look at EPL's instanceof function.

THE INSTANCEOF FUNCTION

Much like the Java operator of the same name, the instanceof function evaluates a given property (parameter #1) and returns a boolean value to indicate whether the property type belongs to the Java class provided (parameter #2). This function is often used with a case or cast function. In section 8.3, we discussed managing jBPM process variables passed as part of ProcessEvent by creating special type-specific methods such as getStringProperty(String key). Instead of creating these getter methods for each of the possible jBPM variable types, we could create a single method called getProperty(String key) that's used in the EPL, and then use the instanceof function with a cast to publish the data to the appropriate subscriber object method. Let me illustrate. First, the ProcessEvent class (listing 8.1), which is the event object, was modified to include a generic getProperty method:

// Generic accessor: returns the raw jBPM variable value, whatever its type
public Object getProperty(String key) {
  return this.properties.get(key);
}

Then, in the ProcessStartEnd class (listing 8.2), which is used to define and invoke the EPL statements and their associated subscriber objects, we add the following EPL definition:

private static final String EXAMPLE_FUNC4 =
  "select case " +
  "when instanceof(property('totalPrice'), java.lang.Double) " +
    "then cast(property('totalPrice'), double) " +
  "end " +
  "from ProcessEvent(state=2 and processName='SalesOrder')";

In this example, we're using the instanceof function to evaluate the object returned from the ProcessEvent.getProperty method call (property('totalPrice')) to determine whether it's of type Double. This, in turn, is wrapped within a case statement, so that if it evaluates to true, the cast function expression is performed. In this case, the cast simply converts the Double to a primitive double type. The sole property returned from this EPL is the total price returned as a double. So a new inner class used to subscribe to this EPL is defined as

EPStatement eps = epService.getEPAdministrator().createEPL(EXAMPLE_FUNC4);
Object exampleView4 = new Object() {
  public void update(double price) {
    System.out.println("Example 4 - Price is: " + price);
  }
};
eps.setSubscriber(exampleView4);

When a matching event occurs, the inner class prints the message found in its update method. Notice that the update method accepts a double; we cast the output to that type in the EPL statement. The ability to use the instanceof function to cast column property types can be useful in cases where Esper is receiving generic event objects. Such might be the case where you have set up a web service to receive the inbound events, and for simplicity or compatibility reasons, you make all of the inbound event properties simple String values.
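In plain Java, the EXAMPLE_FUNC4 expression corresponds roughly to the following sketch. It assumes the event properties are held in a Map<String, Object>, and it shows that a case expression with no else branch yields null for any value that isn't a Double. The class name is hypothetical and nothing here calls Esper.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java equivalent of:
//   case when instanceof(property('totalPrice'), java.lang.Double)
//        then cast(property('totalPrice'), double) end
class TotalPriceExtractor {
    static Double totalPrice(Map<String, Object> properties) {
        Object value = properties.get("totalPrice"); // like getProperty("totalPrice")
        if (value instanceof Double) {
            return (Double) value; // the cast branch
        }
        return null;               // no else branch, so the result is null
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("totalPrice", 125.12);
        System.out.println("Price is: " + totalPrice(props));  // Price is: 125.12
        props.put("totalPrice", "125.12");                     // a String fails the test
        System.out.println("Price is: " + totalPrice(props));  // Price is: null
    }
}
```

The null path is worth keeping in mind when you choose the parameter type of the subscriber's update method.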

In addition to the 20 or so built-in single-row and aggregate functions, you can use any Java class that has a public static method. Let's explore this capability further.

USER-DEFINED FUNCTIONS

The ability to use any Java class that has a public static method as a user-defined function (UDF) opens up all sorts of possibilities. Functions that perform transformations or lookups come immediately to mind, but you're obviously not limited to these areas. Continuing the example from the previous section, say we want to round the total price amount associated with ProcessEvent. Since the price arrives as a Java Double, java.lang.Math's round method can be used as a UDF, because it's a static method that accepts a double. Here's a simple EPL illustration of this in practice:

select Math.round(doubleProperty('totalPrice'))
  from ProcessEvent(state=2 and processName='SalesOrder')

As you can see, the full package specification wasn't necessary, as Esper automatically imports the following: java.lang.*, java.math.*, java.text.*, java.util.*.

Let's see how easy it is to create our own UDF. Assume for this example that we're receiving in our ProcessEvent event object a jBPM parameter value that identifies the customer who placed an order. The jBPM process variable is called custId. We can then create a UDF that simulates a lookup that will take the custId integer as its single parameter and return the actual customer name (in this case, Acme Corporation). Here's our simple lookup class:
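A minimal version of such a class might look like the following sketch. It assumes an Integer parameter and simply returns the same name for any custId, since the lookup is only simulated. In the accompanying project the class would be public and live in the opensoa851.esper.jbpm.functions package; both are omitted here so the snippet compiles on its own.

```java
// Sketch of the lookup UDF. Esper can call any public static method, so no
// Esper interface is implemented. In the project this would be declared
// public in package opensoa851.esper.jbpm.functions.
class HelperFunctions {
    // Simulated lookup: a real version might query a customer table or
    // service; this sketch returns the same name for every custId.
    public static String lookupCustomer(Integer custId) {
        return "Acme Corporation";
    }

    public static void main(String[] args) {
        System.out.println(lookupCustomer(1001)); // 1001 is a hypothetical custId
    }
}
```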

As you can see, the class has one declared public static method, lookupCustomer. Using this UDF in an EPL is equally straightforward:

select integerProperty('custId'),
 opensoa851.esper.jbpm.functions.HelperFunctions.lookupCustomer
 (integerProperty('custId')) from ProcessEvent(state=2 and
   processName='SalesOrder')

As you can see, having to specify the full package is a bit tedious. You can avoid having to do this by automatically importing the package using the Esper configuration. For example, if using an XML configuration, you could use this:

<auto-import import-name="opensoa851.esper.jbpm.functions.*"/>

Or you could use the Configuration class (you may recall this class from section 8.4.4) via the API:

Configuration configuration = new Configuration();
configuration.addImport("opensoa851.esper.jbpm.functions.*");

Once imported, you can then remove the package from your EPL statement:

select integerProperty('custId'),
HelperFunctions.lookupCustomer(integerProperty('custId')) from
 ProcessEvent(state=2 and processName='SalesOrder')

Note

I stated earlier that standard Java static methods, such as those belonging to java.lang.Math, are automatically imported. While this is true, explicitly importing your own classes apparently overrides this default behavior, and you must then import the standard packages you need as well. I presume this is a bug in the 2.0 release.

As you can see, incorporating your own custom functions in your EPL is straightforward and unlocks a world of possibilities.

In the beginning of this section, we alluded to Esper's advanced correlation capabilities, which can be used to analyze the sequence and time of incoming event streams and determine whether they arrive in the anticipated order and duration. I'm referring to a type of advanced view known as patterns, which we explore next.

Applying event patterns

Event patterns, as the name suggests, allow you to define, via expressions, various matching rules that can be applied to incoming events. For example, you can use what are referred to as temporal operators that compare inbound events to see whether they arrive in the anticipated order. Patterns enable you to have very fine-grained control over how events are evaluated. Interestingly, patterns can be used in an EPL select or in a stand-alone fashion, whereby a listener can be registered to receive the event output.

In its most basic form, a pattern can be defined as just a single event. Consider this EPL, which creates a simple pattern definition specifying only jBPM process events that have ended (state=2) and whose process name is equal to SalesOrder:

String EXAMPLE_PATTERN1 = "process=ProcessEvent(state=2 and " +
  "processName='SalesOrder')";

What distinguishes this from a normal EPL select is that this pattern will fire only once, regardless of how many inbound events match the specified filter criteria. Also, when registering a pattern, we use the createPattern method of EPAdministrator instead of createEPL. This is illustrated in the following fragment, which also uses an anonymous inner class that implements the UpdateListener interface (that is, we're using the listener instead of a subscriber class, unlike previous examples):

EPStatement eps =
  epService.getEPAdministrator().createPattern(EXAMPLE_PATTERN1);
eps.addListener(new UpdateListener() {
  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    // "process" is the tag assigned in EXAMPLE_PATTERN1
    ProcessEvent process = (ProcessEvent) newEvents[0].get("process");
    System.out.println("Listener for Single Pattern: " +
      process.getProcessInstanceId());
  }
});

When run, if one or more qualifying events are matched, a single println message will be displayed to the console. Since a listener was used in this example, the pattern output is accessed through the tag called process, which was defined in the EXAMPLE_PATTERN1 expression. The listener uses this tag to extract the event from the EventBean through its get method, which accepts the tag name as a String. The every operator loosely mimics the behavior of a standard EPL select statement and is used to manage repetition. The previous pattern, modified to include every, will produce an output for each qualifying event:

every(process=ProcessEvent(state=2, processName='SalesOrder'))

Notice here that the and was removed from the filter expression and replaced by a comma; this is a convenience shortcut. The every clause causes the start of a new pattern expression and listens for new events that match the filter. One of the most useful aspects of this is that you can create an expression that checks for the occurrence and order of multiple events, which may be of different event object types. For example, if event B is always supposed to follow event A, you can define a pattern such as every (A -> B) that will fire once B arrives in order after A. In addition, you can devise fairly elaborate combinations when you use the every statement in conjunction with pattern timers. In our example, we could use it to flag any occurrences where the instance end event wasn't received within a certain period of time after the process was started. Such a check would undoubtedly be useful for managing compliance with service-level agreements or in proactive detection of problems.
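With every applied to the whole A -> B sequence, the pattern, as we understand Esper's semantics, waits for an A, then for the next B, fires, and only then starts looking for a new A; an extra A that arrives while it is waiting for a B is ignored. The simplified plain-Java sketch below models just that restart behavior using strings as hypothetical events. It ignores filters, tags, and timers, and it is not how Esper evaluates patterns internally.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the pattern  every (A -> B).
// Events are plain strings whose first letter is the event type.
class EveryFollowedBy {
    private String pendingA = null;               // the A currently waiting for a B
    final List<String> matches = new ArrayList<>();

    void onEvent(String event) {
        if (pendingA == null) {
            if (event.startsWith("A")) {
                pendingA = event;                 // A starts the A -> B sequence
            }
        } else if (event.startsWith("B")) {
            matches.add(pendingA + "," + event);  // the pattern fires
            pendingA = null;                      // every restarts the subexpression
        }
        // Anything else, including a second A while waiting, is ignored.
    }

    public static void main(String[] args) {
        EveryFollowedBy pattern = new EveryFollowedBy();
        String[] events = {"A1", "B1", "C1", "B2", "A2", "D1",
                           "A3", "B3", "E1", "A4", "F1", "B4"};
        for (String e : events) {
            pattern.onEvent(e);
        }
        System.out.println(pattern.matches); // [A1,B1, A2,B3, A4,B4]
    }
}
```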

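Before we delve into an example of this capability, let's first take a closer look at what Esper calls pattern guards. A pattern guard is a where condition attached to a pattern subexpression that controls how long that subexpression stays active. Esper provides three timer-based constructs for patterns: timer:within, timer:interval, and timer:at. Strictly speaking, Esper's documentation classifies timer:within as a guard and timer:interval and timer:at as pattern observers, but all three bring time into a pattern (and, as with regular functions, you can also create your own). Let's look at these three timer-based constructs in more detail.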

TIMER:WITHIN GUARD

The timer:within guard is similar to a stopwatch in function. If the event associated with the timer fires within the timer period provided, true is returned, and false otherwise. What's an example of where this might be useful? Building on the scenario we've used so far, consider a situation where, as part of a fraud-detection initiative, you want to flag any orders placed by the same customer that arrive within a few minutes of each other. Perhaps such an occurrence would be unusual, and indicate that someone has possibly compromised the customer account. Using our ProcessEvent event object, the following EPL pattern will determine whether consecutive orders were placed by the same customer within 5 seconds of each other (I chose 5 seconds since it's more convenient for testing):

(
  every (oldOrder=ProcessEvent(state=1, processName='SalesOrder'))
  ->
  every (newOrder=ProcessEvent(state=1, processName='SalesOrder',
           newOrder.integerProperty('custId') =
           oldOrder.integerProperty('custId')))
    where timer:within(5 sec)
)

Using the parentheses is critical for determining how the clause is evaluated. In pseudo-code, the pattern states, "For every new order placed, check every subsequent order with the same custId that occurs within 5 seconds of the previous order." Figure 8.4 demonstrates a scenario of how you'd apply this.

Figure 8.4. Example of timer:within guard rule being applied

You can craft many possible combinations of this type of time-based temporal matching, but exercise care to ensure the proper precedence of your phrases.
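The intent of the fraud check can be sketched in plain Java: flag an order when the same customer started another order no more than 5 seconds earlier. This is a deliberately simplified model that remembers only each customer's most recent order time; it does not reproduce every nuance of how Esper evaluates every, ->, and timer:within together, and treating exactly 5 seconds as "within" is a modeling choice. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the rapid-order check (not Esper code).
class RapidOrderDetector {
    private final long windowMillis;
    private final Map<Integer, Long> lastOrderAt = new HashMap<>(); // custId -> time
    final List<String> alerts = new ArrayList<>();

    RapidOrderDetector(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Called for each started SalesOrder (state=1).
    void onOrderStarted(int custId, long processInstanceId, long timestampMillis) {
        Long previous = lastOrderAt.put(custId, timestampMillis);
        if (previous != null && timestampMillis - previous <= windowMillis) {
            alerts.add("custId " + custId + ": order " + processInstanceId
                + " arrived " + (timestampMillis - previous)
                + " ms after the previous one");
        }
    }

    public static void main(String[] args) {
        RapidOrderDetector detector = new RapidOrderDetector(5_000L);
        detector.onOrderStarted(7, 1L, 0L);
        detector.onOrderStarted(8, 2L, 1_000L);
        detector.onOrderStarted(7, 3L, 3_000L);  // same customer, 3 s later: flagged
        detector.onOrderStarted(8, 4L, 9_000L);  // 8 s later: not flagged
        detector.alerts.forEach(System.out::println);
    }
}
```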

TIMER:INTERVAL GUARD

The timer:interval function will wait the specified period of time and then return true. While at first glance this function may appear to have limited utility, it can be useful for helping identify time gaps outside a given boundary. How would this be beneficial? One example related to our jBPM scenario would be to use this ability to identify any process instance that wasn't completed within a certain period of time. For example, continuing with the SalesOrder business process, here's how you could identify any instances that hadn't completed within 25 seconds:

every ord=ProcessEvent(state=1, processName='SalesOrder')
  -> (timer:interval(25 sec) and not
      ProcessEvent(state=2, processInstanceId=ord.processInstanceId))

In this case, the timer:interval observer fires after 25 seconds, and if no corresponding ProcessEvent that has ended (state=2) with the matching processInstanceId has arrived by then, an event notification is triggered. The ability to identify when an event hasn't occurred is in many cases more important than knowing that an event has occurred. This capability has wide-ranging implications in helping organizations ensure that activities have occurred within a prescribed period.
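The absence check can be modeled in plain Java as a table of deadlines: each start event arms a 25-second deadline, a matching end event disarms it, and advancing the clock past a still-armed deadline reports the instance as overdue. This sketch only illustrates the idea; the explicit advanceTo call is a hypothetical stand-in for Esper's internal timer, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Model of: every ord=ProcessEvent(state=1, ...) ->
//   (timer:interval(25 sec) and not ProcessEvent(state=2, processInstanceId=ord.processInstanceId))
class MissingEndDetector {
    private final long timeoutMillis;
    private final Map<Long, Long> deadlines = new HashMap<>(); // instanceId -> deadline
    final List<Long> overdue = new ArrayList<>();

    MissingEndDetector(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onStart(long instanceId, long nowMillis) {
        deadlines.put(instanceId, nowMillis + timeoutMillis); // arm the timer
    }

    void onEnd(long instanceId) {
        deadlines.remove(instanceId); // the "and not" branch: end arrived in time
    }

    // Stand-in for the engine clock: report every deadline that has passed.
    void advanceTo(long nowMillis) {
        Iterator<Map.Entry<Long, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                overdue.add(entry.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        MissingEndDetector detector = new MissingEndDetector(25_000L);
        detector.onStart(1L, 0L);
        detector.onStart(2L, 1_000L);
        detector.advanceTo(10_000L);
        detector.onEnd(1L);           // instance 1 ends in time
        detector.advanceTo(30_000L);  // instance 2's deadline (26,000 ms) has passed
        System.out.println("Overdue instances: " + detector.overdue); // [2]
    }
}
```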

TIMER:AT OBSERVER

The timer:at function resembles the functionality of the Unix/Linux crontab command. With it, you can specify the times at which the statement should fire. This could be used, for instance, to periodically purge events that have accumulated in a named window (section 8.5.4), or to periodically pull data from a relational database using the JDBC remote connectivity feature, described next.
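Esper's timer:at observer takes crontab-style parameters (minutes, hours, days of the month, months, and days of the week, with optional seconds), so `every timer:at(*/5, *, *, *, *)` fires every five minutes and `every timer:at(0, 2, *, *, *)` fires daily at 2:00 a.m. As a plain-Java illustration of how such fields are read (not Esper's scheduler), the sketch below computes the next firing time for the minutes and hours fields only; supporting just `*`, `*/n`, and single values is a simplifying assumption, and AtScheduleSketch is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.function.IntPredicate;

final class AtScheduleSketch {
    // Parses one crontab-style field: "*" (any), "*/n" (every n), or a single value.
    static IntPredicate field(String spec) {
        if (spec.equals("*")) {
            return v -> true;
        }
        if (spec.startsWith("*/")) {
            int step = Integer.parseInt(spec.substring(2));
            if (step <= 0) {
                throw new IllegalArgumentException("step must be positive: " + spec);
            }
            return v -> v % step == 0;
        }
        int exact = Integer.parseInt(spec);
        return v -> v == exact;
    }

    // Next minute strictly after 'after' whose minute and hour satisfy both fields.
    static LocalDateTime nextFire(String minuteSpec, String hourSpec, LocalDateTime after) {
        IntPredicate minuteOk = field(minuteSpec);
        IntPredicate hourOk = field(hourSpec);
        LocalDateTime t = after.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        // One day of minutes covers every hour/minute combination.
        for (int i = 0; i < 24 * 60; i++) {
            if (minuteOk.test(t.getMinute()) && hourOk.test(t.getHour())) {
                return t;
            }
            t = t.plusMinutes(1);
        }
        throw new IllegalArgumentException("schedule never fires");
    }
}
```

For example, nextFire("*/5", "*", ...) from 10:03:30 yields 10:05, and nextFire("0", "2", ...) from 10:00 yields 2:00 the following day.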

Using JDBC for remote connectivity

Esper can query a remote database via JDBC, allowing regular SQL statements to be embedded in EPL statements. This is useful for retrieving reference or historical data keyed on some value in the inbound event. While configuring simple remote connectivity is straightforward, there are numerous optional settings that go beyond the scope of this chapter; see the official documentation for details. Many of these settings pertain to establishing the remote connection or to caching.

We'll demonstrate a simple example that builds on section 8.6.1's discussion of UDFs. You may recall that we used a UDF to return a customer name based on a custId value that was passed to the function—it was a lookup-style routine. Let's see how this can be done using a relational database join, which is probably a more intuitive method for achieving this result. There are two main steps in accomplishing this:

  1. Adding a data source reference in Esper's configuration

  2. Adding the embedded SQL into your EPL statements

ADDING A DATASOURCE REFERENCE TO THE ESPER CONFIGURATION

Since you are connecting to a remote JDBC database, you must tell Esper how to connect to it. Esper provides a few options for doing this, which are described in the official documentation. I'll demonstrate one approach, using java.sql.DriverManager. Given the number of possible property settings, I suggest using Esper's XML-based configuration rather than the Java API. In the sample code for this section, I've shown how to configure a connection to an in-memory HyperSQL Database (HSQLDB) instance. The Esper XML configuration looks like this:

The @name attribute of the database-reference element defines the alias used to reference this connection from within EPL. The child elements define connectivity parameters and various lifecycle and cache settings. In this case, our in-memory database uses a single table called CUSTOMER, which is created with the following statement during JUnit test initialization:

create table CUSTOMER  (CUST_ID int, CUST_NAME varchar(30));

It's then populated with some initial sample records used for purposes of the test. Once the connection configuration is complete, the next step is to use it in an EPL statement.

USING SQL CALLS IN EPL

To combine an event stream with data from a relational source, you specify the sql keyword followed by the database alias you defined, and then enclose the SQL statement in double quotes within square brackets. In our example, let's assume we want to look up the customer name based on a custId that was provided. We could do this using

select integerProperty('custId') as custId, CUST_NAME
  from ProcessEvent(state=1 and processName='SalesOrder'),
  sql:mydb [" select CUST_NAME from CUSTOMER where
          CUST_ID = ${integerProperty('custId')} "]

In most respects, this works like a normal join, except that rather than naming another event stream to join with, you use Esper's sql clause. Notice that within the sql clause you can reference properties of the joined event stream using the ${<expression>} notation; in this case, we're resolving the value returned by integerProperty('custId'). The select statement then outputs the custId derived from the ProcessEvent event stream along with the joined CUST_NAME value from the HSQLDB CUSTOMER table. This capability is clearly useful for performing reference lookups.
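A useful mental model for the ${...} notation is a parameterized JDBC query: the SQL text is prepared with a ? marker in place of each placeholder, and the expression values taken from each arriving event are bound before execution. Esper's documentation describes its actual handling; the plain-Java sketch below only illustrates the idea, and SqlTemplate is a hypothetical class that assumes placeholder expressions never contain a closing brace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SqlTemplate {
    // Matches ${...}; assumes the expression inside contains no '}' characters.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]*)\\}");

    final String jdbcSql;           // SQL with '?' markers, suitable for prepareStatement
    final List<String> expressions; // placeholder expressions, in parameter-index order

    SqlTemplate(String template) {
        List<String> exprs = new ArrayList<>();
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder sql = new StringBuilder();
        while (m.find()) {
            exprs.add(m.group(1).trim());  // remember which value to bind at this position
            m.appendReplacement(sql, "?"); // replace the placeholder with a JDBC marker
        }
        m.appendTail(sql);
        this.jdbcSql = sql.toString();
        this.expressions = List.copyOf(exprs);
    }
}
```

In a real lookup, jdbcSql would be passed to Connection.prepareStatement once, and for each event the value of each expression would be bound with setObject, starting at parameter index 1. Binding values instead of splicing them into the SQL text also avoids quoting problems and SQL injection.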

Other interesting possibilities exist when using the SQL integration. Stored procedures are also supported, so you can combine a stored procedure call with an EPL pattern (such as one using timer:at) to pull data from a relational database periodically. The stored procedure would identify all new records accumulated since the previous call and return them, while flagging them so they aren't returned by subsequent calls. The output of the EPL statement with the SQL join can then be inserted into a new event stream or into a named window. In this fashion, you're using an existing relational database as an event publisher (albeit not in real time, though that's probably sufficient for many situations).
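The "return new rows and flag them" contract of such a stored procedure can be sketched in plain Java, with an in-memory list standing in for the database table. PollingSource and its Row class are hypothetical; in practice the selection and the flag update would run inside the stored procedure in a single transaction, which the synchronized methods here only imitate.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for the table plus stored procedure described above:
// each fetchNew() call returns rows not yet returned and flags them as delivered.
final class PollingSource {
    static final class Row {
        final int custId;
        final String custName;
        boolean delivered; // the flag that excludes the row from later calls

        Row(int custId, String custName) {
            this.custId = custId;
            this.custName = custName;
        }
    }

    private final List<Row> table = new ArrayList<>();

    synchronized void insert(int custId, String custName) {
        table.add(new Row(custId, custName));
    }

    // One "stored procedure call", e.g. triggered periodically by a timer:at pattern.
    synchronized List<Row> fetchNew() {
        List<Row> fresh = new ArrayList<>();
        for (Row r : table) {
            if (!r.delivered) {
                r.delivered = true;
                fresh.add(r);
            }
        }
        return fresh;
    }
}
```

Because each row is returned exactly once, every periodic call yields only the records that arrived since the previous call, which is what lets the database act as an event publisher.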

The next topic we'll focus on is how to service enable Esper using SOAP-based web services.

Service enabling Esper

Earlier in this chapter I pointed out that Esper is more akin to an ESP "engine" than to a full-fledged end-to-end solution. In a way this is a blessing, because its lightweight nature allows you to easily embed it in a variety of scenarios. Further, Esper's main attraction is its correlation engine and rule-processing language; peripheral enhancements might detract from this focus. Nevertheless, the user community clamored for a more straightforward way of publishing events to the Esper engine, so the folks behind Esper created a companion product called EsperIO, released along with version 1.4. EsperIO provides a prebuilt configuration for receiving events from JMS through an input adapter, along with the ability to optionally publish results to JMS through an output adapter.

If you're a pure Java shop and don't anticipate receiving events from non-Java systems or applications, this could be an attractive and easy-to-configure option. For enterprise users, however, the Java-only nature of JMS is likely to be an impediment (granted, some bridges are available, from .NET to JMS, for example). A more platform- and protocol-neutral approach based on SOAP web services would probably be more appealing. Fortunately, as you saw in chapters 3 and 4, SCA-based Apache Tuscany enables you to readily expose multilanguage components through a variety of protocols, including SOAP and JMS. As it turns out, exposing Esper through SCA is easy to do, and once you've accomplished that, you'll have a reusable framework that can support any number of inbound event types. The steps involved are as follows:

  1. Creating a framework and components

  2. Creating the Esper service and session manager

  3. Developing the SCA composition

  4. Testing the web service using the soapUI testing tool

Creating a framework and components

Given the diversity of events that Esper will likely need to consume in most environments, it's obvious that attempting to define a single event schema or canonical representation would be challenging. More importantly, trying to generalize the events strips them of their ability to be easily self-describing. Thus, the approach I recommend is to recognize that each domain may have its own set of event objects (and related schemas). We've already been using this approach in our examples, where our ProcessEvent event object is tailored to jBPM process instances. Continuing with this approach, let's devise a framework that allows a single instance of Esper to be used and exposed through SOAP-based web services that are semantically tailored as needed. An overview of the framework can be seen in figure 8.5.

Figure 8.5. The Esper web service framework

The framework is simple in design. When you determine that a new event type needs to be sent to Esper, you create a specific StatementSubscriber implementation. This class must implement the register method, which takes an EPServiceProvider as its only parameter. The register method is responsible for registering the EPL statements specific to that domain and for creating the appropriate subscriber objects. We've already seen an example of such a class: the ProcessStartEnd object shown in listing 8.2 (the only change to that class is that it now implements StatementSubscriber).
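A minimal sketch of this registration contract follows. To keep it compilable without the Esper jar, the engine handle is a type parameter E standing in for EPServiceProvider; in the real framework, register would receive the EPServiceProvider and typically create each statement through its administrative API (for example, getEPAdministrator().createEPL(...)) and attach a subscriber with setSubscriber(...). StatementManagerSketch is a hypothetical, simplified counterpart of StatementManagerImpl, not the class from the sample code.

```java
import java.util.ArrayList;
import java.util.List;

// Contract from the text; E stands in for Esper's EPServiceProvider in this sketch.
interface StatementSubscriber<E> {
    void register(E engine);
}

// Simplified counterpart of StatementManagerImpl: collects subscribers, then registers them.
final class StatementManagerSketch<E> {
    private final List<StatementSubscriber<E>> subscribers = new ArrayList<>();

    // Counterpart of initializeStatements: one entry per domain (e.g., ProcessStartEnd).
    void initializeStatements(List<StatementSubscriber<E>> domainSubscribers) {
        subscribers.addAll(domainSubscribers);
    }

    // Hands the running engine to each domain so it can register its EPL statements.
    void register(E engine) {
        for (StatementSubscriber<E> s : subscribers) {
            s.register(engine);
        }
    }
}
```

Keeping each domain's statements in its own subscriber class means a new event type touches only its own class plus a single line in the manager.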

Note

The framework I'm describing is found in its entirety in the source code available for this section.

The StatementManagerImpl class is where individual StatementSubscriber implementations are registered. Listing 8.5 shows an example taken from the sample code for this section (the imports and package definition have been omitted for brevity).

Example 8.5. StatementManagerImpl.java, which registers statement subscribers


The purpose of this class is to register all of the EPL statements and associated subscriber objects with the Esper engine. In this case, there's a single registration object, ProcessStartEnd.


A nice enhancement would be to add JMX instrumentation to the class so that registration could be performed on demand. As currently written, the class is invoked only once, when the Esper SCA container starts, so, as you'll see, adding new statements requires restarting the container.

Esper service and session manager

The SessionManagerImpl class, which can be viewed in the sample code, is responsible for calling the initializeStatements and register methods of StatementManagerImpl from listing 8.5. The SessionManagerImpl retains a handle to the currently running instance of Esper by way of EPServiceProvider. This is accomplished by using SCA's conversational features (see chapter 4 for a refresher). In other words, the SessionManagerImpl is responsible for keeping the Esper engine session active.
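The essential job of SessionManagerImpl, creating the engine once, running statement registration once, and handing every caller the same live instance, can be sketched in plain Java. In the book's framework, SCA's conversational features manage this lifetime; here a synchronized, lazily initialized field is a simplifying substitute. EngineSession is a hypothetical name, the Supplier stands in for obtaining an EPServiceProvider (for example, through EPServiceProviderManager.getDefaultProvider), and the Consumer stands in for StatementManagerImpl's registration step.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch of the session-manager idea: one engine, registered once, reused by all callers.
final class EngineSession<E> {
    private final Supplier<E> engineFactory; // stands in for obtaining an EPServiceProvider
    private final Consumer<E> registration;  // stands in for StatementManagerImpl.register
    private E engine;                        // retained handle, kept alive between calls

    EngineSession(Supplier<E> engineFactory, Consumer<E> registration) {
        this.engineFactory = Objects.requireNonNull(engineFactory);
        this.registration = Objects.requireNonNull(registration);
    }

    synchronized E engine() {
        if (engine == null) {
            E created = engineFactory.get();
            registration.accept(created); // statements are registered only on first use
            engine = created;
        }
        return engine;
    }
}
```

Every inbound service call then reaches the same engine instance, so statement state such as windows and pattern progress survives across requests.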

The main service class, which exposes Esper through SOAP, is called EsperManagerImpl. The public methods defined by its interface, EsperManager, identify the service operations exposed by SCA. In this case, we're using SOAP, and the interface is used to autogenerate the WSDL (of course, you can also create the WSDL manually, as discussed in chapters 3 and 4). Here's the interface:

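import org.osoa.sca.annotations.Remotable;

@Remotable
public interface EsperManager {
  // One operation per domain event type; ProcessEvent is the event bean from listing 8.1
  public void sendProcessStartEndEvent(ProcessEvent event);
}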

As you can see, the sendProcessStartEndEvent method, by way of its ProcessEvent parameter, is tailored specifically for the event object it's consuming. Each new domain event sent to Esper would require a new service method to be created.
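The implementation behind each such operation can stay very thin: it forwards the typed event to the engine. In Esper's client API, that forwarding is typically epService.getEPRuntime().sendEvent(event); the plain-Java sketch below replaces the runtime with a Consumer so it compiles on its own. ProcessEventSketch, EsperManagerSketch, and EsperManagerSketchImpl are hypothetical names, and the event bean keeps only two of ProcessEvent's fields.

```java
import java.util.function.Consumer;

// Stand-in for the ProcessEvent bean (only two fields kept for brevity).
final class ProcessEventSketch {
    final String processName;
    final int state;

    ProcessEventSketch(String processName, int state) {
        this.processName = processName;
        this.state = state;
    }
}

// One service operation per domain event type, mirroring EsperManager.
interface EsperManagerSketch {
    void sendProcessStartEndEvent(ProcessEventSketch event);
}

final class EsperManagerSketchImpl implements EsperManagerSketch {
    // Stands in for epService.getEPRuntime()::sendEvent.
    private final Consumer<Object> engineInput;

    EsperManagerSketchImpl(Consumer<Object> engineInput) {
        this.engineInput = engineInput;
    }

    @Override
    public void sendProcessStartEndEvent(ProcessEventSketch event) {
        if (event == null) {
            throw new IllegalArgumentException("event must not be null");
        }
        engineInput.accept(event); // all correlation logic lives in the EPL statements
    }
}
```

Because the operation is typed to a single event class, the generated WSDL describes that event's structure precisely, which is the self-describing quality the framework aims to preserve.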

The last main development task is to create the SCA composite file. Given the nature of this framework, this is really a one-time exercise.

SCA composite file

The SCA composite file, as discussed in chapter 3, is used to define the components and identify the services that are being made available. Listing 8.6 shows the composite file used for our Esper services.

Example 8.6. esper.composite SCA composite file

The composite file is fairly minimal in scope. A single service is defined.

The SCA container is started by way of a Java main class called EsperManagerMain:

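import org.apache.tuscany.sca.host.embedded.SCADomain;

public class EsperManagerMain {

  public static void main(String[] args) {
    EsperManagerMain server = new EsperManagerMain();
    server.run();
  }

  public void run() {
    System.out.println("Running");
    // Deploy the components in esper.composite and start their service bindings
    SCADomain scaDomain = SCADomain.newInstance("esper.composite");
    // Stop the domain cleanly when the JVM shuts down (for example, on Ctrl+C)
    Runtime.getRuntime().addShutdownHook(new Thread(scaDomain::close));
  }
}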

The class simply instantiates a new SCADomain using the SCA composite file shown in listing 8.6 and, when running, will be listening for incoming service events.

We covered quite a lot here, so let me recap the steps for adding a new domain event to this framework:

  1. Create a new domain event object. This will be used as the container for the inbound event. We demonstrated this earlier when we created the ProcessEvent class (listing 8.1).

  2. Create a new implementation of StatementSubscriber. This class serves two purposes: (a) registering the EPL statements associated with the new event you're setting up, and (b) creating an anonymous inner class to receive the output generated by those EPL statements as they process inbound events. The EPL statements you create will reference the event object(s) created in step 1.

  3. Add the class created in step 2 to the StatementManagerImpl object's initializeStatements method. This will enable the class to be properly registered so that it can process the inbound events.

  4. Add a new service method to the EsperManager interface and its implementation class, EsperManagerImpl. This will expose the method as a SOAP operation.

  5. Restart SCA.

Obviously, there is room to improve this framework. We cited some ideas previously, such as instrumenting the classes for JMX management; others include better declarative support using Spring so that fewer Java code changes are needed (that is, building a plug-in model). With this framework in place, Esper can now be invoked through web services. Next, we'll explore how you can test this web service using soapUI.

Testing with soapUI

Once Tuscany is running, the service can be tested using soapUI (a soapUI project is included in this section's sample code). With soapUI, you start a new project by reading in an existing WSDL; soapUI then interrogates that WSDL and creates a sample request you can edit. For example, you could enter the following:

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
 xmlns:esp="http://esper.opensoa"
 xmlns:xsd="http://event.jbpm.esper.opensoa/xsd">
  <soapenv:Header/>
  <soapenv:Body>
    <esp:sendProcessStartEndEvent>
      <esp:param0>
        <xsd:processInstanceId>1</xsd:processInstanceId>
        <xsd:processName>SalesOrder</xsd:processName>
        <xsd:processVersion>1</xsd:processVersion>
        <xsd:props>
          <xsd:key>totalPrice</xsd:key>
          <xsd:value>55.43</xsd:value>
        </xsd:props>
        <xsd:state>2</xsd:state>
      </esp:param0>
    </esp:sendProcessStartEndEvent>
  </soapenv:Body>
</soapenv:Envelope>

When you click the submit button, the request is sent to the Esper service we instantiated previously. You can use soapUI to test a variety of requests, and even to develop test suites and test cases (some features are available only in the Enterprise edition, however). As a convenience, the sample code also includes an SCA client called JBPMClientMain that you can use to submit test requests.
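If you'd rather script such a request than click through soapUI, the same envelope can be posted with the JDK's own HTTP client (java.net.http, Java 11 or later). This is a sketch, not part of the sample code: SoapPost is a hypothetical name, the endpoint URL is an assumption (check the generated WSDL for the real service address), and the empty SOAPAction value assumes the service dispatches on the body element, as is common for SOAP 1.1 services.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class SoapPost {
    // Hypothetical endpoint: read the actual address from the service's WSDL.
    static final URI ENDPOINT = URI.create("http://localhost:8080/EsperManagerService");

    // Builds a SOAP 1.1 POST carrying the given envelope (e.g., the one shown above).
    static HttpRequest buildRequest(URI endpoint, String envelope) {
        return HttpRequest.newBuilder(endpoint)
                .header("Content-Type", "text/xml; charset=UTF-8") // SOAP 1.1 media type
                .header("SOAPAction", "\"\"") // empty action; the WSDL may specify one
                .POST(HttpRequest.BodyPublishers.ofString(envelope))
                .build();
    }

    // Sends the request and returns the HTTP status code (200 for a successful call).
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Usage would look like SoapPost.send(SoapPost.buildRequest(SoapPost.ENDPOINT, envelope)) with the envelope text shown earlier, which makes it easy to fire a burst of events at the engine from a test harness.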

Summary

This chapter introduced the principles behind complex event processing and its important role within SOA. Because of its distributed nature, a SOA environment presents some unique management and monitoring challenges. ESP addresses those concerns by evaluating and analyzing, in real time, events that can be generated automatically by SOA services and BPM processes. The ability to detect subtle (or not-so-subtle) changes in the day-to-day operations of your organization is becoming both a regulatory and a competitive requirement. ESP, working in conjunction with BAM for presentation and dissemination, provides powerful operational insights.

Until recently, there were no open source ESP solutions available. Fortunately, that changed with Esper. Esper is an ESP engine that provides comprehensive event correlation, aggregation, and analysis capabilities. Using an SQL-like syntax called EPL, a user can create queries, views, and patterns by which inbound events are evaluated.

Although Esper doesn't offer a web services front end for its engine, we introduced a framework, built with Apache Tuscany and SCA, that provides one. The combination of SCA, BPM through jBPM, and Esper creates a compelling SOA solution. In the next chapter, we'll turn to Apache Synapse, a lightweight ESB that can augment the capabilities we've described so far.