Crossing the (data) streams: scalable realtime processing with rules
Pipe-and-filter architectures are among the most successful design patterns ever. They dominate data ingestion and processing today, and give 1970s hackers yet another chance to remind us how they thought of everything years ago.
Unfortunately modern variations of this can run into an ugly problem: what are the semantics of a "join" operation between multiple infinite streams of data? Popular systems like Storm point out this ambiguity, as does Samza. Both provide primitives to support correlation of events between streams, but the higher-level semantics of a join are punted to the user.
This is fine for systems providing infrastructure, but it's a troublesome model for our applications: if we can't define clear and compelling semantics for a key part of our processing model, we might be using the wrong model. Projects like Storm offer excellent infrastructure, but this ambiguity suggests that many problems would be better served by a higher-level abstraction.
The challenge with user-defined join semantics is that it comes with baggage: maintaining state, structuring it, and recovering it after failures are all hard problems. It also makes the behavior of the system harder to understand; since each join can behave slightly differently, we have to look closely to see what's going on. A set of simple yet flexible join operators would be ideal -- so how do we get there?
We might consider CEP-style systems such as Esper and Drools Fusion, which have been tackling declarative-style joins for years. But such systems don't offer the scalability or processing guarantees of Storm, and their limited languages aren't always expressive enough for sophisticated logic.
We need a data processing model with well-defined, declarative joins that still supports rich application logic. There are many options, but I'll focus on one here: suppose we could make rules as a control structure scale linearly across a cluster, letting the rules engine distribute join operations. Let's look at an experiment in making Clara, a Clojure-based rules engine, distribute its working memory and processing across a Storm cluster, with all of the scalability and processing guarantees of the underlying system.
Forward-chaining rules on Storm
Imagine a variant of the Rete algorithm implemented with some simple constraints:

First, each condition in a rule can be evaluated independently, so incoming data can be spread across an arbitrary number of processes and match rule conditions appropriately.
Second, aggregations over matched facts follow a map-and-reduce style pattern -- where the map and partial reductions of aggregated facts can be done in parallel across machines.
Finally, "joins" of aggregations or individual facts are always hash-based. So joins can be efficiently achieved by sending matched facts to the same node via their hash values.
The result is that our Storm topology looks something like this:
Let's consider a simple example. Suppose we have feeds of temperature readings from multiple locations in some facility, and we want to take action in those locations should our readings exceed a threshold.
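The rules that follow operate over plain fact types. As a minimal sketch, these could be ordinary Clojure records along the lines below; the exact field names are inferred from how the rules use them and are assumptions rather than the project's canonical definitions:

```clojure
;; Sketch of the fact types used by the rules below; the field names are
;; inferred from the rule bodies and may differ from the real example.
(defrecord TemperatureReading [location value timestamp])
(defrecord CurrentTemperature [value location])
(defrecord Device [id location])
```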
Each temperature reading carries a logical timestamp. Since our logic is interested in the "newest" reading, we use a Clara accumulator that selects the item with the newest timestamp:

```clojure
(def newest-temp (acc/max :timestamp :returns-fact true))
```

We then use it in a rule that processes all readings for a location and preserves the newest:

```clojure
(defrule get-current-temperature
  "Get the current temperature at a location by simply looking at the newest reading."
  [?current-temp <- newest-temp :from [TemperatureReading (== ?location location)]]
  =>
  (insert! (->CurrentTemperature (:value ?current-temp) ?location)))
```

Note that accumulators preserve minimal state and apply changes incrementally. In this case we keep only the current temperature based on timestamp; older readings are simply discarded, so we can deal with an infinite stream. This example keeps the newest reading, but we could just as easily accumulate some other value, such as a time-weighted histogram of temperatures so that we're robust to outliers. Any fact that doesn't match a rule is simply discarded, incurring no further cost.
Now that we have the current temperature for each location, we want to back off our devices in those locations if a threshold is exceeded. We can write this as a simple rule as well:

```clojure
(defrule reduce-device-speed
  "Reduce the speed of all devices in a location that has a high temperature."
  [CurrentTemperature (> value high-threshold) (= ?location-id location)]
  ;; Find all Device records in the location, and bind them to the ?device variable.
  [?device <- Device (= ?location-id location)]
  =>
  (reduce-speed! ?device))
```

The first condition matches current temperatures that exceed the threshold and binds the location to the ?location-id variable. The second condition finds all devices with a matching location and binds them to the ?device variable, which is then visible on the right-hand side of the rule, where we can take action.
This is effectively a join between temperatures that exceed a threshold at a given location and the devices in that same location. When running over Storm, this rule will hash Device and CurrentTemperature facts and send them to the same process based on that hash. This is done using Storm's fields grouping over the data stream that connects the bolt instances together.
All state for the join operations is managed internally by Clara's engine. Accumulators like the one in this example compute in a rolling fashion, merging new data, retracting previously accumulated values, and inserting new ones. Combined with rule engine-style truth maintenance, developers can simply declare their logic and let the engine maintain state and consistency.
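Outside of Storm, the same rules can be exercised in an ordinary single-process Clara session, which is a handy way to see the accumulator and truth maintenance at work. The sketch below is illustrative rather than taken from the clara-storm examples: it assumes the record, accumulator, and rule definitions above live in a hypothetical temperature.demo namespace, and adds a small query purely for inspection.

```clojure
(ns temperature.demo ;; hypothetical namespace holding the records and rules above
  (:require [clara.rules :refer [defrule defquery insert insert!
                                 mk-session fire-rules query]]
            [clara.rules.accumulators :as acc]))

;; A query to inspect the derived current temperatures in working memory.
(defquery current-temps
  []
  [?temp <- CurrentTemperature])

(def session
  (-> (mk-session 'temperature.demo) ;; load the rules and queries in this namespace
      (insert (->TemperatureReading "server-room-1" 95 1)
              (->TemperatureReading "server-room-1" 102 2)) ;; newer reading wins
      (fire-rules)))

;; Thanks to truth maintenance, only the CurrentTemperature derived from
;; the newest reading (timestamp 2) remains in working memory.
(query session current-temps)
```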
Integration with Processing Topologies
The rules used in this example are here, and are run with the Storm launching code here. There is also a draft Java API for attaching rules to a topology. Note that our approach is simply to attach to a Storm topology defined via a provided TopologyBuilder, so users can pre-process or perform other logic in their topology and route data as appropriate into the distributed rule engine. Also, these examples use Clojure records, but they work equally well with Java objects, including ones generated by Thrift, Avro, or Protocol Buffers.

Current State
A prototype of rules over Storm is in the clara-storm project. It also includes the ability to run queries across the rule engine's working memory, using Storm's Distributed RPC mechanism. A handful of things need to come together to make this production ready, including:

- I'd like input and suggestions from members of the Storm community. This topology layout isn't an idiomatic use of Storm, so we need to ensure this won't run into problems as we scale. (This is one of the reasons I'm posting this now.)
- The ability to persist Clara's working memory to recover from machine failures. This will probably take the form of writing state changes for each rule node to a reliable write-ahead log, with Kafka being a good storage mechanism.
- Optimizations are needed, ranging from efficient serialization to performing partial aggregations before state is shared between bolts.
- Consider temporal operators in Clara. Accumulators have worked well to this point but may run into limits.
- Testing at scale!
The biggest takeaway is how technologies like Storm and Clojure provide an opportunity to express computation with higher-level abstractions. Things like Summingbird (and Cascalog 2.0?) offer ways to query data streams. These could be complemented by support for forward-chaining rules for problems easily expressed that way.