A guide to working with Riemann

This is not a comprehensive guide to everything in Riemann, but it can get you started and help solve specific problems. Want to add your own article to this page? Send me a pull request!

Once you know where to look, you can learn more by reading the API docs (especially riemann.streams), and the source. I try hard to write readable code. The test suite is also a great place to look for use examples.

Running Riemann

Changing the config

If you use the Debian or Centos packages, Riemann adds a riemann user, stores its configuration in /etc/riemann/riemann.config, and logs to /var/log/riemann.log. You should always tail the log file when working with Riemann; it'll alert you to configuration errors and help you debug your streams.

tail -F /var/log/riemann.log

In another terminal, use your favorite editor to open /etc/riemann/riemann.config. Let's add a logging stream, so we can see all events that pass through the streams. #(info %) expands into (fn [x] (info x)), which is a function that takes an event and logs it at the INFO log level.

(streams
  index
  ...
  #(info "received event" %))

Now we need to reload the config file. Riemann will respond to SIGHUP, or you can use the init scripts. You'll see a message about reloading the config in the Riemann log. If you make a mistake, like a syntax error, Riemann will continue running with the old config and won't apply the new one. Instead, it'll log an explanation of the problem, so you can investigate and fix it.

Now that the config has been reloaded, you should see events flowing by in the log. Some come from external sources, like riemann-health or your own programs. Others are generated by Riemann internally, as a part of its internal performance instrumentation. You can insert logging statements anywhere in your streams to verify what kind of events are flowing through that point.

Reloading is experimental, and is subject to the normal Clojure rules about (def) and (defn). To avoid confusion, use let bindings instead of def; it'll guarantee reloads work correctly. If reloads seem broken, you might need to do a full restart for your changes to take effect.

sudo service riemann reload

A minimal configuration

We present here a minimal configuration which only prints incoming events to standard output. This configuration can prove useful when debugging clients to make sure expected events are reaching Riemann.

This example includes a full configuration, later examples will focus on streams. The following actions are performed:

  • logging/init: Setting-up logging to log to standard output (STDOUT).
  • tcp-server: Starting a Riemann TCP server on port 5555 (the default).
  • instrumentation: Disabling internal event production.

Since streams expects a list of functions to call with individual events, we can output any input by calling prn on them.

(logging/init {:console true})
(tcp-server {})
(instrumentation {:enabled? false})

(streams
 prn)

Including functions from multiple files

Often your Riemann configuration file will get quite large as you add streams and handle more events. Additionally, if you're managing Riemann with a configuration management tool like Puppet or Chef then it can be complex to template a large configuration file. To help with this Riemann also supports including functions via Clojure's namespacing model. These can include your own custom stream functions.

To enable this, the contents of the directory containing the riemann.config configuration file are included in the CLASSPATH.

To use other functions then create a source directory for them. For example, to create functions specific to your organization you might create a examplecom.etc namespace.

 sudo mkdir -p /etc/riemann/examplecom/etc
 

Then create a namespace containing the functions we wish to include in a file in this directory, for example email.clj.

; Create a new function with ns
(ns examplecom.etc.email
  (:require [riemann.email :refer :all]))

(def email (mailer {:from "reimann@example.com"}))

We would then require this namespace in our riemann.config. With this we could use the email variable inside our Riemann configuration.

You can read more about Clojure's namespacing and how it works.

This approach is supported in Riemann 0.2.11 and later.

; Require the example.etc.email functions
(require '[examplecom.etc.email :refer :all])

Putting Riemann into production

Riemann has three main parts: clients, the server, and dashboard. By default, all listen on the loopback interface. You'll need to configure each to listen to an appropriate interface for your environment. On the host that runs the Riemann server, open /etc/riemann/riemann.config, and change the host that Riemann binds to. To listen on all interfaces, use 0.0.0.0.

(let [host "0.0.0.0"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server  {:host host}))

Now we need to reload Riemann to tell it about our changes.

sudo service riemann reload

To change the address that the dashboard binds to, edit riemann-dash's config.rb.

set :bind, "1.2.3.4"

Riemann-dash only serves some JS code and a small piece of configuration; when you open riemann-dash in the browser, it'll open connections from your browser to the Riemann server's websocket interface to receive events. To connect the dashboard to a remote Riemann server, double-click the text box in the top right of the dashboard, and change the address from 127.0.0.1:5556 to your Riemann server's host and port. When you hit enter, the browser will initiate a websocket connection to that address and begin displaying events. Hit s to save the new dash config; the dashboard will connect to that address every time you load the page.

Riemann-dash is not the only dashboard for Riemann; Riemann decouples data processing and visualization. Opening the Riemann server from your browser *won't* work.

If the dashboard is unable to connect to the Riemann websocket server, you'll see an alert pop up every few seconds. Check that the server is running, that the Riemann websocket server is reachable from your browser, that you used the correct host and port, and so on.

If you don't see *anything* on the dash, but it connected successfully, you may not have any events to show, or you may not have set up any views.

Using Docker

Instead of installing Riemann directly on your system, you can also use Docker to run it. There is a prepackaged Docker image available as riemannio/riemann.

With its default configuration, Riemann will index all incoming events and print expired events. Make sure to bind the neccessary ports to your host system, or else you won't be able to send events to Riemann.

If you expect lots of traffic and don't need segregating of containers, you can also skip the Docker network and use the host network by using the --net host flag. Note that the default Riemann configuration in Docker will listen on all network interfaces, so make sure to either mount a different configuration (see below) or configure your firewall appropriately.

docker run --rm -p 5555:5555 riemannio/riemann
docker run --rm --net host riemannio/riemann

Riemann will, by default, use a configuration file located at /etc/riemann.config, which you will most likely override by mounting your own configuration into the container.

For this you can either mount your configuration file at the default location or override the default container command (/bin/riemann /etc/riemann.config) if for some reason you want to change more command line options.

docker run \
  --rm \
  -p 5555:5555 \
  -v $(realpath riemann.config):/etc/riemann.config \
  riemannio/riemann
docker run \
  --rm \
  -p 5555:5555 \
  -v $(realpath myapp.clj):/myapp.clj \
  riemannio/riemann \
  /bin/riemann /myapp.clj

To keep things manageable, you can use Docker Compose to define how the Riemann container should be run.

version: "3"
services:
  riemann:
    image: riemannio/riemann:latest
    ports:
      - "127.0.0.1:5555:5555"
      - "127.0.0.1:5555:5555/udp"
      - "127.0.0.1:5556:5556"
    volumes:
      - ./riemann.config:/etc/riemann.config

Which JVM should I use?

Oracle JDK8 is likely to be the fastest option (and supports the full range of aggressive tuning options with riemann -a), but we test and deploy with JDK 8, from both Oracle and OpenJDK, and JDK 9 from Oracle. You're welcome to give other JVMs a shot, though!

What protocol should I use to talk to Riemann?

Use the Riemann TCP protocol. If you have to, and know what you're doing, back off to UDP, HTTP, or Graphite.

Use UDP if you intend to have the OS and network discard data automatically, instead of application-level flow control. Don't use UDP "for speed"; it's designed to drop data, it will drop data, and you'll spend a long time trying to figure out why your stats are wrong. UDP may be appropriate for sampling events where discarding large parts of the datastream is OK, but if different nodes drop packets at different rates you'll introduce sampling bias. If you don't understand what this means, don't use UDP.

Use HTTP only if a TCP client is unavailable; it introduces significant encoding and state machine overhead, takes more bytes on the wire, and can't represent the full range of Riemann events.

Use the Graphite server and other compatibility shims only for interop with legacy systems. The graphite protocol can only represent a small subset of the Riemann data model; you'll have to do additional work to reconstruct meaningful events.

What ports does Riemann use?

Riemann runs servers by default on several ports.

  • TCP on port 5555.
  • TLS on port 5554.
  • UDP on port 5555.
  • Websockets on port 5556.
  • REPL on port 5557.
  • OpenTSBD on port 4242
  • Graphite on port 2003
Each server's port can be altered in your configuration.

(let [host "0.0.0.0"
      iport 1234]
  (tcp-server {:host host :port iport})
  (udp-server {:host host :port iport}))

Change the log file

To log to a file, just say

(logging/init {:file "/path/to/some/riemann.log"})

If you'd prefer to only log to stdout, just leave out :file

(logging/init)

Configure Logback

Under the hood, Riemann is using Logback.

To configure Logback, you can provide a configuration file via the system property logback.configurationFile.

For more information, please read the Logback documentation.

java -Dlogback.configurationFile=file:///path/to/logback.xml \
-jar riemann-standalone.jar

Securing traffic using TLS

You can use Riemann across any TCP or UDP VPN, or an SSH tunnel. Riemann also supports bidirectional TLS auth as a part of its TCP protocol.

Before you can use TLS, you need a CA, server key and cert, and a client key and cert. Refer to less-awful-ssl and easy-rsa for more details.

Then, in riemann.config, add a new (or repurpose an existing) TCP server for TLS.

(tcp-server {:host "0.0.0.0"
             :port 5554
             :tls? true
             :key "riemann_server.pkcs8"
             :cert "riemann_server.crt"
             :ca-cert "ca.crt"})

Refer to your client's documentation or source for the client TLS options. In riemann-clojure-client, try something like:

(riemann/tcp-client {:host "1.2.3.4"
                     :port 5554
                     :tls? true
                     :key "riemann_client.pkcs8"
                     :cert "riemann_client.crt"
                     :ca-cert "ca.crt"})

Overriding Riemann functions

Sometimes, something in Riemann doesn't work the way you'd like. When this happens, you can redefine any function in Riemann in the config file. Just switch to the appropriate namespace, redefine the function, and switch back to riemann.config.

For instance, let's say you wanted to change how emails are formatted. The namespace riemann.common has a function called body, which accepts a sequence of events and returns a string. We'll override it in our config file:

(ns riemann.common)
(defn body [events]
  ; pr-str formats events as a clojure-readable string.
  (pr-str events))
(ns riemann.config)

; And then your servers, streams, etc...
(tcp-server ...)
(streams ...)

Fault tolerance

Because Riemann performs arbitrary computation over the event stream, with side effects, it cannot be distributed safely. You can, however, distribute it somewhat less than safely--a technique aligned with Riemann's philosophy that "mostly correct information right now is more useful, than totally correct information only available once the failure is over."

One obvious distribution strategy is to place two Riemann nodes on the same switch and have them share a virtual IP address using, say, Heartbeat or Corosync. On resource acquisition, the node allocating the IP should issue a gratuitous arp broadcast over that interface advertising the new route to the relevant switch(es). Failover in this case looks essentially like restarting a Riemann node, which should be relatively painless--most Riemann deployments fill in missing state within a few seconds to minutes.

What happens if the nodes are isolated by a network partition and both claim the interface? Depends on your network, but the answer is "probably not good things". OTOH, there's no good way for *any* consensus algorithm in an asynchronous network to make latency-bounded decisions in a safe way. FLP rears its ugly head. You're going to lose data in the short term; the question is how to emit as much useful, actionable information as possible, so you can fix the problem.

I've got a sketch worked out for a highly-available clustered Riemann which tries to negotiate these tradeoffs in a sane way, but it involves minimum latency costs, severe throughput costs (I'm estimating three to five orders of magnitude slower), disk IO, all kinds of esoteric failure modes, and some moderately challenging mathematics. It'd take a long time to build and would require some serious educational work on my part. There's just no way around it: distributing arbitrary latency-bounded computation is hard. You're going to have to reason about consensus. I'm really sorry about this. :(

Sharding

Because Riemann performs arbitrary computation, it can't shard your workload automatically. You'll need to pick some axes along which to shard, perform local aggregation on those nodes, and reduce the results with some higher-level Riemann node. The forward stream can help you connect nodes to one another, and async-queue! can allow some elasticity around latency jitters and downstream node restarts, but the actual sharding and discovery mechanics you'll have to build yourself.

Client backpressure, latency, and queues

Event processing in Riemann's model is synchronous (though there's lots of asynchronous stuff going on internally). When you send an event to Riemann, it flows through every top-level stream, and possibly those streams children, and *their* children, and so forth. Once the event has flowed through every stream, Riemann sends an acknowledgement back to the client. This model enables *backpressure*: clients can use the acknowledgements to avoid overwhelming Riemann with more events than it can process.

Riemann keeps track of some internal metrics, exposed as events, for stream latency and TCP server latency. Stream latency is how long it took for an event to flow through every stream. Server latency is how long it took for an event to be picked up by Riemann's TCP server, decoded, handed off to the streaming system, processed, and for the acknowledgement to be sent back to the client.

If you wait for an acknowledgement every time you send a message to Riemann, each process will behave synchronously. This approach is just fine when there are lots of clients sending requests infrequently, where "infrequently" means, via Little's Law, that the typical period between events is greater than the tcp server latency plus the round-trip network time to Riemann itself.

Sometimes, though, you'll want to send events *faster* than that. You've got a few options here.

If you ignore the acknowledgement messages, you can just shove events at Riemann as fast as your runtime will let you. Those events will fill up a queue in front of Riemann's streaming system. If that fills up, they'll spill over into the Netty queue associated with the channel. If that fills up, they'll spill over into the TCP buffer on the Riemann box, and by extension, the queues in the network and the sending node. At some point TCP backpressure will come into play, but understanding that interaction can be complex.

Doing this kind of infinitely-asynchronous send will manifest as high memory consumption on the Riemann node (to store all those queued operations), and correspondingly high wait times for events. If events arrive *too* far delayed, they may no longer be valid by the time they arrive, and you'll start seeing services expire. It's like sending a message via a courier who gets stuck in a waiting room and dies en route.

You can detect this (and alert on it) by watching the service called "riemann netty event-executor queue size"; if it starts getting above a thousand events or so, you're likely overloading Riemann. This queue size is directly related to the latency distribution given by "riemann streams latency 0.5" and friends, and "riemann streams rate". The higher the latency, and the higher the throughput, the larger the queue.

So: infinite asynchrony is a problem. You want *some* degree of asynchrony, especially when Riemann processes faster than the time it takes to send events over the network. But too much and you'll crash into finite resource limits. We need *backpressure*: bounds on the number of outstanding asynchronous requests, or the rate at which we send them.

One option is simply to spin up more threads and have each send synchronously. Depending on the client's design, you may be able to have many threads concurrently sending operations over the same connection. Riemann-java-client and riemann-clojure-client, for instance, will interleave operations from multiple threads on a single network connection. If threads are cheap in your runtime, or you don't need very many (on the JVM, more than 10K IO threads is probably a warning sign), this is perfectly fine. If you're running in Erlang or Haskell, threads are dirt cheap; fire away.

If threads in your runtime are more expensive, or you don't have them at all--say, node.js--you can emulate them using other asynchronous constructs. Riemann-java-client and riemann-clojure-client, for instance, have async-send functions that return promises; you can dereference those promises to find out whether Riemann acknowledged the message. Deref'ing blocks until acknowledgement occurs, or throws an exception if something in the network or Riemann went wrong. You can specify a timeout for deref; if the timeout is zero, you get a nonblocking check ("did it go through yet?"). Fixed timeouts are useful for retries of idempotent operations. "Allow this much time, then send again". Same semantics as any other asynchronous or stateful construct in Clojure.

The important thing for *backpressure*, if you're emulating threads using asynchronous objects, coroutines, continuations, fibers, callbacks, or what-have-you, is that you a.) eventually dereference the results of the send, and b.) only allow a fixed number of outstanding asynchronous requests. Riemann-java-client automatically throws (or returns promises which throw) when you try to send more than a few thousand concurrent messages, forcing you to implement backpressure and flow control logic. You can tune this option if you like, but it's there to protect you against unbounded resource consumption on either Riemann's side or the client.

Different clients and different languages will expose these constructs in different ways, but you'll find the same core ideas of limited asynchrony, queuing, and backpressure in every networked system.

Debugging

Printing events to stdout or the log

Streams are just functions, so you can use any function that prints an event to stdout to see which events pass through a particular stream. prn prints an object to stdout, so adding prn to any stream prints the events at that stream to the console.

If you're running Riemann as a daemon, it probably won't have a stdout to print to. To write to the logfile (which is *also*, by default, printed to stdout), use #(info %), which is a shorthand for the anonymous function (fn [x] (info x)). Note that info logs events a little differently than prn, but you can use pr-str to recover the clojure representation if desired.

(streams
  (where (service "bar")
    ; Print event to stdout
    prn

    ; Print :foo, then the event
    #(prn :foo %)

    ; Log event to the logfile and stdout
    #(info %)

    ; Log event using the same representation as prn
    #(info (pr-str %))

    ; Log some specific fields
    #(info (:service %) (:metric %))

Troubleshooting missing events

First, determine whether the events you're looking for are actually making it into Riemann. This will help localize the problem to the network or the streams. Add a new top-level stream which simply logs all inbound events. Use a (where) filter around that logging stream to cut down on noise.

(streams
  #(info %))
(streams
  (where (service "some thing you're looking for")
    #(info %)))

If you don't see the events you're looking for in the logs, they must be dropped before arriving at the streaming system. Check:

  • Riemann clients are sending to the correct Riemann host and port.
  • Riemann is listening to that host and port. Double-check the config (e.g. (tcp-server) options), that the appropriate server startup message appears in the Riemann logs, and that and netstat -lntp | grep riemann shows the port bound.
  • The network can relay packets from the client machine to the Riemann server. Use telnet some.host 5555 or nmap -sT some.host -P 5555.
  • Your packets are making it from the client to the server. UDP messages can be dropped, delayed, duplicated, or re-ordered at any time by the network, or discarded if the receiving node's receive buffer is too full. Try using TCP.

If your messages are arriving in the streaming system, but aren't making it to the index or other output streams, gradually slide the #(info %) stream downstream towards the exit point, to verify at which point in the chain the events are not what you expect.

If you're indexing events, but they don't appear in queries, they may have expired from the index. Try (expired #(warn "expired" %)) to warn about expired events. They may be expiring because their TTL is shorter than the interval between events. They may also expire because the Riemann clock is in the future relative to the originating event, due to clock skew or network/processing latency.

Check the internal instrumentation for the Riemann queue depth and core latencies; if the queue is more than a handful of events deep, or stream latencies are on the order of the relevant event TTLs, you might need to reduce the load on the Riemann server or investigate downstream services that might be slow.

Check clock skew between nodes using watch date or similar. Make sure your clocks are in UTC, not local time. Never local time. Especially never Daylight Savings Time. You can use Riemann's clock-skew stream to measure clock skew as seen by the Riemann node as well.

(streams
  #(info %) ; First, measure here
  (where (service #"^riak .+")
    #(info %) ; Then move the info stream here to check the filter
    (by :service
      (coalesce
        #(info %) ; Third, check the coalesced vector of events
        (smap folds/maximum
          #(info %) ; Fourth, probe here to check the maximum calculation
          (with :host nil
            #(info %) ; Finally, check exactly what events are being applied
                      ; to the index.
            index))))))

Writing tests

Riemann has integrated support for building fast, repeatable tests into your config itself, including virtualized time and scheduler side effects and suppressing unwanted IO during test runs.

In your streams, use riemann.test/tap to declare named points where you'd like to observe events. For instance, we might want to track any events flowing into the index, so we'd surround it with a tap and call that tap :index.

(let [index (default :ttl 3 (tap :index (index)))]
  (streams
    (expired #(prn "Expired" %))
      (where (not (service #"^riemann "))
        index)))

Then, we can inject events into the config's streams to see what happens. Define tests in any config file using tests and deftest.

(inject! events) returns a map of tap names to a vector of the events that arrived at that tap. Time is always reset to zero at the beginning of each test, and is automatically advanced to each event time just prior to event arrival. You may manually advance the clock using riemann.time.controlled/advance!. Scheduled operations like rate, rollup, etc will take effect atomically as the virtual clock advances.

You can also inject events into specific streams using (inject! [stream] events).

Under the hood, tests expands to a new ns declaration based on the current namespace--if the current namespace is foo, tests take place in foo-test. tests automatically refers clojure.test's is and are, plus riemann.test's version of deftest and inject! for inserting events into the config's streams.

You can define regular clojure tests too; no need to call inject. Every namespace ending in -test is eligible for testing.

(tests
  (deftest index-test
    (is (= {:index [{:service "test"
                     :time    1
                     :ttl     3}]}
           (inject! [{:service "test"
                      :time    1}])
           ))))
(def foo-stream
  (where (service "foo")
    (tap :foo)))

(tests
  (deftest foo-test
    ;; inject in foo-stream only
    (let [result (inject! [riemann.config/foo-stream]
                   [{:host "localhost"
                     :service "foo"
                     :metric 10}])]
      (is (= [{:host "localhost"
               :service "foo"
               :metric 10}]
              (:foo result))))))
$ riemann test riemann.config
loading bin

Testing riemann.config-test

Ran 1 tests containing 1 assertions.
0 failures, 0 errors.

Some streams may have *side effects*, like sending events over the network or emitting emails. You can suppress those side effects by wrapping them in a test/io stream, which forwards events to its children normally in production, but ignores them in test mode.

io suppresses events, which means that (io (tap :foo)) will never see anything. Always place taps *outside* io expressions.

Note that tap and io are eliminated at compile-time during production mode, so there's zero performance penalty; won't count against your Hotspot inlining budget, etc.

From Riemann 0.3.0, the test framework runs a core. Functions requiring a core like riemann.config/reinject, indexing events, index querying and expiry will work. The index is cleaned at the beginning of each deftest.

Before Riemann 0.3.0, the test framework didn't actually run a core, so functions that require one, (like riemann.config/reinject), didn't work from within the test framework.

(streams
  (rate 5
    (io (email "foo@bar.com"))))

Connecting to the REPL

Riemann has an nREPL server built in. You can enable this in your config with:

(repl-server {:host "127.0.0.1"})

You can then use Leiningen 2 to connect to it.

git clone git://github.com/riemann/riemann.git
cd riemann
lein repl :connect 127.0.0.1:5557

you can reload the config by sending riemann a sighup, but you can also do it from the repl.

user=> (riemann.bin/reload!)

Instrumenting your systems

Measure Riemann itself

Riemann comes with instrumentation built-in, which is sampled periodically and injected into the event stream. You can query the dashboard for service =~ "riemann %" to see Riemann's internal metrics. All latencies are in milliseconds.

riemann streams refers to the stream processor: its throughput measures the number of events being passed to (streams ...) per second, and its latencies indicate the time it takes to process a single event through the streaming system.

The TCP, UDP, and Websockets servers instrument their throughput and latency where possible. TCP latency measures the time from initial message deframing to Netty write of the corresponding response. For instance, riemann server tcp 1.2.3.4:5555 in latency 0.95 measures the 95th percentile time, in milliseconds, for the TCP server on port 5555 to parse, queue, process, and dispatch a response for a message. The rate for servers measures *message* rate, not *event* rate.

The main queue connecting Netty IO threads to the parsing/execution threadpool is measured by riemann netty event-executor queue size, and is a critical measure of whether the streaming system is overloaded relative to inbound request load.

To disable instrumentation, or control how often events are sampled, see riemann.config/instrumentation.

Measure CPU, memory, and disk use

Riemann-tools includes a program called riemann-health, which measures the local host's cpu, memory, and disk use. You can install it from rubygems.

gem install riemann-tools

... and run riemann-health with the address of your riemann server like so:

riemann-health --host 1.2.3.4

Riemann-health reports utilization fractions by default, ranging from zero to one. Load average is divided by the number of cores, and disk use by capacity. You can adjust the polling interval, what percentage of CPU usage is considered critical, and more. riemann-health --help will tell you more.

Handle exceptions

Streams can produce exceptions. A special event containing the exception details will be sent to the first child of exception-stream if an exception occurs on other child streams.

(exception-stream (email "polito@vonbraun.com")
                  (async-queue!
                   :graphite
                   {:core-pool-size 128}
                   graph))

Monitor Riak

Riemann-tools includes riemann-riak, which uses Riak's HTTP stats interface, some filesystem checks, and optionally, some erlang RPC requests to measure Riak's get and put latencies, request throughput, ring status, and disk use. Run riemann-riak on each Riak node.

Riemann-riak comes with defaults for the Riak .deb packages shipped by Basho, but it's tunable for other configurations. See riemann-riak --help for more options.

gem install riemann-tools
riemann-riak --host 1.2.3.4

Custom event attributes

You can include arbitrary additional fields in Riemann events. These are not a replacement for service, host, and time: the composite primary key of a logical "thing" in the Riemann universe, but they *do* let you more easily aggregate and filter events, or carry richer contextual information associated with a single message.

In most clients you can simply pass an extra key and value in the event map. In strongly-typed clients like riemann-java-client, special methods like EventDSL.attribute(key, value) are present.

client << {service: "thumbnailer rate",
           metric:  5.0,
           build:   "7543"}

Custom attributes are always encoded as strings in Protocol Buffers (pull request welcome!), but you can replace them with parsed variants in your streams using smap, adjust, and friends.

In the Riemann server, custom attributes work just like normal attributes on events--they're just a bit slower, owing to the hashmap lookup. Just like normal fields, you can use get, assoc, dissoc, update, and all the other Clojure functions over maps. Custom attributes do *not* have shorthand syntax in (where) (to prevent arbitrary symbol capture), but are accessible through (:my-field event).

(where (service "thumbnailer rate")
  ; Convert build numbers from strings to longs
  (adjust [:build #(Long. %)]
    ; Compute a throughput for each specific build
    (by :build
      (smap #(assoc % :service (str (:service %) " build " (:build %)))
        (rate 5 index)))

    ; Or maybe an old version reported numbers that were 2x larger than they
    ; should have been
    (where (< (:build event) 1055)
      (scale 1/2 index)
      (else index))))

Working with streams

Example configs

The Guardian has published their Riemann config files, which may be a useful guide to advanced use.

Split streams

Splitting streams is the default in Riemann. Almost all streams, unless otherwise documented, take any number of child streams as their final arguments, and forward the same events on to all of them. Because Riemann uses Clojure's immutable data structures, you don't have to worry about locking or mutation. Streams "change" events by sending altered, shared-structure *copies* of events downstream. For instance, this where expression sends matching events to two child streams; one of which indexes a copy of the event with service "foo", and the other with service "bar".

(where (host "db04")
  (with :service "foo" index)
  (with :service "bar" index))

Combine streams

Sending events from two places to the same stream is simple: just bind the stream to a variable. In Clojure, (let [x 1 y 2] ...) assigns x to 1 and y to 2 everywhere inside the let expression. Since streams are values, you can bind them to variables too. In this example, we create a single rate stream and send events to it along multiple paths.

; Create a stream for the total rate, and name it `aggregate`
(let [aggregate (rate 5 (with :service "req rate" index))]
  ; Return a stream which splits up events based on their service
  (splitp = service
    ; HTTP requests pass straight to the aggregate stream
    "http req rate" aggregate
    ; But we'll double the metrics for 0mq requests
    "0mq req rate"  (scale 2 aggregate)))

Even easier: use the pipe stream, which compiles to the same code as the let expression above:

; Within the pipe expression, - marks where events flow to the next stage
(pipe -
  ; The first stage is to split up events by their service
  (splitp = service
    ; HTTP requests pass straight through
    "http req rate" -
    ; But we'll double the metrics for 0mq requests
    "0mq req rate" (scale 2 -))
  ; The second stage is to compute a rate and index it.
  (rate 5 (with :service "req rate" index)))

Distinct streams for each host, service, etc.

Often, you want to run *many copies* of a particular stream; say, computing a rate for each service separately, or each [host, service] pair. Use by to bifurcate a stream into distinct copies based on a function or vector of functions of that event. For instance, to sum up disk use across hosts, but retaining a distinct sum for each service, you could write:

(by :service
  (coalesce
    (smap folds/sum
      (with :host nil
        index))))

Or to compute a rate for each distinct host and service:

(by [:host :service]
  (rate 5 index))

Filter events

Often, we want to do some operation only on a subset of events flowing through a stream. For instance, we might only want to email events which have the state "error". The basic filtering streams are where, her less magical sister where*, match, tagged-all and tagged-any, and expired. Most of the time, you'll use where.

Where takes a predicate, which is a special expression for matching events. After the predicate, where takes any number of child streams, each of which will receive events which the predicate matched. For example, we could email only events which have state "error".

(streams
  (where (state "error")
    ; Only events which have the state "error" are passed on to the email
    ; stream:
    (email "delacroix@vonbraun.com")))

The where stream provides some syntactic sugar to allow you to access your event fields. In a where stream you can refer to "standard" fields like host, service, description, metric, and ttl by name. If you need to refer to another field you need to reference the full field name, (:field_name event).

(streams
  (where (= (:type event) "evolution")
    ; Only events which have a type of "evolution" are passed on to the email
    ; stream:
    (email "delacroix@vonbraun.com")))

Where expressions can match particular values of a field; like passing on events which occur only on the Von Braun:

(where (host "von braun") ...)

Regular expressions (in Clojure, strings like #"foo .+") can be used to match fields as well.

(where (description #"an+elids") ...)

You can filter for the presence of a given tag.

(where (tagged "mutant") ...)

There are also streams specifically for selecting events which have some, or all, of a set of tags. Tagged is shorthand for tagged-all.

(tagged-any ["cat" "dog"] (with :service "animals/sec" (rate 1 index)))
(tagged-all ["ops" "ddos"] (email "ops@githug.com"))

(tagged "page" page-ops)

Numeric functions work like you'd expect:

(where (>= (* metric 1000) 2.5))

Which makes range queries easy:

(where (< 5 metric 10))

Predicates can use any function or macro, including boolean operators.

(where (not (or (tagged "www")
                (and (state "ok") (nil? metric)))))

You can also define and use arbitrary functions in your predicates. (where) binds the variable event to the event being considered.

(defn global? [event] (nil? (:host event)))
(where (global? event))

Higher-order functions work like you'd expect, so you can happily map, fold, filter, etc inside a where predicate too. For instance, we can match any of several regular expressions:

(where (some (fn [r] (re-find r service)) [#"cassandra disk .+"
                                           #"cassandra net .+"
                                           #"cassandra latency \d+"])
  ...)

Where streams forward an event to their children when the predicate matches. When the predicate *doesn't* match, where will forward events to any children in an (else) block.

(where (or (state "ok") (state "warning"))
  index
  (else
    (email "ops@foo.com")))

Set thresholds

You can use (where) to alert you when a metric goes out of bounds. Where the metric falls between zero and five, inclusive, index it with state "ok". Otherwise, index it with "warning":

(streams
  (where (<= 0 metric 5)
    (with :state "ok" index)
    (else
      (with :state "warning" index))))

Detect state transitions

The changed stream forwards on events when some fields of that event change. For instance, you can send an email whenever the state changes:

(streams
  (by [:host :service]
    (changed :state
      (email "ops@startup.io"))))

Note that we use (by) so that each distinct host and service track their changes independently. Otherwise, we'd get alerts when service A reported "ok" and service B reported "down". There's a shorthand for this:

(streams
  (changed-state
    (email "ops@startup.io")))

When you start Riemann, it doesn't know what the previous state was. By default, changed will forward on the first event it receives. You can tell changed and changed-state to *assume* an initial state with the :init option:

(streams
  (changed-state {:init "ok"}
    (email "ops@startup.io")))

I tend to include this snippet in my riemann configurations so I can keep track of which services are starting, crashing, and gracefully restarting.

Measure your app's latency profile

; Imagine you wanted to know the time it takes for your app's API requests
; to complete. The API emits events like:
;
; {:service "api req"
;  :metric: 0.240} ; 240 milliseconds
;
; So first, we select only the API requests

(where (service "api req")

       ; Now, we'll calculate the 50th, 95th, and 99th percentile for all
       ; requests in each 5-second interval.

       (percentiles 5 [0.5 0.95 0.99]

                    ; Percentiles will emit events like
                    ; {:service "api req 0.5" :metric 0.12}
                    ; We'll add them to the index, so they can show up
                    ; on our dashboard.

                    index)

       ; What else can we do with API requests? Let's figure out the total
       ; request rate. (rate interval & children) sums up metrics and
       ; divides by time.

       (rate 5)

       ; But this isn't quite right--these event metrics are *times*, so
       ; we're actually calculating the number of seconds spent by the API,
       ; each second. So we *set* the metric of every event to 1, *then*
       ; take the rate:

       (with :metric 1 (rate 5 index))

       ; (with) takes each event and calls (rate) with a *changed*
       ; copy--one where :metric is always 1. Then (rate) adds up all those
       ; 1's over five seconds, and sends that metric to the index.

       ; (with) has a counterpart, by the way: (default). It works exactly
       ; the same, but it only alters the event when the value is nil. Both
       ; with and default accept maps as well:

       (default {:state "ok" :ttl 60} index)
)))

Report exceptions

Where you handle exceptions in your application, you can also submit an event to Riemann. I usually set the service to the application name, include the stacktrace as the description, and tag the event with both "exception" and the classname of the exception.

(tagged "exception"
  (email "ops@foocorp.com"))

(tagged-all ["exception" "DatabaseError"]
  (email "db@foocorp.com"))

It's also easy to track the *rate* of exceptions per second, so you can graph how many unexpected failures occur per day, and determine which services need the most attention.

(where (tagged "exception")
  (with :metric 1
    (by :service
      (adjust [:service str " exception rate"]
        (rate 10 index graph)))))

Roll up and throttle events

Sometimes, when things break, they submit a *ton* of events. Maybe you receive sixty thousand exceptions in five minutes. You need to know that a problem exists--but don't need to know about *every single failure*. Riemann has two streams for controlling the rate of events.

rollup will allow a few events to pass through readily. Then it starts to accumulate events, rolling them up into a list which is submitted at the end of a given time interval.

Let's define a new stream for alerting the operations team, which sends only five emails per hour (3600 seconds). We'll receive the first four events immediately--and at the end of the hour, a single email with a summary of all the rest.

(def tell-ops (rollup 5 3600 (email "ops@rickenbacker.mil")))

(streams
  (where (state "error") tell-ops)
  (tagged "exception" tell-ops))

Rollup holds on to every event, which can use a lot of memory. Sometimes it's OK to drop some events instead of delivering them. That's where throttle comes in: it allows the first five events through, then ignores all the rest for an hour.

(def tell-ops (throttle 5 3600 (email "ops@rickenbacker.mil")))

And naturally, you can combine throttle and rollup to preserve *some* events, but not allow unbounded memory use:

(def tell-ops
  (throttle 1000 3600
    (rollup 5 3600
      (email "ops@rickenbacker.mil"))))

Detect down services

Each event has a TTL, which states how many seconds the event is valid for after its given time. When a service crashes catastrophically, it will stop submitting events and its events in the index will become stale. Periodically, the index sweeps out events past their lifetime, deletes them from the index, and streams an event for that host and service with state "expired".

You can control how often the index scans for expired events with periodically-expire. Here, we check every ten seconds. If you *don't* want events to expire from the index, you can delete this line from your config:

(periodically-expire 10)

If events have no TTL given, the index assumes their TTL is 60 seconds. You can provide a default TTL for any incoming events that don't provide one:

(streams
  (default :ttl 10
    ; your streams here
    ))

Now, any services that fail to check in every ten seconds will expire (unless they give a different TTL). Since expired events have a different state, any (changed-state) streams will detect the transition and can alert you:

(streams
  (changed-state {:init "ok"}
    (email "ops@foo.com")))

You can also explicitly filter expired events:

(streams
  ; Email any expired event
  (expired (email "ops@foo.com"))

  ; Process only events which are *not* expired
  (not-expired
    ...)

  ; You can also call the expired? function in a where clause
  (where (not (expired? event))
    ...))

By default, Riemann copies [:host :service] to expired events. You can control what keys from events are copied onto expired events by passing :keep-keys to periodically-expire:

(periodically-expire 10 {:keep-keys [:host :service :tags]})

With that in place, you can filter expired events on tags. This way, your app can decide whether an event is worthy of an alert by tagging it:

(streams
  (expired
    (tagged "notify-me" (email "ops@foo.com"))))

You can add a new tag, or a set of tags, to events using the tag stream:

(streams
  (tag "foo"
    #(info %))
  (tag ["foo" "bar"]
    #(info %)))

If you're worried about spikes or flapping you can make use of the stable variable. The stable variable detects stable streams of events over time. It takes a time period and event function as input and only passes on events that remain the same for the specified time period.

(streams
  (changed-state {:init "ok"}
    (stable 60 :state
      (email "ops@foo.com"))))

Group events in time

Sometimes you want to ask a question about the last few minutes, or of groups of 10 consecutive events at a time.

Combine events from different hosts and services

Events arrive independently in Riemann, but each describes the state of a service on some host over some window of time. Often, you want to know how two services relate to one another right now. Riemann provides two streams for this type of aggregation: coalesce and project.

Use project when you want to combine a fixed number of events, identified by arbitrary predicates. For instance, you might want to know the ratio of used gigabytes to total disk capacity, or the difference between enqueues and dequeues in a queue.

Use coalesce to get snapshot of the most recent event from every host and service that passes through that coalesce stream. Coalesce helps answer questions like "what fraction of my API servers are running the most recent version", or "what's the median queue depth across all queues in a given broker?"

Apply a regular expression to a field

You can test whether a field matches a regular expression by simply passing a regex to where. Clojure regular expressions look like #"...".

(where (service #"api req latency [\d\.]+")
  ...)

Or you can transform a string using a regular expression and capture groups (also called "backreferences") using clojure.string/replace.

(require '[clojure.string :as str])

(streams
  ; Replace services like "legacy <anything>" with "new <anything>". Passes
  ; through services that don't match the regular expression, unchanged.
  (adjust [:service str/replace #"^legacy (\w+)$"]
    ...))

Change units

Use the scale stream. For instance, to convert bytes to gigabytes, scale by 1/1024^3.

(scale (/ 1 1024 1024 1024)
  index)

Find the host using the most CPU

If you're running riemann-health, all hosts will report events like {:service "cpu" :host "foo" :metric 0.12}. You want to know which host is under the heaviest CPU load. The coalesce stream remembers the last events from each host and service, and sends them all as a vector to its children. We can map that vector of events to a single event--the one with the largest metric--using folds/maximum. Then we just set the service and host, since this event pertains to the system as a whole.

(coalesce (smap folds/maximum
  (with {:service "Max CPU" :host nil} prn)))

Count total number of hosts

Sometimes it is useful to know how many hosts are sending data to Riemann. This is especially useful in cloud environments where nodes are constantly scaling up and down. We'll assign every event the same service, use coalesce to combine the current set of hosts and services into a single map, throttle to cut down on the number of updates, and folds/count to count the distinct set of hosts. Finally we'll strip the host from that event, since it represents a property of the entire system, and index it.

; All services should be the same
(with :service "distinct hosts"
  ; Combine hosts and services
  (coalesce
    ; Turn lists of events into a single event with the count as its metric
    (smap folds/count
      ; Strip host field
      (with :host nil
        index))))

Alerting when a certain percentage of events happen

Sometimes you'll have a service that will fail. You might expect one or two failures, but if you get over a certain percentage of failures you want to be notified. In this case you can use a fixed-time-window.

(streams
  (where (and (service "signin")
              (not (expired? event)))
    ; We want to get alerted about failed sign ins. However we expect that
    ; there will be failures due to incorect passwords etc. So we only want to
    ; get alerted if more than 50% of the signins in a 60 second period are
    ; failures. Failed signings have state "warning".
    ;
    ; fixed-time-window sends a vector of events out every 60 seconds
    (fixed-time-window 60
      ; smap passes those events into a function
      (smap (fn [events]
        ; Given a list of events, we'll find the number which have state
        ; warning, divide by the total number of events, and emit a new event
        ; based on the ratio.
        (let [fraction (/ (count (filter #(= "warning" (:state %)) events))
                          (count events))]
          ; The metric for this event will be the fraction of failed signins,
          ; and the state will depend on how many failures we see.
          (event {:service "signin failures"
                  :metric  fraction
                  :state   (condp < fraction
                             0.7 "critical"
                             0.3 "warning"
                                 "ok")})))
        ; Now we can use those "signin failures" events to alert on state
        ; transitions:
        (changed-state (email "ops@trioptimum.com"))))))

Arbitrary functions as streams

Since streams are just functions which take an event as their sole argument, you can do any computation over events that you like. Just write a function, either named or anonymous.

You can run any Clojure code you like here; using Clojure or Java libraries on the classpath, writing to the network, logging to disk, using existing variables from the config, and so on. It's up to you.

(where (service "foo")
  (fn [event]
    ; Log a message
    (info "I got an event:" event)

    ; Then extract some fields and insert them into a DB.
    (save-to-my-database (:description event) (:metric event))))

Create your own stream function

Riemann builtins like rate, percentiles, smap, and so on are all functions which take a variable number of child streams and returns a function that takes an event and responds to that event. You can build your own stream-generating functions too.

Let's create a trivial example.

(defn hello-stream [& children]
  (fn [e] (let [new-event (assoc e :hello :world)]
    (call-rescue new-event children))))

Our stream returns a function that when it receives an event, adds a new key-pair via assoc and then uses call-rescue, a standard library function, to pass the event on to all the child streams it was passed.

Add this new stream to the list of event streams.

(streams (hello-stream prn))

Now when you pass an event into Riemann you should see the event printed out with an added key-value pair of :hello :world.

Streams all essentially work in this way, if you look at the source code of the Streams namespace in streams.clj. You can find a few simple examples in the functions match, expired, under and over which filter whether the event is passed along the event stream.

Reinject events

To reinsert an event back into the current core, use riemann.config/reinject. Reinjected events flow through all top-level streams, just as if they had just arrived from the network. In this example, incoming "initial" events are remapped to "derivative" events and reinjected into the streaming system. Each event is printed twice, first "initial", then "derivative".

(streams
  (streams/where (service "derivative")
    #(info "derivative" %))
  (streams/where (service "initial")
    #(info "initial" %)
    (streams/with :service "derivative" reinject)))

Query the index from within a stream

Sometimes, you want the behavior of a stream to depend on the current state of the index. Say, for instance, that you wanted to use a maintenance-mode service to suppress alerting. To enter maintenance mode, we'll submit an event to Riemann which has a TTL as long as the maintenance window we want. Here's a ten minute window, sent from a Ruby client:

1.9.3p385 :001 > Riemann::Client.new << {service: "maintenance-mode",
                                         host:    nil,
                                         state:   "active",
                                         ttl:     600}
 => nil

Next, we'll write a function which identifies whether maintenance mode is active. We could use riemann.index/lookup to find a particular host/service pair, or riemann.index/search for a more general query.

(defn maintenance-mode?
  "Is Riemann currently in maintenance mode?"
  []
  ; Take an expression representing a query for maintenance mode
  (->> '(and (= :host nil)
             (= :service "maintenance-mode"))
       ; Search the current Riemann core's index for any matching events
       (riemann.index/search (:index @core))
       ; Take the first match
       first
       ; Find its state
       :state
       ; Is it the string "active"?
       (= "active")))

Now we can use that function inside of a where predicate, to pass events on only when we're *not* in maintenance mode.

(streams
  (changed-state {:init "ok"}
    (where (not (maintenance-mode?))
      (email "ops@trioptimum.com"))))

Naturally, you could parameterize the maintenance-mode function, perhaps taking a host, tag, or service as an argument, and searching for particular matching events.

(defn maintenance-mode? [host]
  ; Using (list) to build a query dynamically
  (->> (list 'and (list '= ':host host)
                  '(= :service "maintenance-mode"))
       ...))

(where (not (maintenance-mode? host))
  (email "ops@trioptimum.com"))))

Understanding the Riemann stream model

Streams are functions that take an event. There's a subtle distinction here: we use the term "stream" to describe both the stream itself, but also the functions that generate those streams. Remember, in Lisp, (fun arg1 arg2) means "Call fun with arg1 and arg2".

; A function that generates a stream
rate

; Calling the rate function with the arguments `5` and `index`
; returns a stream: a function that accepts events, computes
; a rate every five seconds, and sends rate events to the index.
(rate 5 index)

Riemann usually calls your streams with events for you. That's what the streams function in riemann.config means: "here are some functions that you should call when new events arrive".

; Generate a rate stream, then pass that stream to the `streams` function,
; which tells Riemann to remember that stream and call it with each event as
; an argument.
(streams
  (rate 5 index))

When a new event arrives, Riemann takes each function you've specified using `streams` and calls it with that event. Here's the source of riemann.core/stream!.

That's it. Riemann's entire streaming model is just a function call and some careful API design. All the functions in riemann.streams, like where, rate, with, etc. take streams as arguments and create new streams which call them, usually transforming the events they receive in some way.

(defn stream!
  "Applies an event to the streams in this core."
   [core event]
     (instrumentation/measure-latency (:streaming-metric core)
         (doseq [stream (:streams core))
           (stream event))))

Why is my code only running once?

New users sometimes have trouble understanding Riemann's execution model. For example, you might write a config like this, which is supposed to apply the local Riemann server's current time to every event.

However, this code doesn't do what you might expect. It assigns the same time--the time that Riemann last reloaded its config--to every event. Why?

Riemann evaluates the config once at configuration time, building up a set of streams and setting up services like logging, the TCP server, and so on. Once the config is processed, Riemann begins passing events to those streams.

(streams
  (with :time (unix-time)
    index))

So, at config time, Riemann follows the normal rules of Clojure evaluation, and calls (unix-time) to obtain the current server clock.

(streams
  (with :time 12345
    index))

Then it calls (with :time 12345 index) to build a stream that takes events, assigns the time 12345, and sends them to the index.

Finally, Riemann calls (streams) with that function, telling Riemann to apply that stream to all incoming events: assigning the same time to all events.

(streams
  #<fn-taking-an-event>)

To run code each time an event arrives, we need a function. We could write a new stream from scratch, or use a stream like smap that uses a function to transform each event before passing it on to children.

(streams
  (smap (fn [event]
          (assoc event :time (unix-time)))
        index))

Calling streams yourself

If you use Riemann's built-in stream-generating functions, you don't have to worry about calling streams yourself; Riemann does it for you. But sometimes, you want to write your own streams, and will need to call some child streams yourself.

(where (service "cat")
  (fn [event]
    (let [event (assoc event :state "meowing")]
      ; What now???)))

You might try creating a new stream and calling it with the event, but this will probably behave weirdly. In this example, every time a new event arrives, we create a fresh rate stream, with completely new state, and send our event to it. We probably wanted to send each event to the same rate stream.

(where (service "cat")
  (fn [event]
    (let [event (assoc event :state "meowing")]
      ((rate 5 index) event))))

If we're writing a new function to generate streams, we might have our downstream children passed in as an argument; see any function in riemann.streams for an example. If we're just defining a one-off stream inline, we can bind the child to a variable using let.

(where (service "cat")
  (let [downstream (rate 5 index)]
    (fn [event]
      (let [event (assoc event :state "meowing")]
        (downstream event)))))

If we want to call multiple downstream children, use sdo to wrap them up into a single stream.

(where (service "cat")
  (let [downstream (sdo (rate 5 index)
                        prn)]
    (fn [event]
      (let [event (assoc event :state "meowing")]
        (downstream event)))))

Even easier, in this particular case, is to use a general-purpose transformation stream like smap.

(where (service "cat")
  (smap (fn [event]
          (assoc event :state "meowing"))
    (rate 5 index)
    prn))

Generating events from within a stream

There are times where you will want to generate new events from an existing stream. Most of the time, it is preferrable to use riemann.streams/with to modifying fields from an existing event, but there are cases where it would hinder readability or it is just not possible. In that case, use riemann.common/event.

(fixed-time-window 60
  (smap (fn [events]
          (let [errors   (filter (comp #{"warning" "critical"} :state) events)
                fraction (/ (count errors) (count events))]
            ; Build a new event using a map of fields and values.
            (event {:service "error percent"
                    :metric fraction})))
    index))

Asynchronous streams

In Riemann, streams are expressed by function composition. Each stream is a function that takes an event and does something with it. Child streams are just functions too, so when an event propagates through different layers of streams, it's actually just a series of function calls. We do this for three reasons:

  1. Locality. When one function calls another, the event remains on the stack--which is almost certainly in CPU cache. If streams were asynchronous, we'd have to defer the event to some mutable shared state, likely somewhere on the heap. That causes contention: the scheduler is, to some degree or another, mutable state being accessed by multiple threads, which may have to undergo expensive memory-safety operations. We also avoid cache stalls in transfering that event and its processing context into and out of some scheduler.
  2. Hotspot. Modern JVMs aggressively optimize function calls via inlining, escape analysis, and other techniques. Asynchronous execution makes it harder for the JIT to identify optimization opportunities. Because stream handoff is cheap in Riemann, we can use lots of streams to solve problems instead of worrying about the handoff cost. This means that Riemann's API can break problems into smaller, more composable pieces.
  3. Predictability. It's easier to reason about Riemann stacktraces because they have the same structure as the source. It's easier to predict when side effects will happen. And backpressure is trivially built in; no need to reason about concurrency limits for each stream.

In practice, most Riemann streams are pure functions or only mutate in-memory state; Riemann is typically cpu-bound. We have a threadpool, on the order of num-processors, which runs an event through all streams in the core. This makes Riemann quite fast--but when you make a blocking call to a downstream service, sometimes the rules change. You want additional parallelism for IO, or an explicit queue in between two streams. In keeping with the Riemann philosphy of small, re-usable components, you can make any stream asynchronous, with its own bounded queue, worker threadpool, and performance metrics, via the async-queue! stream.

For instance, the forward stream forwards events to another Riemann server, and returns only when that server has acknowledged the message. This provides backpressure and acknowledgement: a client knows that its message was accepted not only by the local Riemann server, but also by the downstream Riemann server.

(let [index (index)]
  (streams
    (forward (riemann.client/tcp-client :host "agg.riemann.prod"))
    index))

But if that link is slow, and we want to return immediately, we can defer execution of that stream onto a threadpool via a fixed-size queue. Because we acknowledge before the downstream server acknowledges, we might falsely acknowledge a message. The semantics of the stream have changed; we're gaining performance and insensitivity to downstream latency, but at the cost of safety.

We bound the number of in-flight operations to the downstream service by specifying how many threads are allowed, and bound memory consumption via the queue size. If the queue fills up, events passed to the async-queue stream will immediately throw an exception informing you that the queue is full; as with all exceptions in Riemann, you can explicitly catch them using an exception handler stream, or allow Riemann to log the failure. As with all Riemann streams, exceptions don't interfere with other child streams.

All async-queue services support hot config reloading (though they won't cleanly drain events between services if their parameters change), and come with internal metric events about their queue size, outstanding threads, etc. They'll alert you via state changes if they get too full, so make sure to monitor events like "riemann executor rejected rate" with changed-state so you'll know when they start dropping events.

Note the bang (!) at the end of async-queue; this function is dangerous because it mutates state. Async-queue creates a threadpool and changes the Riemann core. You *don't* want to create these dynamically, or you could wind up with hundreds or thousands of queues and threadpools. Never put an async-queue inside of a by, for example: by would create a new async queue for each distinct host or service, etc. For safety, define your async-queue streams in a let binding, and call them wherever needed in streams. That way you'll know it's always the same executor.

Remember, you can wrap any stream in this asynchronous layer. Combining batching and async-queue can dramatically improve throughput; sometimes by several orders of magnitude. Each has its cost: batching increases latency, and async-queue means dropping events if the queue fills up. As a transient event processor, Riemann emphasizes *dropping* old data rather than getting backed up or shutting down entirely. Think of async-queue like a safety valve; allowing you to still process requests by shunting some of the incoming stream aside. If you want guaranteed delivery over long timescales, use an on-disk queuing system like Kafka, either in front of or behind Riemann.

(let [index (index)
      downstream (async-queue!
                    :agg-forwarder        ; A name for the forwarder
                    {:queue-size     1e4  ; 10,000 events max
                     :core-pool-size 4    ; Minimum 4 threads
                     :max-pool-size 100}  ; Maximum 100 threads
                    (forward
                      (riemann.client/tcp-client :host "127.0.0.1")))]
  (streams
    index
    ; We'll accumulate batches of at most 100 events every 10th of a second
    ; before sending those events downstream to the other Riemann node, via
    ; the async queue.
    (batch 100 1/10
      downstream)))

Organizing with functions

Riemann's config is a Clojure program; you can break up your config into smaller, testable, re-usable pieces using functions, and organize those functions into namespaces. For example, here's a config which computes rates and percentiles over two different services.

(let [index (index)
      graph (graphite {:host "graphs.internal")]
  (streams
    (where (service "foo")
      (by :host
        (with {:metric 1, :service "foo rate"}
          (rate 5 index graph))
        (percentiles 5 [0.5 0.95 0.99] index graph)))

    (where (service "bar")
      (by :host
        (with {:metric 1, :service "bar rate"}
          (rate 5 index graph))
        (percentiles 5 [0.5 0.95 0.99] index graph)))))

Let's break that rate and percentile pattern out into its own function. The two where expressions match different services, so we'll make the service name an argument to the function. We also need to send our rate and latency events to two child streams--the index, and the graphite client--so we'll make those the two arguments to the function. The function will construct a stream using (where ...) and return that to its caller.

(defn ...) defines a new function, named rate+percentiles. We provide a docstring that describes the function, and a list of arguments in [brackets]. Then we create a stream using where. The final (in this case only) expression in a defn is the function's return value.

(defn rate+percentiles
  "Filters events to those with the given service, then, for each host,
  compute a rate and percentile distribution every 5 seconds, and forward
  those events to the given index and graphite client."
  [svc index graph]
  (where (= service svc)
    (by :host
      (with {:metric 1, :service (str svc " rate")}
        (rate 5 index graph))
      (percentiles 5 [0.5 0.95 0.99] index graph))))

Then we just call that function in our streams, filling in the service, index, and graphite client we'd like to send events to. Remember, in Clojure, (fun a b) means "call the function fun with arguments a and b".

(let [index (index)
      graph (graphite {:host "graphs.internal")]
  (streams
    (rate+percentiles "foo" index graph)
    (rate+percentiles "bar" index graph)))

These functions can, in turn, call other functions, allowing you to build up complex streams out of small, well-organized parts. Everything in Riemann is defined in terms of functions like this: the same tools we use to build Riemann's internals are in your hands too.

Use functions whenever you start nesting too deeply, or whenever you can put a name to a well-defined stream. Use functions to transform events! Use functions to figure out what time it is! There's no limit. For more on functions and Clojure in general, see Clojure from the Ground Up: Functions.

Organizing with namespaces

As your config grows, you'll have more and more functions--and more and more names to worry about. It helps to break things up into namespaces--typically one per file. Clojure provides a first-class namespace system, which you can use to organize your configuration. For instance, we might have a file in /etc/riemann/mycorp/queue.clj which knows how to handle all the events from a queuing system.

We start the file with a namespace declaration ns, and pull in some common Riemann functions using :require. If you need functions from other namespaces, you can add them as well. Then we define functions that help build queue-related streams.

(ns mycorp.queue
  "Instrumentation and alerts for our distributed queue and consumers"
  (:require [riemann.config :refer :all]
            [riemann.streams :refer :all]))

(defn capacity [index pagerduty] ...)

(defn rate-of-growth [index pagerduty] ...)

(defn queues
  "Handles all queuing events."
  [index pagerduty]
  ; We'll just route all events to each of these streams
  (sdo
    (capacity       index pagerduty)
    (rate-of-growth index pagerduty)))

In our top-level riemann.config, we don't want to worry about capacities or rates of growth or anything else. We'll just load every .clj file in the mycorp directory, and require the mycorp.queues namespace, giving it the short name q. Then we can call any function from that namespace using q/some-fn.

Note that require has a slightly different shape in a ns declaration versus when it's called as a function: when called as a function, we put a single quote in front of the vector. There are Good Reasons (TM) for Clojure to work this way, but it's a long story and we won't dig into it here.

(include "mycorp/")
(require '[mycorp.queue :as q])

(let [index (index)
      pd    (pagerduty ...)]
  (streams
    (where (service #"queue ")
      (q/queues index pd))))

Working with the dashboard

Find out what services are in the index

Some producers, like collectd, spew a huge volume of service into Riemann, which can make it tough to figure out which service to use. You can dump every service from the Riemann index with a client in your favorite language, though. Just issue a query (e.g. true for all events, though this might be slow), and extract the fields you're interested in.

gem install riemann-client
irb -r riemann/client
Riemann::Client.new(host: "my.riemann.server")["true"].map(&:service).sort.uniq.each { |x| puts x }

Integrating with other systems

Make sure to check the asynchronous streams docs; it may be appropriate to batch or defer events which flow to downstream systems.

Send email

riemann.email can send single events--or vectors of events--via email.

You can use any options for Postal.

(let [email (mailer {:from "riemann@trioptimum.com"})]
  (streams
    (where (state "critical")
      (email "shodan@tauceti.five"))))
(mailer {:from "riemann@trioptimum.com"
         :host "mx1.trioptimum.com"
         :user "foo"
         :pass "bar"})

Forward to Graphite

First, you define a client to graphite, which maintains a connection pool.

Then, use the client as a stream.

Or just graph everything.

(def graph (graphite {:host "my.graphite.server"}))
(streams
  (where (service "thing-to-graph")
    graph))
(streams
  graph)

Forward to Librato Metrics

Create a client for librato-metrics with your username and API key.

Then use it in streams.

You can submit events as annotations too. See the librato-metrics documentation.

(def librato (librato-metrics \"aphyr@aphyr.com\" \"abcd01234...\"))
(streams
  (tagged \"latency\" (librato :gauge)))

Notify with Pagerduty

Create a client with your Pagerduty service key, then use :trigger and :resolve to open and close issues for a given host and service.

You can also pass http options using the :options key, and a custom formatter for the Pagerduty event with the :formatter key.

You can use the Pagerduty v2 API by setting the :version key to :v2. In v2, each event can contain a :dedup-key key to handle alert de-duplication.

(let [pd (pagerduty {:service-key "my-service-key"
                     :options {:proxy-host "127.0.0.1"
                               :proxy-port 8118}})]
  (streams
    (changed-state
      (where (state "ok")
        (:resolve pd)
        (else (:trigger pd))))))

Sent events to Slack

Create a Slack webhook for Riemann. Then create a Slack client using the token from the last element of your webhook URL, for example with a URL of:

https://hooks.slack.com/services/QWERSAFG0/AFOIUYTQ48/120984SAFJSFR

Then your token would be 120984SAFJSFR. Specify the user name of the Riemann bot, a channel to notify in and an icon for the notification.

(def credentials {:account "your_org", :token "your_token"})
(def slacker (slack credentials {:username "Riemann bot"
                                 :channel "#ops"
                                 :icon ":smile:"}))

(streams
    (where (state "critical")
      slacker))

Forward between Riemann servers

When you have *many* events, you can use multiple Riemann servers to scale out. You might, for instance, run one Riemann server per data center, and forward only state changes in each service to a master server for a birds-eye view.

(streams
  (let [client (tcp-client :host "aggregator")]
    (by [:host :service]
       (changed :state
                (forward client)))))

Forward to InfluxDB

Create a InfluxDB client for Riemann. Using :version :new-stream allows to use the new InfluxDB stream. Otherwise, the old stream is used.

You can then send events to InfluxDB. You can use smap for example to format events for the new stream.

For the old stream, you can override per event the :tag-fields and :precision options. For the new stream, you can override per event :consistency, :db, :retention and :precision.

If a field is a Clojure Ratio, it will be converted to Double

(def influx (influxdb {:host "localhost"
                       :db "riemann"
                       :version :new-stream}))
(streams
  (smap
    (fn [event]
      (assoc event :measurement     (:service event)
                   :influxdb-tags   {:state (:state event)}
                   ;; :value = 0 by default
                   :influxdb-fields {:value (or (:metric event) 0)}))
    influx))

Kafka Integration

Consume from Kafka

You can consume events from Kafka topics by configuring a Kafka consumer.

For a full list of :consumer.config options see the Kafka consumer docs. Note that the :enable.auto.commit option is ignored and defaults to true.

(kafka-consumer {:consumer.config {:bootstrap.servers "brok01.foo:9092"
                                   :group.id "riemann"}
                 :topics ["metrics"]})

Forward to Kafka

Create a Kafka client. For a complete list of producer configuration options see the Kafka documentation. The Kafka stream uses kinsky under the hood, so you can import it and use these serializers.

You can now call kafka-output in your streams with a topic name and an optional message key.

(def kafka-output (kafka {:bootstrap.servers "brok01.foo:9092"}))

(streams
  (kafka-output "my-topic"))

Forward to Elasticsearch

Create a Elasticsearch client. The elasticsearch stream accepts an optional second parameter specifying an event formatter (see the API documentation for the default value).

(def elastic
  (elasticsearch {:es-endpoint "http://localhost:9200"
                  :es-index "riemann"
                  :index-suffix "-yyyy.MM.dd"
                  :type "event"}))
(streams
 elastic)

You can also use the Elasticsearch bulk API. You can override per event the default-bulk-formatter options.

(def elastic-bulk
  (elasticsearch-bulk
    {:es-endpoint "http://localhost:9200"
     :formatter (riemann.elasticsearch/default-bulk-formatter
                  {:es-index "riemann"
                   :type "event"
                   :index-suffix "-yyyy.MM.dd"
                   :es-action "index"})}))

(streams
  (batch 100 1/10
    elastic-bulk))

Contributing to Riemann

Write a client

A TCP connection to Riemann is a stream of messages. Each message is a 4 byte network-endian integer *length*, followed by a Protocol Buffer Message of *length* bytes. See the protocol buffer definition for the details.

Over UDP, the length header is omitted; just send the protobuf Message directly. UDP datagrams have a default maximum size of 16384 bytes by Riemann's default; larger messages should go over TCP. This limit is configurable in both the client and server; client values *must* be smaller than the server's.

The server will accept a repeated list of Events, and respond with a confirmation message with either an acknowledgement or an error. Check the ok boolean in the message; if false, message.error will be a descriptive string.

Because protocol buffers is a strongly typed protocol, the metric of an event is represented as one of metric_d (floating point 64-bit), metric_f (floating point 32-bit), or metric_sint64 (64-bit signed integer). Your client should emit and consume all of these types. For compatibility with older versions of Riemann, you may *also* emit a metric_f alongside the normal type; newer versions of Riemann will prefer the higher-resolution types.

Events are uniquely identified by host and service. Both allow null. Event.time is the time in unix epoch seconds and is optional. The server will generate a time for each event when received if you do not provide one. Event.time_micros is the time in unix epoch microseconds and is optional too. When receiving a protobuf message, Riemann will use time_micros in priority.

You can also query events from the index using a basic query language. Just submit a Message with your query in message.query.string. Search queries will return a message with repeated Events matching that expression. A null expression will return no states. For some example queries, see The query test suite.

You might find it useful to read the Ruby client source as a guide to writing your own client.

Work with the Riemann source

I try to keep master as clean and runnable as possible. Riemann has an exhaustive test suite, which helps ensure code quality. If you plan on changing the Riemann source, fork it on Github so you'll be able to send pull requests quickly. If you just want to run the latest version, go ahead and clone the official repo:

git clone git://github.com/riemann/riemann.git
cd riemann

You'll also need a JVM, and leiningen 2--the Clojure build system.

To run the tests suite, try lein test. To start Riemann, run lein run. Riemann will read the file riemann.config in the current directory. If you want to run a different config file, try

 lein run -- path/to/my/riemann.config 

If you want a fat jar, run lein uberjar and copy target/riemann-{version}-STANDALONE.jar. To build tarball and debian packages, use lein pkg; .debs and .tar.gz files, plus md5sums, will appear in target/.

The protocol buffer codec and clojure client live in riemann-clojure-client, which wraps the java protobuf code and java client in riemann-java-client. Both of these are available on clojars and most of the time you can ignore them. However, if you need to change the protocol or client, you can fork these projects and make your changes there.

Building riemann-java-client

You'll need maven, and the protocol buffers compiler (protoc) version 2.4.1.

When you've made changes to the java client, install it with mvn install; then test the clojure client and install it with lein install. Finally, you can run riemann itself. You may need to check that the client versions you're working with match up in the riemann and riemann-clojure-clientproject.clj files.

Fix a bug or add a feature

First, fork Riemann on github. Clone your fork and create a new topic branch for your fix:

git clone git@github.com:your-github-username/riemann.git
cd riemann
git checkout -b fix-some-bug

Most of Riemann's source lives in src/riemann/. Corresponding tests live in test/riemann/. When you fix a bug or add a feature, make sure to add new tests that confirm its correctness! You can run the test suite with

lein test

Some tests for integrating with other services require a local sendmail, or graphite, or credentials for a web service. If you make changes that affect these systems, you can test them with special selectors like lein test :graphite or lein test :email. If you're working with a particular namespace, like riemann.streams, lein test riemann.streams-test runs only the tests for that namespace. Once your tests pass, commit your changes and push them to github:

git commit -a
git push origin fix-some-bug

If you change more than a few lines of whitespace, please make your formatting changes in a separate commit; it'll be easier for me to read and understand your changes. Please try to send me only a few commits where possible; use rebase --interactive to squash your small changes.

Help write documentation

Riemann's web site and documentation are in the gh-pages branch of the riemann repository. Fork riemann on github, clone your fork, and check out the branch:

git clone git@github.com:your-github-username/riemann.git
cd riemann
git checkout gh-pages
vim howto.html

Pages are built with Jekyll. To see how your changes will appear on the site,

sudo apt-get install python-pygments jekyll
cd riemann
jekyll serve

... and open _site/howto.html in a web browser. When you're satisfied with your changes, commit, push, and send me a pull request:

git commit -am "Added a howto guide for integrating with FooService"
git push