riemann.streams

The streams namespace aims to provide a comprehensive set of widely applicable, combinable tools for building more complex streams.

Streams are functions which accept events or, in some cases, lists of events.

Streams typically do one or more of the following.

  • Filter events.
  • Transform events.
  • Combine events over time.
  • Apply events to other streams.
  • Forward events to other services.

Most streams accept, after their initial arguments, any number of further streams. These are known as children or “child streams” of the stream. Children are typically invoked sequentially; any exceptions thrown are caught, logged, and optionally forwarded to *exception-stream*. Return values of children are ignored.

Events are backed by a map (e.g. {:service "foo" :metric 3.5}), so any function that accepts maps will work with events. Common functions like prn can be used as a child stream.

Some common patterns for defining child streams are (fn [e] (println e)) and (partial log :info).
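As a rough illustration of the conventions above, the following sketch builds a stream in plain Clojure, without Riemann itself. The names call-children, over-threshold, seen-events and demo-stream are hypothetical. The real library also logs failures and can forward them to *exception-stream*, which this sketch omits.

```clojure
;; Hypothetical helper: call each child in order, swallowing exceptions so
;; that one failing child does not stop the others. Return values are ignored.
(defn call-children [event children]
  (doseq [child children]
    (try
      (child event)
      (catch Exception _ nil))))

;; Hypothetical filtering stream: passes events whose :metric exceeds the
;; threshold on to its children.
(defn over-threshold [threshold & children]
  (fn stream [event]
    (when (some-> (:metric event) (> threshold))
      (call-children event children))))

;; Usage: the first child throws, but the second child still sees the event.
(def seen-events (atom []))
(def demo-stream
  (over-threshold 3
    (fn [_] (throw (ex-info "boom" {})))
    (fn [e] (swap! seen-events conj e))))

(demo-stream {:service "foo" :metric 3.5}) ; passed on to both children
(demo-stream {:service "foo" :metric 1})   ; filtered out
```

Because events are plain maps, the children here are ordinary one-argument functions.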

*exception-stream*

dynamic

When an exception is caught, it’s converted to an event and sent here.

-infinity

adjust

(adjust & args)

Passes on a changed version of each event by applying a function to a particular field or to the event as a whole.

Passing a vector of [field function & args] to adjust will modify the given field in incoming events by applying the function to it along with the given arguments. For example:

(adjust [:service str " rate"] …)

takes {:service "foo"} and emits {:service "foo rate"}.

If a function is passed to adjust instead of a vector, adjust behaves like smap: the entire event will be given to the function and the result will be passed along to the children. For example:

(adjust #(assoc % :metric (count (:tags %))) …)

takes {:tags ["foo" "bar"]} and emits {:tags ["foo" "bar"] :metric 2}.
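The two forms can be mimicked on plain maps. This is only a sketch of the event transformation, not of the stream itself; adjust-event is a hypothetical name.

```clojure
;; Hypothetical stand-in for the two forms of adjust, applied to one event.
(defn adjust-event [spec event]
  (if (vector? spec)
    ;; Vector form: apply f to the current value of field, plus extra args.
    (let [[field f & args] spec]
      (apply update event field f args))
    ;; Function form: hand the whole event to the function.
    (spec event)))

(adjust-event [:service str " rate"] {:service "foo"})
;; => {:service "foo rate"}

(adjust-event #(assoc % :metric (count (:tags %))) {:tags ["foo" "bar"]})
;; => {:tags ["foo" "bar"], :metric 2}
```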

Prefer (smap f & children) to (adjust f & children) where possible.

apdex

macro

(apdex dt satisfied? tolerated? & children)

A stream which computes Apdex metrics every dt seconds for a stream of events. satisfied? and tolerated? are predicates as for (where). If satisfied? is truthy for an event, increments the satisfied count for that time window by 1. If tolerated? is truthy for an event, increments the tolerated count for that time window by 1. Any other states are ignored. Every dt seconds (as long as events are arriving), emits an event with a metric between 0 and 1, derived by:

(satisfied count + (tolerated count / 2)) / total count of received events

Ignores expired events.

See http://en.wikipedia.org/wiki/Apdex for details.
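The score for a single window can be written as a small pure function. apdex-score is a hypothetical name, and the sketch assumes the counts have already been collected and that total is positive.

```clojure
;; Apdex score for one time window.
;; satisfied and tolerated are counts; total is the number of events received.
(defn apdex-score [satisfied tolerated total]
  (/ (+ satisfied (/ tolerated 2)) total))

(apdex-score 6 2 10) ;; => 7/10
(apdex-score 7 1 10) ;; => 3/4
```

With integer counts Clojure returns an exact ratio; wrap the result in (double …) if a floating-point metric is wanted.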

apdex*

(apdex* dt satisfied? tolerated? & children)

Like apdex, but takes functions of events rather than where-predicates.

A stream which computes Apdex metrics every dt seconds for a stream of events. If (satisfied? event) is truthy, increments the satisfied count for that time window by 1. If (tolerated? event) is truthy, increments the tolerated count for that time window by 1. Any other states are ignored. Every dt seconds (as long as events are arriving), emits an event with a metric between 0 and 1, derived by:

(satisfied count + (tolerated count / 2)) / total count of received events

Ignores expired events.

See http://en.wikipedia.org/wiki/Apdex for details.

append

(append reference)

Conj events onto the given reference.

batch

(batch n dt & children)

Batches up events into vectors, bounded both by size and by time. Once either n events have accumulated, or dt seconds passed, flushes the current batch to all child streams. Child streams should accept a sequence of events.

bit-bucket

(bit-bucket args)

Discards arguments.

by

macro

(by fields & children)

Splits stream by field. Every time an event arrives with a new value of field, this macro invokes its child forms to return a new, distinct set of streams for that particular value.

(rate 5 prn) prints a single rate for all events, once every five seconds.

(by :host (rate 5 prn)) tracks a separate rate for each host, and prints each one every five seconds.

You can pass multiple fields too:

(by [:host :service])

Note that field can be a keyword like :host or :state, but you can also use any unary function for more complex sharding.

Be aware that (by) over unbounded values can result in many substreams being created, so you wouldn’t want to write (by :metric prn): you’d get a separate prn for every unique metric that came in. Also, (by) streams are never garbage-collected.
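The forking behaviour can be sketched with a table of child streams keyed by field value. This is a simplified model in plain Clojure, not the library's implementation; by-sketch, counting-fork, per-host-counts and by-host are hypothetical names.

```clojure
;; key-fn maps an event to a fork key; new-fork is a zero-argument function
;; that builds a fresh child stream for a key seen for the first time.
(defn by-sketch [key-fn new-fork]
  (let [forks (atom {})]
    (fn stream [event]
      (let [k     (key-fn event)
            table (swap! forks
                         (fn [m] (if (contains? m k) m (assoc m k (new-fork)))))]
        ((get table k) event)))))

;; Each fork keeps its own counter and records the latest count per host.
(def per-host-counts (atom {}))
(defn counting-fork []
  (let [n (atom 0)]
    (fn [event]
      (swap! per-host-counts assoc (:host event) (swap! n inc)))))

(def by-host (by-sketch :host counting-fork))
(doseq [h ["a" "b" "a"]]
  (by-host {:host h}))
;; @per-host-counts is now {"a" 2, "b" 1}
```

Nothing ever removes entries from the table, which is why splitting on an unbounded field grows without limit.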

by-builder

macro

(by-builder [sym fields] & forms)

Splits stream by provided function. This is a variation of by where forms are executed when a fork is created to yield the children.

This allows you to perform operations based on the fork-name, i.e: the output of the given fields.

(by-builder [host :host] …)

by-fn

(by-fn fields new-fork)

call-rescue

macro

(call-rescue event children)

Call each child stream with event, in order. Rescues and logs any failure.

changed

(changed pred & children)

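Passes on events only when (pred event) differs from that of the previous event. Options:

  • :init The initial value to assume for (pred event).
  • :pairs? When true, children receive a vector of the previous and current events instead of the current event alone.

; Print all state changes
(changed :state prn)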

; Assume states *were* ok the first time we see them.
(changed :state {:init "ok"} prn)

; Receive the previous event, in addition to the current event, as a vector.
(changed :state {:pairs? true}
         (fn [[event event']]
           (prn "changed from" (:state event) "to" (:state event'))))

; Note that pred can be an arbitrary function:

(changed (fn [e] (> (:metric e) 2)) ...)
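The comparison against the previous value can be modelled in a few lines of plain Clojure. changed-sketch, state-changes and watch-state are hypothetical names. The sketch always takes an explicit initial value, handles a single child, and is not safe for concurrent callers.

```clojure
;; Calls child only when (pred event) differs from the value seen for the
;; previous event. init plays the role of the :init option.
(defn changed-sketch [pred init child]
  (let [previous (atom init)]
    (fn stream [event]
      (let [value (pred event)
            old   @previous]
        (reset! previous value)
        (when (not= old value)
          (child event))))))

(def state-changes (atom []))
(def watch-state
  (changed-sketch :state "ok" #(swap! state-changes conj (:state %))))

(doseq [st ["ok" "ok" "critical" "critical" "ok"]]
  (watch-state {:state st}))
;; @state-changes is now ["critical" "ok"]
```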

changed-state

macro

(changed-state & children)

Passes on changes in state for each distinct host and service.

clock-skew

(clock-skew & children)

Detects clock skew between hosts. Keeps track of what time each host thinks it is, based on their most recent event time. Compares the time of each event to the median clock, and passes on that event with metric equal to the time difference: events ahead of the clock have positive metrics, and events behind the clock have negative metrics.

coalesce

(coalesce & [dt & children])

Combines events over time. Coalesce remembers the most recent event for each service/host combination that passes through it (limited by :ttl). Every dt seconds (defaults to 1 second), it passes on all events it remembers. When events expire, they are included in the emitted sequence of events once, and removed from the state table thereafter.

Use coalesce to combine states that arrive at different times–for instance, to average the CPU use over several hosts.

Every 10 seconds, print a sequence of events including all the events which share the same :foo and :bar attributes:

(by [:foo :bar]
  (coalesce 10 prn))

coalesce-with-event

(coalesce-with-event keyfn child)

Helper for coalesce: calls (child current-event all-events) every time an event is received.

counter

(counter & children)

Counts things. The first argument may be an initial counter value, which defaults to zero.

; Starts at zero
(counter index)

; Starts at 500
(counter 500 index)

Events without metrics are passed through unchanged. Events with metrics increment the counter, and are passed on with their metric set to the current count.

You can reset the counter by passing it an event with a metric, tagged “reset”; the count will be reset to that metric.

ddt

(ddt & args)

Differentiate metrics with respect to time. Takes an optional number followed by child streams. If the first argument is a number n, emits a rate-of-change event every n seconds, until expired. If the first argument is not a number, emits an event for each event received, but with metric equal to the difference between the current event and the previous one, divided by the difference in their times. Skips events without metrics.

(ddt 5 graph index)
(ddt graph index)
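For the per-event form, the emitted metric is a difference quotient between consecutive events. A minimal sketch of that arithmetic on plain maps, using the hypothetical name rate-of-change and assuming both events carry :metric and distinct :time values:

```clojure
;; Change in metric divided by change in time between two events.
(defn rate-of-change [prev curr]
  (/ (- (:metric curr) (:metric prev))
     (- (:time curr) (:time prev))))

(rate-of-change {:metric 10 :time 100} {:metric 25 :time 105})
;; => 3
```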

ddt-events

(ddt-events & children)

(ddt) between each pair of events.

ddt-real

(ddt-real n & children)

(ddt) in real time.

default

(default & args)

Like with, but does not override existing (i.e. non-nil) values. Useful when you want to fill in default values for events that might come in without them.

(default :ttl 300 index)
(default {:service "jrecursive" :state "chicken"} index)
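The difference between with and default comes down to which side wins when a key is already set. The sketch below models both on plain maps; with-values and default-values are hypothetical names, not library functions.

```clojure
;; with-style: new values always replace existing ones.
(defn with-values [overrides event]
  (merge event overrides))

;; default-style: only fill in keys whose current value is nil or missing.
(defn default-values [defaults event]
  (reduce-kv (fn [e k v]
               (if (nil? (get e k)) (assoc e k v) e))
             event
             defaults))

(default-values {:ttl 300} {:service "a"})         ;; => {:service "a", :ttl 300}
(default-values {:ttl 300} {:service "a" :ttl 10}) ;; => {:service "a", :ttl 10}
(with-values    {:ttl 300} {:service "a" :ttl 10}) ;; => {:service "a", :ttl 300}
```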

dual

(dual pred true-stream false-stream)

A stream which splits events into two mirror-image streams, based on (pred e).

If (pred e) is true, calls (true-stream e) and (false-stream (expire e)).

If (pred e) is false, does the opposite. Expired events are forwarded to both streams.

(pred e) is always called once per incoming event.

ewma

(ewma halflife & children)

Exponential weighted moving average. Constant space and time overhead. Passes on each event received, but with metric adjusted to the moving average. Takes into account the time between events.

ewma-timeless

(ewma-timeless r & children)

Exponential weighted moving average. Constant space and time overhead. Passes on each event received, but with metric adjusted to the moving average. Does not take the time between events into account. r is the ratio between successive events: r=1 means always return the most recent metric; r=1/2 means the current event counts for half, the previous event for 1/4, the one before that for 1/8, and so on.
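The weighting described above corresponds to the recurrence avg' = r * metric + (1 - r) * avg. The sketch below applies it to a sequence of metrics; ewma-timeless-metrics is a hypothetical name, and starting the average at 0 is an assumption of this sketch.

```clojure
;; Returns the successive moving averages for a sequence of metrics.
(defn ewma-timeless-metrics [r metrics]
  (rest (reductions (fn [avg x] (+ (* r x) (* (- 1 r) avg)))
                    0
                    metrics)))

(ewma-timeless-metrics 0.5 [4 8]) ;; => (2.0 5.0)
(ewma-timeless-metrics 1 [4 8])   ;; => (4 8)
```

With r=1 the previous average is multiplied by zero, so each output equals the most recent metric.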

exception-stream

macro

(exception-stream exception-stream & children)

Catches exceptions, converts them to events, and sends those events to a special exception stream.

(exception-stream (email "polito@vonbraun.com")
  (async-queue! :graphite {:core-pool-size 128}
    graph))

Streams often take multiple children and send an event to each using call-rescue. Call-rescue will rescue any exception thrown by a child stream, log it, and move on to the next child stream, so that a failure in one child won’t prevent others from executing.

exception-stream binds a dynamically scoped thread-local variable, *exception-stream*. When call-rescue encounters an exception, it will also route the error to this exception stream. When switching threads (e.g. when using an executor or Thread), you must use (bound-fn) to preserve this binding.

This is a little more complex than you might think, because we not only need to bind this variable during the runtime execution of child streams, but also during the evaluation of the child streams themselves, e.g. at the invocation time of exception-stream itself. If we write

(exception-stream (email ...)
  (rate 5 index))

then (rate), when invoked, might need access to this variable immediately. Therefore, this macro binds *exception-stream* twice: once when evaluating children, and again every time the returned stream is invoked.

execute-on

(execute-on executor & children)

Returns a stream which accepts events and executes them using a java.util.concurrent.Executor. Returns immediately. May throw RejectedExecutionException if the underlying executor will not accept the event; e.g. if its queue is full. Use together with riemann.service/executor-service for reloadable asynchronous execution of streams. See also: async-queue!, which may be simpler.

(let [io-pool (service!
                (executor-service
                  #(ThreadPoolExecutor. 1 10 ...)))
      graph (execute-on io-pool (graphite {:host ...}))]
  ...
  (tagged "graph"
    graph))

expired

(expired & children)

Passes on expired events.

expired?

(expired? event)

There are two ways an event can be considered expired. First, if it has state “expired”. Second, if its :ttl and :time indicate it has expired.
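The two conditions can be sketched as a predicate on plain maps. expired-sketch? is a hypothetical name. Passing the current time and a fallback TTL as arguments, and treating an event as expired once its age exceeds its TTL, are assumptions of this sketch rather than documented behaviour.

```clojure
;; now and default-ttl are in the same units as :time and :ttl (seconds).
(defn expired-sketch? [now default-ttl event]
  (boolean
    (or (= "expired" (:state event))
        (when-let [t (:time event)]
          (> (- now t) (or (:ttl event) default-ttl))))))

(expired-sketch? 100 60 {:state "expired"})  ;; => true
(expired-sketch? 100 60 {:time 10 :ttl 30})  ;; => true  (age 90 > ttl 30)
(expired-sketch? 100 60 {:time 90 :ttl 30})  ;; => false (age 10 <= ttl 30)
```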

fill-in

(fill-in interval default-event & children)

Passes on all events. Fills in gaps in event stream with copies of the given event, wherever interval seconds pass without an event arriving. Inserted events have current time. Stops inserting when expired. Uses local times.

fill-in-last

(fill-in-last interval update & children)

Passes on all events. Fills in gaps in event stream with copies of the last event merged with the given data, wherever interval seconds pass without an event arriving. Inserted events have current time. Stops inserting when expired. Uses local times.

fill-in-last*

(fill-in-last* interval updater & children)

Passes on all events. Fills in gaps in event stream with copies of the last event updated with the given updater function, wherever interval seconds pass without an event arriving. Inserted events have current time. Stops inserting when expired. Uses local times.

fixed-event-window

(fixed-event-window n & children)

Passes on fixed-size windows of n events each. Accumulates n events, then calls children with a vector of those events, from oldest to newest. Ignores event times. Example:

(fixed-event-window 5 (smap folds/mean index))
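On a finite sequence, the windows a child would receive can be modelled with partition. fixed-event-windows is a hypothetical name; the sketch ignores streaming and simply shows which vectors are produced.

```clojure
;; Complete, non-overlapping windows of n events, oldest first.
;; Leftover events that do not yet fill a window are not emitted.
(defn fixed-event-windows [n events]
  (mapv vec (partition n events)))

(fixed-event-windows 2 [1 2 3 4 5])
;; => [[1 2] [3 4]]
```

The trailing 5 stays buffered until a sixth event completes its window.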

fixed-offset-time-window

(fixed-offset-time-window n & children)

Like fixed-time-window, but divides wall clock time into discrete windows.

A fixed window over the event stream in time. Emits vectors of events, such that each vector has events from a distinct n-second interval. Windows do not overlap; each event appears at most once in the output stream. Once an event is emitted, all events as old as or older than that event are silently dropped.

Events without times accrue in the current window.

fixed-time-window

(fixed-time-window n & children)

A fixed window over the event stream in time. Emits vectors of events, such that each vector has events from a distinct n-second interval. Windows do not overlap; each event appears at most once in the output stream. Once an event is emitted, all events as old as or older than that event are silently dropped.

Events without times accrue in the current window.

fold-interval

(fold-interval interval event-key folder & children)

Applies the folder function to all event-key values of events during interval seconds.

fold-interval-metric

(fold-interval-metric interval folder & children)

Wrapper around fold-interval that uses :metric as the event-key.

forward

(forward client)

Sends an event or a collection of events through a Riemann client.

infinity

interpolate-constant

(interpolate-constant interval & children)

Emits a constant stream of events every interval seconds, starting when an event is received, and ending when an expired event is received. Times are set to Riemann’s time. The first and last events are forwarded immediately.

Note: ignores event times currently–will change later.

match

(match f value & children)

Passes events on to children only when (f event) matches value, using riemann.common/match. For instance:

(match :service nil prn)
(match :state #{"warning" "critical"} prn)
(match :description #"error" prn)
(match :metric 5 prn)
(match expired? true prn)
(match (fn [e] (/ (:metric e) 1000)) 5 prn)

For cases where you only care about whether (f event) is truthy, use (where some-fn) instead of (match some-fn true).

mean-over-time

(mean-over-time children)

Emits the most recent event each time this stream is called, but with the average of all received metrics.

moving-event-window

(moving-event-window n & children)

A sliding window of the last few events. Every time an event arrives, calls children with a vector of the last n events, from oldest to newest. Ignores event times. Example:

(moving-event-window 5 (smap folds/mean index))
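The sliding behaviour can be modelled on a finite sequence as follows. moving-event-windows is a hypothetical name, and the sketch assumes that shorter vectors are emitted until n events have arrived.

```clojure
;; For each incoming event, the vector of the last n events seen so far.
(defn moving-event-windows [n events]
  (rest (reductions (fn [window e] (vec (take-last n (conj window e))))
                    []
                    events)))

(moving-event-windows 2 [1 2 3])
;; => ([1] [1 2] [2 3])
```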

moving-time-window

(moving-time-window n & children)

A sliding window of all events with times within the last n seconds. Uses the maximum event time as the present-time horizon. Every time a new event arrives within the window, emits a vector of events in the window to children.

Events without times accrue in the current window.

not-expired

(not-expired & children)

Passes on events that have not expired.

over

(over x & children)

Passes on events only when their metric is greater than x.

part-time-fast

(part-time-fast interval create add finish)

Partitions events by time (fast variant). Every interval seconds, creates a new bin by calling (create). Applies each received event to the current bin with (add bin event). When the time interval is over, calls (finish bin start-time elapsed-time).

Concurrency guarantees:

  • (create) may be called multiple times for a given time slice.
  • (add) when called, will receive exactly one distinct bucket in each time slice.
  • (finish) will be called exactly once for each time slice.

part-time-simple

(part-time-simple dt reset add finish)
(part-time-simple dt reset add side-effects finish)

Divides wall clock time into discrete windows. Returns a stream, composed of four functions:

(reset previous-state) Given the state for the previous window, returns a fresh state for a new window. Reset must be a pure function, as it will be invoked in a compare-and-set loop. Reset may be invoked at any time. Reset will be invoked with nil when no previous state exists.

(add state event) is called every time an event arrives to combine the event and the state together, returning some new state. Add must be a pure function.

(side-effects state event) is called with the resulting state and the event which just arrived, but will be called only once per event, and can be impure. Its return value is used for the return value of the stream.

(finish state start-time end-time) is called once at the end of each time window, and receives the final state for that window, and also the start and end times for that window. Finish will be called exactly once per window, and may be impure.

When no events arrive in a given time window, no functions are called.

percentiles

(percentiles interval points & children)

Over each period of interval seconds, aggregates events and selects one event from that period for each point. If point is 0, takes the lowest metric event. If point is 1, takes the highest metric event. 0.5 is the median event, and so forth. Forwards each of these events to children. The service name has the point appended to it; e.g. ‘response time’ becomes ‘response time 0.95’.
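Selecting one event per point can be sketched on a finite batch of events. percentile-events is a hypothetical name, and the index rule used here (floor of n times the point, capped at the last element) is an assumption of this sketch, so boundary cases may differ from the library.

```clojure
;; For each point in [0, 1], pick the event at that position in the batch
;; sorted by :metric, and append the point to its service name.
(defn percentile-events [points events]
  (let [sorted (vec (sort-by :metric events))
        n      (count sorted)]
    (for [p points
          :when (pos? n)]
      (let [idx (min (dec n) (int (Math/floor (double (* n p)))))
            e   (nth sorted idx)]
        (assoc e :service (str (:service e) " " p))))))

(def sample-events
  (for [m [5 1 4 2 3]]
    {:service "response time" :metric m}))

(map :metric (percentile-events [0 0.5 1] sample-events))
;; => (1 3 5)
```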

periodically-until-expired

(periodically-until-expired f)
(periodically-until-expired interval f)
(periodically-until-expired interval delay f)

When an event arrives, begins calling f every interval seconds. Starts after delay. Stops calling f when an expired? event arrives, or the most recent event expires.

pipe

macro

(pipe marker & stages)

Sometimes, you want to have a stream split into several paths, then recombine those paths after some transformation. Pipe lets you write these topologies easily.

We might express a linear stream in Riemann, in which a -> b -> c -> d, as

(a (b (c d)))

With pipe, we write

(pipe ↧ (a ↧)
        (b ↧)
        (c ↧)
        d)

The first argument ↧ is a marker for points where events should flow down into the next stage. A delightful list of marker symbols you might enjoy is available at http://www.alanwood.net/unicode/arrows.html.

What makes pipe more powerful than the standard Riemann composition rules is that the marker may appear multiple times in a stage, and at any depth in the expression. For instance, we might want to categorize events based on their metric, and send all those events into the same throttled email stream.

(let [throttled-emailer (throttle 100 1 (email "ops@rickenbacker.mil"))]
  (splitp < metric
    0.9 (with :state :critical throttled-emailer)
    0.5 (with :state :warning  throttled-emailer)
        (with :state :ok       throttled-emailer)))

But with pipe, we can write:

(pipe - (splitp < metric
                0.9 (with :state :critical -)
                0.5 (with :state :warning  -)
                    (with :state :ok       -))
        (throttle 100 1 (email "ops@rickenbacker.mil")))

So pipe lets us do three things:

  1. Flatten a deeply nested expression, like Clojure’s -> and ->>.

  2. Omit or simplify the names for each stage, when we care more about the structure of the streams than giving them full descriptions.

  3. Write the stream in the order in which events flow.

Pipe rewrites its stages as a let binding in reverse order; binding each stage to the placeholder in turn. The placeholder must be a compile-time symbol, and obeys the usual let-binding rules about variable shadowing; you can rebind the marker lexically within any stage using let, etc. Yep, this is a swiss arrow in disguise; ssshhhhhhh. ;-)

predict-linear

(predict-linear n s r & children)

Stream that performs OLS regression. Uses a moving-event-window of n events and emits an event with a prediction for :metric of s seconds in the future. If the optional model rebuild interval r (in seconds) is specified, the model will be rebuilt periodically and not on every arriving event.

E.g. predict the metric of service “fs-usage” 30 minutes in the future grouped by host:

(where (service "fs-usage")
  (by :host
    (predict-linear 100 1800
      #(info %))))

project

macro

(project basis & children)

Projects an event stream into a specific basis–like (coalesce), but where you only want to compare two or three specific events. Takes a vector of predicate expressions, like those used in (where). Project maintains a vector of the most recent event for each predicate. An incoming event is compared against each predicate; if it matches, the event replaces any previous event in that position and the entire vector of events is forwarded to all child streams. Expired events are included in the emitted vector of events once, and removed from the state vector thereafter.

Use project when you want to compare a small number of distinct states over time. For instance, to find the ratio of enqueues to dequeues:

(project [(service "enqueues")
          (service "dequeues")]
  (smap folds/quotient
    (with :service "enqueues per dequeue"
      ...)))

Here we’ve combined separate events–enqueues and dequeues–into a single event, using the folds/quotient function, which divides the first event’s metric by the second. Then we assigned a new service name to that resulting event, and could subsequently filter based on the metric, assign different states, graph, alert, etc.

project*

(project* predicates & children)

Like project, but takes predicate functions instead of where expressions.

rate

(rate interval & children)

Take the sum of every event’s metric over interval seconds and divide by the interval size. Emits one event every interval seconds. Starts as soon as an event is received, stops when the most recent event expires. Uses the most recently received event with a metric as a template. Event ttls decrease constantly if no new events arrive.
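The arithmetic for one interval is a sum divided by the interval length. A minimal sketch on a batch of events, using the hypothetical name rate-over-interval and skipping events without a metric:

```clojure
;; Sum of the metrics received during the interval, divided by its length.
(defn rate-over-interval [interval events]
  (/ (reduce + (keep :metric events)) interval))

(rate-over-interval 5 [{:metric 10} {:metric 15} {:service "no metric"}])
;; => 5
```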

register

(register reference)

Set reference to the most recent event that passes through.

rollup

(rollup n dt & children)

Invokes children with events at most n times per dt second interval. Passes vectors of events to children, not a single event at a time. For instance, (rollup 3 1 f) receives five events and forwards three times per second:

  • 1 -> (f [1])
  • 2 -> (f [2])
  • 3 -> (f [3])
  • 4 ->
  • 5 ->

… and events 4 and 5 are rolled over into the next period:

-> (f [4 5])

runs

(runs len-run field & children)

Usable to perform flap detection, runs examines a moving-event-window of len-run events and determines if field is the same across all of them. If it is, runs passes on the last (newest) event of the window. In practice, this can be used with (changed-state …) as a child to reduce ‘flappiness’ for state changes.

(runs 3 :state prn) ; Print events where there are 3-in-a-row of a state.

scale

(scale factor & children)

Passes on a changed version of each event by multiplying each metric with the given scale factor.

; Convert bytes to kilobytes

(scale 1/1024 index)

sdo

(sdo)
(sdo child)
(sdo child & children)

Takes a list of functions f1, f2, f3, and returns f such that (f event) calls (f1 event) (f2 event) (f3 event). Useful for binding several streams to a single variable.

(sdo prn (rate 5 index))
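In plain Clojure the composition amounts to calling each function in turn with the same event. sdo-sketch, sdo-log and both-children are hypothetical names, and the sketch leaves out any exception handling the library may apply.

```clojure
;; Returns one stream that passes each event to every child, in order.
(defn sdo-sketch [& children]
  (fn stream [event]
    (doseq [child children]
      (child event))))

(def sdo-log (atom []))
(def both-children
  (sdo-sketch #(swap! sdo-log conj [:first (:metric %)])
              #(swap! sdo-log conj [:second (:metric %)])))

(both-children {:metric 1})
;; @sdo-log is now [[:first 1] [:second 1]]
```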

sflatten

(sflatten & children)

Streaming flatten. Calls children with each event in events. Events should be a sequence.

smap

(smap f & children)

Streaming map. Calls children with (f event), whenever (f event) is non-nil. Prefer this to (adjust f) and (combine f). Example:

(smap :metric prn) ; prints the metric of each event.
(smap #(assoc % :state "ok") index) ; Indexes each event with state "ok"

smap*

(smap* f & children)

Streaming map: less magic. Calls children with (f event). Unlike smap, passes on nil results to children. Example:

(smap folds/maximum prn) ; Prints the maximum of lists of events.

smapcat

(smapcat f & children)

Streaming mapcat. Calls children with each event in (f event), which should return a sequence. For instance, to set the state of any services with metrics deviating from the mode to “warning”, one might use coalesce to aggregate all services, and smapcat to find the mode and assoc the proper states; emitting a series of individual events to the index.

(coalesce
  (smapcat (fn [events]
             (let [freqs (frequencies (map :metric events))
                   mode  (apply max-key freqs (keys freqs))]
               (map #(assoc % :state (if (= mode (:metric %))
                                       "ok" "warning"))
                    events)))
    index))

split

macro

(split & clauses)

Behaves like split*, but expects predicates to be (where) expressions instead of functions. Example:

(split
  (< 0.9  metric) (with :state "critical" index)
  (< 0.75 metric) (with :state "warning" index)
  (with :state "ok" index))

split*

(split* & clauses)

Given a list of function and stream pairs, passes the current event onto the stream associated with the first passing condition.

Conditions are functions as for where*. An odd number of forms will make the last form the default stream. For example:

 (split*
   (fn [e] (< 0.9  (:metric e))) (with :state "critical" index)
   (fn [e] (< 0.75 (:metric e))) (with :state "warning" index)
   (with :state "ok" index))

split*-match

(split*-match event [pred stream])

splitp

macro

(splitp pred expr & clauses)

Takes a binary predicate, an expression and a set of clauses. Each clause takes the form

test-expr stream

splitp returns a stream which accepts an event. Expr is a (where) expression, which will be evaluated against the event to obtain a value for selecting a clause. For each clause, evaluates (pred test-expr value). If the result is logical true, evaluates (stream event) and returns that value.

A single default stream can follow the clauses, and its value will be returned if no clause matches. If no default stream is provided and no clause matches, an IllegalArgumentException is thrown.

Splitp evaluates streams once at invocation time.

Example:

(splitp < metric
  0.9  (with :state "critical" index)
  0.75 (with :state "warning" index)
       (with :state "ok" index))

sreduce

(sreduce f & opts)

Streaming reduce. Two forms:

(sreduce f child1 child2 ...)
(sreduce f val child1 child2 ...)

Maintains an internal value, which defaults to the first event received or, if provided, val. When the stream receives an event, calls (f val event) to produce a new value, which is sent to each child. f must be free of side effects. Examples:

Passes on events, but with the maximum of all received metrics:

(sreduce (fn [acc event] (assoc event :metric
                                (max (:metric event) (:metric acc)))) ...)

Or, using riemann.folds, a simple moving average:

(sreduce (fn [acc event] (folds/mean [acc event])) …)
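For the form with an initial value, the values sent to children correspond to a running reduction over the events. The sketch below models that on a finite sequence; sreduce-values and keep-max are hypothetical names.

```clojure
;; Values produced after each event, given reducing function f and seed init.
(defn sreduce-values [f init events]
  (rest (reductions f init events)))

;; Keeps the current event but carries forward the largest metric seen.
(defn keep-max [acc event]
  (assoc event :metric (max (:metric event) (:metric acc))))

(map :metric
     (sreduce-values keep-max {:metric 0} [{:metric 2} {:metric 5} {:metric 3}]))
;; => (2 5 5)
```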

stable

(stable dt f & children)

A stream which detects stable groups of events over time. Takes a time period in seconds, and a function of events. Passes on all events for which (f event1) is equal to (f event2), for each successive pair of events, for at least dt seconds. Use (stable) to filter out transient spikes and flapping states.

In these plots, stable events are shown as =, and unstable events are shown as -. = events are passed to children, and - events are ignored.

     A spike           Flapping           Stable changes
|                 |                    |
|       -         |    -- -   ======   |      =====
|                 |        -           |           ========
|======= ======   |====  -  --         |======
+------------->   +---------------->   +------------------>
      time              time                  time

May buffer events for up to dt seconds when the value of (f event) changes, in order to determine if the new value is stable or not.

; Passes on events where the state remains the same for at least five
; seconds.
(stable 5 :state prn)

stream

(stream & args)

sum-over-time

(sum-over-time & children)

Sums all metrics together. Emits the most recent event each time this stream is called, but with summed metric.

tag

(tag tags & children)

Adds a new tag, or set of tags, to events which flow through.

(tag "foo" index)
(tag ["foo" "bar"] index)

tagged

Alias for tagged-all

tagged-all

(tagged-all tags & children)

Passes on events where all tags are present. This stream returns true if an event it receives matches those tags, nil otherwise.

Can be used as a predicate in a where form.

(tagged-all "foo" prn)
(tagged-all ["foo" "bar"] prn)

tagged-all?

(tagged-all? tags event)

Predicate function to check if a collection of tags is present in the tags of event.

tagged-any

(tagged-any tags & children)

Passes on events where any of tags are present. This stream returns true if an event it receives matches those tags, nil otherwise.

Can be used as a predicate in a where form.

(tagged-any "foo" prn)
(tagged-any ["foo" "bar"] prn)

tagged-any?

(tagged-any? tags event)

Predicate function to check if any of a collection of tags are present in the tags of event.
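Both tag predicates reduce to set operations on the event's :tags. The sketch below uses the hypothetical names tagged-all-sketch? and tagged-any-sketch?, and assumes tags is always a collection.

```clojure
(require '[clojure.set :as set])

;; True when every requested tag appears in the event's :tags.
(defn tagged-all-sketch? [tags event]
  (set/subset? (set tags) (set (:tags event))))

;; True when at least one requested tag appears in the event's :tags.
(defn tagged-any-sketch? [tags event]
  (boolean (some (set tags) (:tags event))))

(tagged-all-sketch? ["foo" "bar"] {:tags ["foo" "bar" "baz"]}) ;; => true
(tagged-all-sketch? ["foo" "bar"] {:tags ["foo"]})             ;; => false
(tagged-any-sketch? ["foo" "bar"] {:tags ["foo"]})             ;; => true
```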

throttle

(throttle n dt & children)

Passes on at most n events, or vectors of events, every dt seconds. If more than n events (or vectors) arrive in a dt-second fixed window, drops remaining events. Imposes no additional latency; events are either passed on immediately or dropped.

top

(top k f top-stream)
(top k f top-stream bottom-stream)
(top k f top-stream bottom-stream demote?)

Bifurcates a stream into a dual pair of streams: one for the top k events, and one for the bottom k events.

f is a function which maps events to comparable values, e.g. numbers. If an incoming event e falls in the top k, the top stream receives e and the bottom stream receives (expire e). If the event is not in the top k, the top stream receives (expire e) and the bottom stream receives e.

If an inbound event is already expired, it is forwarded directly to both streams. In this way, both top- and bottom-stream have a consistent, dual view of the event space.

Index the top 10 events, by metric:

(top 10 :metric index)

Index everything, but tag the top k events with “top”:

(top 10 :metric
  (tag "top" index)
  index)

This implementation of top is lazy, in the sense that it won’t proactively expire events which are bumped from the top-k set–you have to wait for another event with the same host and service to arrive before child streams will know it’s expired.

under

(under x & children)

Passes on events only when their metric is smaller than x.

untag

(untag tags & children)

Removes a tag, or set of tags, from events which flow through.

(untag "foo" index)
(untag ["foo" "bar"] index)

where

macro

(where expr & children)

Passes on events where expr is true. Expr is rewritten using where-rewrite. 'event is bound to the event under consideration. Examples:

; Match any event where metric is either 1, 2, 3, or 4.
(where (metric 1 2 3 4) ...)

; Match an event where the metric is negative AND the state is ok.
(where (and (> 0 metric)
            (state "ok")) ...)

; Match an event where the host begins with web
(where (host #"^web") ...)


; Match an event where the service is in a set of services
(where (service #{"service-foo" "service-bar"}) ...)
; which is equivalent to
(where (service "service-foo" "service-bar") ...)

If a child begins with (else …), the else’s body is executed when expr is false. For instance:

(where (service "www")
  (notify-www-team)
  (else
    (notify-misc-team)))

The streams generated by (where) return the value of expr: truthy if expr matched the given event, and falsey otherwise. This means (where (metric 5)) tests events and returns true if their metric is five.

where*

macro

(where* f & children)

A simpler, less magical variant of (where). Instead of binding symbols in the context of an expression, where* takes a function which takes an event. When (f event) is truthy, passes event to children–and otherwise, passes event to (else …) children. For example:

(where* (fn [e] (< 2 (:metric e))) prn)

(where* expired?
  (partial prn "Expired")
  (else
    (partial prn "Not expired!")))

where-partition-clauses

(where-partition-clauses exprs)

Given expressions like (a (else b) c (else d)), returns [a c b d]

where-rewrite

(where-rewrite expr)

Rewrites lists recursively. Replaces (metric x y z) with a test matching (:metric event) to any of x, y, or z, either by = or re-find. Replaces any other instance of metric with (:metric event). Does the same for host, service, event, state, time, ttl, tags (which performs an exact match of the tag vector), tagged (which checks to see if the given tag is present at all), metric_f, and description.

window

(window n & children)

Alias for moving-event-window.

with

(with & args)

Constructs a copy of each incoming event with new values for the given keys, and passes the resulting event on to each child stream. As everywhere in Riemann, events are immutable; only this stream’s children will see this version of the event.

If you only want to set default values, use default. If you want to update values for a key based on the current value of that field in each event, use adjust. If you want to update events using arbitrary functions, use smap.

; Print each event, but with service "foo"
(with :service "foo" prn)

; Print each event, but with no host and state "broken".
(with {:host nil :state "broken"} prn)