This is not a comprehensive guide to everything in Riemann, but it can get you started and help solve specific problems. Want to add your own article to this page? Send me a pull request!
Once you know where to look, you can learn more by reading the API docs (especially riemann.streams), and the source. I try hard to write readable code. The test suite is also a great place to look for use examples.
If you use the Debian or CentOS packages, Riemann adds a `riemann` user, stores its configuration in `/etc/riemann/riemann.config`, and logs to `/var/log/riemann.log`. You should always tail the log file when working with Riemann; it'll alert you to configuration errors and help you debug your streams.
In another terminal, use your favorite editor to open `/etc/riemann/riemann.config`. Let's add a logging stream, so we can see all events that pass through the streams. `#(info %)` expands into `(fn [x] (info x))`, which is a function that takes an event and logs it at the INFO log level.
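The stream itself is a single line in your config:

```clojure
(streams
  #(info %))
```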
Now we need to reload the config file. Riemann will respond to SIGHUP, or you can use the init scripts. You'll see a message about reloading the config in the Riemann log. If you make a mistake, like a syntax error, Riemann will continue running with the old config and won't apply the new one. Instead, it'll log an explanation of the problem, so you can investigate and fix it.
Now that the config has been reloaded, you should see events flowing by in the log. Some come from external sources, like riemann-health or your own programs. Others are generated by Riemann internally, as a part of its internal performance instrumentation. You can insert logging statements anywhere in your streams to verify what kind of events are flowing through that point.
Reloading is experimental, and is subject to the normal Clojure rules about `(def)` and `(defn)`. To avoid confusion, use `let` bindings instead of `def`; it'll guarantee reloads work correctly. If reloads seem broken, you might need to do a full restart for your changes to take effect.
We present here a minimal configuration which only prints incoming events to standard output. This configuration can prove useful when debugging clients to make sure expected events are reaching Riemann.
This example includes a full configuration; later examples will focus on streams. The following actions are performed:
- `logging/init`: sets up logging to standard output (`STDOUT`).
- `tcp-server`: starts a Riemann TCP server on port 5555 (the default).
- `instrumentation`: disables internal event production.

Since `streams` expects a list of functions to call with individual events, we can output any input by calling `prn` on it.
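A sketch of such a config (option shapes may differ slightly across versions):

```clojure
(logging/init)                       ; log to stdout
(instrumentation {:enabled? false})  ; disable internal event production
(tcp-server)                         ; listen on 127.0.0.1:5555

(streams
  prn)                               ; print every inbound event
```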
Often your Riemann configuration file will get quite large as you add streams and handle more events. Additionally, if you're managing Riemann with a configuration management tool like Puppet or Chef then it can be complex to template a large configuration file. To help with this Riemann also supports including functions via Clojure's namespacing model. These can include your own custom stream functions.
To enable this, the contents of the directory containing the `riemann.config` configuration file are included in the `CLASSPATH`.
To use other functions, create a source directory for them. For example, to create functions specific to your organization you might create an `examplecom.etc` namespace.
Then create a namespace containing the functions you wish to include in a file in this directory, for example `email.clj`.
We would then require this namespace in our `riemann.config`.
With this we could use the `email` variable inside our Riemann configuration.
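Something like this in `riemann.config` (addresses are examples):

```clojure
(require '[examplecom.etc.email :refer [email]])

(streams
  (changed-state {:init "ok"}
    (email "ops@example.com")))
```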
You can read more about Clojure's namespacing and how it works.
This approach is supported in Riemann 0.2.11 and later.
Riemann has three main parts: clients, the server, and the dashboard. By default, all listen on the loopback interface. You'll need to configure each to listen on an appropriate interface for your environment. On the host that runs the Riemann server, open `/etc/riemann/riemann.config`, and change the host that Riemann binds to. To listen on all interfaces, use `0.0.0.0`.
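For example, the stock config binds all three servers to a single host variable:

```clojure
(let [host "0.0.0.0"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server  {:host host}))
```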
Now we need to reload Riemann to tell it about our changes.
To change the address that the dashboard binds to, edit riemann-dash's config.rb.
Riemann-dash only serves some JS code and a small piece of configuration; when you open riemann-dash in the browser, it'll open connections from your browser to the Riemann server's websocket interface to receive events. To connect the dashboard to a remote Riemann server, double-click the text box in the top right of the dashboard, and change the address from `127.0.0.1:5556` to your Riemann server's host and port. When you hit enter, the browser will initiate a websocket connection to that address and begin displaying events. Hit `s` to save the new dash config; the dashboard will connect to that address every time you load the page.
Riemann-dash is not the only dashboard for Riemann; Riemann decouples data processing and visualization. Opening the Riemann server from your browser *won't* work.
If the dashboard is unable to connect to the Riemann websocket server, you'll see an alert pop up every few seconds. Check that the server is running, that the Riemann websocket server is reachable from your browser, that you used the correct host and port, and so on.
If you don't see *anything* on the dash, but it connected successfully, you may not have any events to show, or you may not have set up any views.
Instead of installing Riemann directly on your system, you can also use Docker to run it. There is a prepackaged Docker image available as `riemannio/riemann`.
With its default configuration, Riemann will index all incoming events and print expired events. Make sure to bind the necessary ports to your host system, or else you won't be able to send events to Riemann.
If you expect lots of traffic and don't need segregation between containers, you can also skip the Docker network and use the host network via the `--net host` flag. Note that the default Riemann configuration in Docker will listen on all network interfaces, so make sure to either mount a different configuration (see below) or configure your firewall appropriately.
Riemann will, by default, use a configuration file located at `/etc/riemann.config`, which you will most likely override by mounting your own configuration into the container. For this you can either mount your configuration file at the default location, or override the default container command (`/bin/riemann /etc/riemann.config`) if for some reason you want to change more command line options.
To keep things manageable, you can use Docker Compose to define how the Riemann container should be run.
Oracle JDK 8 is likely to be the fastest option (and supports the full range of aggressive tuning options with `riemann -a`). We test and deploy with JDK 8, from both Oracle and OpenJDK, and with JDK 9 from Oracle. You're welcome to give other JVMs a shot, though!
Use the Riemann TCP protocol. If you have to, and know what you're doing, back off to UDP, HTTP, or Graphite.
Use UDP if you intend to have the OS and network discard data automatically, instead of application-level flow control. Don't use UDP "for speed"; it's designed to drop data, it will drop data, and you'll spend a long time trying to figure out why your stats are wrong. UDP may be appropriate for sampling events where discarding large parts of the datastream is OK, but if different nodes drop packets at different rates you'll introduce sampling bias. If you don't understand what this means, don't use UDP.
Use HTTP only if a TCP client is unavailable; it introduces significant encoding and state machine overhead, takes more bytes on the wire, and can't represent the full range of Riemann events.
Use the Graphite server and other compatibility shims only for interop with legacy systems. The graphite protocol can only represent a small subset of the Riemann data model; you'll have to do additional work to reconstruct meaningful events.
Riemann runs servers by default on several ports.
To log to a file, pass a `:file` option to `logging/init`. If you'd prefer to only log to stdout, just leave out `:file`.
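For instance (a sketch; the Debian/CentOS packages log to `/var/log/riemann.log`):

```clojure
(logging/init {:file "/var/log/riemann.log"})

;; Or, stdout only:
(logging/init)
```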
Under the hood, Riemann uses Logback. To configure Logback, you can provide a configuration file via the system property `logback.configurationFile`.
For more information, please read the Logback documentation.
You can use Riemann across any TCP or UDP VPN, or an SSH tunnel. Riemann also supports bidirectional TLS auth as a part of its TCP protocol.
Before you can use TLS, you need a CA, server key and cert, and a client key and cert. Refer to less-awful-ssl and easy-rsa for more details.
Then, in riemann.config, add a new (or repurpose an existing) TCP server for TLS.
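Something like this should work (paths and the port are examples; the TLS options are the ones `tcp-server` documents):

```clojure
(tcp-server {:host    "0.0.0.0"
             :port    5554
             :tls?    true
             :key     "/etc/riemann/tls/server.pkcs8"
             :cert    "/etc/riemann/tls/server.crt"
             :ca-cert "/etc/riemann/tls/ca.crt"})
```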
Refer to your client's documentation or source for the client TLS options. In riemann-clojure-client, try something like:
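A sketch for riemann-clojure-client (consult the client README for the exact option names; paths are examples):

```clojure
(riemann.client/tcp-client
  :host    "riemann.example.com"
  :port    5554
  :tls?    true
  :key     "client.pkcs8"
  :cert    "client.crt"
  :ca-cert "ca.crt")
```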
Sometimes, something in Riemann doesn't work the way you'd like. When this happens, you can redefine any function in Riemann in the config file. Just switch to the appropriate namespace, redefine the function, and switch back to `riemann.config`.
For instance, let's say you wanted to change how emails are formatted. The namespace `riemann.common` has a function called `body`, which accepts a sequence of events and returns a string. We'll override it in our config file:
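(A sketch; the formatting itself is up to you.)

```clojure
(in-ns 'riemann.common)

(defn body
  "Format a sequence of events into a plain-text email body."
  [events]
  (apply str "Alerts:\n\n"
         (map #(str (:host %) " " (:service %) " is " (:state %) "\n")
              events)))

(in-ns 'riemann.config)
```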
Because Riemann performs arbitrary computation over the event stream, with side effects, it cannot be distributed safely. You can, however, distribute it somewhat less than safely--a technique aligned with Riemann's philosophy that "mostly correct information right now is more useful than totally correct information available only after the failure is over."
One obvious distribution strategy is to place two Riemann nodes on the same switch and have them share a virtual IP address using, say, Heartbeat or Corosync. On resource acquisition, the node allocating the IP should issue a gratuitous arp broadcast over that interface advertising the new route to the relevant switch(es). Failover in this case looks essentially like restarting a Riemann node, which should be relatively painless--most Riemann deployments fill in missing state within a few seconds to minutes.
What happens if the nodes are isolated by a network partition and both claim the interface? Depends on your network, but the answer is "probably not good things". OTOH, there's no good way for *any* consensus algorithm in an asynchronous network to make latency-bounded decisions in a safe way. FLP rears its ugly head. You're going to lose data in the short term; the question is how to emit as much useful, actionable information as possible, so you can fix the problem.
I've got a sketch worked out for a highly-available clustered Riemann which tries to negotiate these tradeoffs in a sane way, but it involves minimum latency costs, severe throughput costs (I'm estimating three to five orders of magnitude slower), disk IO, all kinds of esoteric failure modes, and some moderately challenging mathematics. It'd take a long time to build and would require some serious educational work on my part. There's just no way around it: distributing arbitrary latency-bounded computation is hard. You're going to have to reason about consensus. I'm really sorry about this. :(
Because Riemann performs arbitrary computation, it can't shard your workload automatically. You'll need to pick some axes along which to shard, perform local aggregation on those nodes, and reduce the results with some higher-level Riemann node. The `forward` stream can help you connect nodes to one another, and `async-queue!` can allow some elasticity around latency jitters and downstream node restarts, but the actual sharding and discovery mechanics you'll have to build yourself.
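A rough sketch of one shard forwarding locally aggregated events upstream (the hostname and queue settings are invented; see the asynchronous streams discussion below for caveats):

```clojure
(let [upstream (async-queue! :upstream {:queue-size 1000}
                 (forward
                   (riemann.client/tcp-client :host "riemann-parent.example.com")))]
  (streams
    (by :service
      (rate 5 upstream))))
```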
Event processing in Riemann's model is synchronous (though there's lots of asynchronous stuff going on internally). When you send an event to Riemann, it flows through every top-level stream, and possibly those streams' children, and *their* children, and so forth. Once the event has flowed through every stream, Riemann sends an acknowledgement back to the client. This model enables *backpressure*: clients can use the acknowledgements to avoid overwhelming Riemann with more events than it can process.
Riemann keeps track of some internal metrics, exposed as events, for stream latency and TCP server latency. Stream latency is how long it took for an event to flow through every stream. Server latency is how long it took for an event to be picked up by Riemann's TCP server, decoded, handed off to the streaming system, processed, and for the acknowledgement to be sent back to the client.
If you wait for an acknowledgement every time you send a message to Riemann, each process will behave synchronously. This approach is just fine when there are lots of clients sending requests infrequently, where "infrequently" means, via Little's Law, that the typical period between events is greater than the tcp server latency plus the round-trip network time to Riemann itself.
Sometimes, though, you'll want to send events *faster* than that. You've got a few options here.
If you ignore the acknowledgement messages, you can just shove events at Riemann as fast as your runtime will let you. Those events will fill up a queue in front of Riemann's streaming system. If that fills up, they'll spill over into the Netty queue associated with the channel. If that fills up, they'll spill over into the TCP buffer on the Riemann box, and by extension, the queues in the network and the sending node. At some point TCP backpressure will come into play, but understanding that interaction can be complex.
Doing this kind of infinitely-asynchronous send will manifest as high memory consumption on the Riemann node (to store all those queued operations), and correspondingly high wait times for events. If events arrive *too* far delayed, they may no longer be valid by the time they arrive, and you'll start seeing services expire. It's like sending a message via a courier who gets stuck in a waiting room and dies en route.
You can detect this (and alert on it) by watching the service called "riemann netty event-executor queue size"; if it starts getting above a thousand events or so, you're likely overloading Riemann. This queue size is directly related to the latency distribution given by "riemann streams latency 0.5" and friends, and "riemann streams rate". The higher the latency, and the higher the throughput, the larger the queue.
So: infinite asynchrony is a problem. You want *some* degree of asynchrony, especially when Riemann processes faster than the time it takes to send events over the network. But too much and you'll crash into finite resource limits. We need *backpressure*: bounds on the number of outstanding asynchronous requests, or the rate at which we send them.
One option is simply to spin up more threads and have each send synchronously. Depending on the client's design, you may be able to have many threads concurrently sending operations over the same connection. Riemann-java-client and riemann-clojure-client, for instance, will interleave operations from multiple threads on a single network connection. If threads are cheap in your runtime, or you don't need very many (on the JVM, more than 10K IO threads is probably a warning sign), this is perfectly fine. If you're running in Erlang or Haskell, threads are dirt cheap; fire away.
If threads in your runtime are more expensive, or you don't have them at all--say, node.js--you can emulate them using other asynchronous constructs. Riemann-java-client and riemann-clojure-client, for instance, have async-send functions that return promises; you can dereference those promises to find out whether Riemann acknowledged the message. Deref'ing blocks until acknowledgement occurs, or throws an exception if something in the network or Riemann went wrong. You can specify a timeout for deref; if the timeout is zero, you get a nonblocking check ("did it go through yet?"). Fixed timeouts are useful for retries of idempotent operations. "Allow this much time, then send again". Same semantics as any other asynchronous or stateful construct in Clojure.
The important thing for *backpressure*, if you're emulating threads using asynchronous objects, coroutines, continuations, fibers, callbacks, or what-have-you, is that you a.) eventually dereference the results of the send, and b.) only allow a fixed number of outstanding asynchronous requests. Riemann-java-client automatically throws (or returns promises which throw) when you try to send more than a few thousand concurrent messages, forcing you to implement backpressure and flow control logic. You can tune this option if you like, but it's there to protect you against unbounded resource consumption on either Riemann's side or the client.
Different clients and different languages will expose these constructs in different ways, but you'll find the same core ideas of limited asynchrony, queuing, and backpressure in every networked system.
Streams are just functions, so you can use any function that prints an event to stdout to see which events pass through a particular stream. `prn` prints an object to stdout, so adding `prn` to any stream prints the events at that stream to the console.
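For instance (the service pattern is an example):

```clojure
(streams
  (where (service #"^api ")
    prn     ; print each matching event to stdout
    index))
```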
If you're running Riemann as a daemon, it probably won't have a stdout to print to. To write to the logfile (which is *also*, by default, printed to stdout), use `#(info %)`, which is shorthand for the anonymous function `(fn [x] (info x))`. Note that `info` logs events a little differently than `prn`, but you can use `pr-str` to recover the Clojure representation if desired.
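For example:

```clojure
(streams
  #(info (pr-str %)))
```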
First, determine whether the events you're looking for are actually making it into Riemann. This will help localize the problem to either the network or the streams. Add a new top-level stream which simply logs all inbound events. Use a `(where)` filter around that logging stream to cut down on noise.
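(The host here is an example filter.)

```clojure
(streams
  (where (host "web-003")
    #(info "inbound" %)))
```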
If you don't see the events you're looking for in the logs, they must be dropped before arriving at the streaming system. Check:
- Check that the server is listening on the interface and port you expect (see the `(tcp-server)` options), that the appropriate server startup message appears in the Riemann logs, and that `netstat -lntp | grep riemann` shows the port bound.
- Check that the port is reachable from the client, e.g. with `telnet some.host 5555` or `nmap -sT some.host -p 5555`.

If your messages are arriving in the streaming system, but aren't making it to the index or other output streams, gradually slide the `#(info %)` stream downstream towards the exit point, to verify at which point in the chain the events are not what you expect.
If you're indexing events, but they don't appear in queries, they may have expired from the index. Try `(expired #(warn "expired" %))` to warn about expired events. They may be expiring because their TTL is shorter than the interval between events. They may also expire because the Riemann clock is in the future relative to the originating event, due to clock skew or network/processing latency.
Check the internal instrumentation for the Riemann queue depth and core latencies; if the queue is more than a handful of events deep, or stream latencies are on the order of the relevant event TTLs, you might need to reduce the load on the Riemann server or investigate downstream services that might be slow.
Check clock skew between nodes using `watch date` or similar. Make sure your clocks are in UTC, not local time. Never local time. Especially never Daylight Savings Time. You can use Riemann's `clock-skew` stream to measure clock skew as seen by the Riemann node as well.
Riemann has integrated support for building fast, repeatable tests into your config itself, including virtualized time and scheduler side effects, and suppression of unwanted IO during test runs.
In your streams, use `riemann.test/tap` to declare named points where you'd like to observe events. For instance, we might want to track any events flowing into the index, so we'd surround it with a tap and call that tap `:index`.
Then, we can inject events into the config's streams to see what happens. Define tests in any config file using `tests` and `deftest`.
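A minimal example, using the `:index` tap above:

```clojure
(tests
  (deftest index-test
    (let [result (inject! [{:service "test" :time 1}])]
      ;; One event should have arrived at the :index tap.
      (is (= 1 (count (:index result)))))))
```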
`(inject! events)` returns a map of tap names to vectors of the events that arrived at each tap. Time is always reset to zero at the beginning of each test, and is automatically advanced to each event's time just prior to its arrival. You may manually advance the clock using `riemann.time.controlled/advance!`. Scheduled operations like rate, rollup, etc. will take effect atomically as the virtual clock advances.
You can also inject events into specific streams using `(inject! [stream] events)`.
Under the hood, `tests` expands to a new ns declaration based on the current namespace--if the current namespace is `foo`, tests take place in `foo-test`. `tests` automatically refers clojure.test's `is` and `are`, plus riemann.test's versions of `deftest` and `inject!` for inserting events into the config's streams.
You can define regular Clojure tests too; no need to call `inject!`. Every namespace ending in `-test` is eligible for testing.
Some streams may have *side effects*, like sending events over the network or emitting emails. You can suppress those side effects by wrapping them in a `test/io` stream, which forwards events to its children normally in production, but ignores them in test mode.
`io` suppresses events, which means that `(io (tap :foo))` will never see anything. Always place taps *outside* `io` expressions.
Note that `tap` and `io` are eliminated at compile time in production mode, so there's zero performance penalty; they won't count against your HotSpot inlining budget, etc.
From Riemann 0.3.0 onwards, the test framework runs a core. Functions requiring a core, like `riemann.config/reinject`, indexing events, index querying, and expiry, will work. The index is cleared at the beginning of each `deftest`.
Before Riemann 0.3.0, the test framework didn't actually run a core, so functions that require one (like `riemann.config/reinject`) didn't work from within the test framework.
Riemann has an nREPL server built in. You can enable this in your config with:
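(5557 is just a conventional port choice.)

```clojure
(repl-server {:host "127.0.0.1" :port 5557})
```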
You can then use Leiningen 2 to connect to it.
You can reload the config by sending Riemann a SIGHUP, but you can also do it from the REPL.
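(A sketch; `reload!` lives in `riemann.config`.)

```clojure
;; From a connected REPL session:
(riemann.config/reload!)
```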
Riemann comes with instrumentation built in, which is sampled periodically and injected into the event stream. You can query the dashboard for `service =~ "riemann %"` to see Riemann's internal metrics. All latencies are in milliseconds.
`riemann streams` refers to the stream processor: its throughput measures the number of events being passed to `(streams ...)` per second, and its latencies indicate the time it takes to process a single event through the streaming system.
The TCP, UDP, and websockets servers instrument their throughput and latency where possible. TCP latency measures the time from initial message deframing to the Netty write of the corresponding response. For instance, `riemann server tcp 1.2.3.4:5555 in latency 0.95` measures the 95th-percentile time, in milliseconds, for the TCP server on port 5555 to parse, queue, process, and dispatch a response for a message. The rate for servers measures *message* rate, not *event* rate.
The main queue connecting Netty IO threads to the parsing/execution threadpool is measured by `riemann netty event-executor queue size`, and is a critical measure of whether the streaming system is overloaded relative to inbound request load.
To disable instrumentation, or control how often events are sampled, see riemann.config/instrumentation.
Riemann-tools includes a program called riemann-health, which measures the local host's cpu, memory, and disk use. You can install it from rubygems, and run riemann-health with the address of your Riemann server.
Riemann-health reports utilization fractions by default, ranging from zero to one. Load average is divided by the number of cores, and disk use by capacity. You can adjust the polling interval, what percentage of CPU usage is considered critical, and more. `riemann-health --help` will tell you more.
Streams can throw exceptions. `exception-stream` takes an exception-handling stream as its first child; if an exception occurs in any of its other children, a special event containing the exception details is sent to that first child.
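For instance (a sketch; `:count` is a hypothetical custom attribute whose parse may fail):

```clojure
(streams
  (exception-stream
    ;; First child: receives the exception events.
    #(warn "exception in streams" (pr-str %))
    ;; Remaining children are guarded.
    (smap (fn [e] (assoc e :metric (Long/parseLong (:count e))))
      index)))
```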
Riemann-tools includes riemann-riak, which uses Riak's HTTP stats interface, some filesystem checks, and optionally, some erlang RPC requests to measure Riak's get and put latencies, request throughput, ring status, and disk use. Run riemann-riak on each Riak node.
Riemann-riak comes with defaults for the Riak .deb packages shipped by Basho, but it's tunable for other configurations. See `riemann-riak --help` for more options.
You can include arbitrary additional fields in Riemann events. These are not a replacement for service, host, and time--the composite primary key of a logical "thing" in the Riemann universe--but they *do* let you more easily aggregate and filter events, or carry richer contextual information associated with a single message.
In most clients you can simply pass an extra key and value in the event map. In strongly-typed clients like riemann-java-client, special methods like EventDSL.attribute(key, value) are present.
Custom attributes are always encoded as strings in Protocol Buffers (pull request welcome!), but you can replace them with parsed variants in your streams using smap, adjust, and friends.
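For instance (assuming clients send a `:queue-depth` attribute as a string):

```clojure
(smap (fn [event]
        (assoc event :queue-depth
               (Long/parseLong (:queue-depth event))))
  index)
```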
In the Riemann server, custom attributes work just like normal attributes on events--they're just a bit slower, owing to the hashmap lookup. Just like normal fields, you can use `get`, `assoc`, `dissoc`, `update`, and all the other Clojure functions over maps. Custom attributes do *not* have shorthand syntax in `(where)` (to prevent arbitrary symbol capture), but are accessible through `(:my-field event)`.
The Guardian has published their Riemann config files, which may be a useful guide to advanced use.
Splitting streams is the default in Riemann. Almost all streams, unless otherwise documented, take any number of child streams as their final arguments, and forward the same events on to all of them. Because Riemann uses Clojure's immutable data structures, you don't have to worry about locking or mutation. Streams "change" events by sending altered, shared-structure *copies* of events downstream. For instance, this where expression sends matching events to two child streams; one of which indexes a copy of the event with service "foo", and the other with service "bar".
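(A sketch of that expression; the "demo" tag is an example predicate.)

```clojure
(where (tagged "demo")
  (with :service "foo" index)
  (with :service "bar" index))
```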
Sending events from two places to the same stream is simple: just bind the stream to a variable. In Clojure, `(let [x 1 y 2] ...)` assigns `x` to `1` and `y` to `2` everywhere inside the `let` expression. Since streams are values, you can bind them to variables too. In this example, we create a single rate stream and send events to it along multiple paths.
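(Service names are examples.)

```clojure
(let [req-rate (rate 5 index)]
  (streams
    (where (service "api req") req-rate)
    (where (service "web req") req-rate)))
```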
Even easier: use the `pipe` stream, which compiles to the same code as the `let` expression above.
Often, you want to run *many copies* of a particular stream; say, computing a rate for each service separately, or for each [host, service] pair. Use `by` to split a stream into distinct copies based on a function, or vector of functions, of the event. For instance, to sum up disk use across hosts, while retaining a distinct sum for each service, you could write:
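(One possible shape: within each per-service branch, `coalesce` tracks the latest event from each host, and `folds/sum` adds their metrics.)

```clojure
(by :service
  (coalesce
    (smap folds/sum
      (with :host nil
        index))))
```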
Or to compute a rate for each distinct host and service:
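```clojure
(by [:host :service]
  (rate 5 index))
```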
Often, we want to do some operation only on a subset of events flowing through a stream. For instance, we might only want to email events which have the state "error". The basic filtering streams are `where`, her less magical sister `where*`, `match`, `tagged-all`, `tagged-any`, and `expired`. Most of the time, you'll use `where`.
`where` takes a predicate: a special expression for matching events. After the predicate, `where` takes any number of child streams, each of which will receive the events which the predicate matched. For example, we could email only events which have state "error".
The `where` stream provides some syntactic sugar to let you access event fields. In a `where` predicate you can refer to "standard" fields like `host`, `service`, `description`, `metric`, and `ttl` by name. If you need to refer to any other field, use the full keyword lookup: `(:field_name event)`.
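For instance (`:environment` is a hypothetical custom attribute):

```clojure
(where (and (service "api req")
            (= "production" (:environment event)))
  index)
```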
Where expressions can match particular values of a field, like passing on events which occur only on the Von Braun. Regular expressions (in Clojure, literals like `#"foo .+"`) can be used to match fields as well:
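```clojure
(where (host "von-braun")
  index)

(where (service #"^riak .+")
  index)
```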
You can filter for the presence of a given tag, and there are streams specifically for selecting events which have some, or all, of a set of tags. `tagged` is shorthand for `tagged-all`:
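```clojure
(streams
  (tagged "exception" index)             ; shorthand for tagged-all
  (tagged-all ["prod" "api"] index)      ; events bearing every tag
  (tagged-any ["staging" "dev"] index))  ; events bearing any one tag
```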
Numeric functions work like you'd expect, which makes range queries easy:
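```clojure
(where (>= metric 0.9)
  (email "ops@example.com"))

(where (< 5 metric 10)
  index)
```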
Predicates can use any function or macro, including boolean operators.
You can also define and use arbitrary functions in your predicates.
`(where)` binds the variable `event` to the event being considered.
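For instance (a sketch; `global?` is a function we define ourselves):

```clojure
(defn global?
  "Events without a host describe the system as a whole."
  [event]
  (nil? (:host event)))

(streams
  (where (global? event)
    index))
```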
Higher-order functions work like you'd expect, so you can happily map, fold, filter, etc inside a where predicate too. For instance, we can match any of several regular expressions:
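(The patterns here are examples; `re-matches` returns the match, or nil, so `some` yields a truthy value when any pattern matches.)

```clojure
(where (some #(re-matches % service)
             [#"^api .+"
              #"^db .+"
              #"^queue .+"])
  index)
```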
Where streams forward an event to their children when the predicate matches. When the predicate *doesn't* match, where will forward events to any children in an `(else)` block:
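```clojure
(where (service "disk used")
  (email "storage@example.com")
  (else
    index))
```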
You can use `(where)` to alert you when a metric goes out of bounds. When the metric falls between zero and five, inclusive, index it with state "ok"; otherwise, index it with state "warning":
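```clojure
(where (<= 0 metric 5)
  (with :state "ok" index)
  (else
    (with :state "warning" index)))
```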
The changed stream forwards on events when some fields of that event change. For instance, you can send an email whenever the state changes:
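(Assuming an `email` mailer, as before.)

```clojure
(by [:host :service]
  (changed :state
    (email "ops@example.com")))
```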
Note that we use `(by)` so that each distinct host and service tracks its changes independently. Otherwise, we'd get alerts when service A reported "ok" and service B reported "down". There's a shorthand for this, `changed-state`:
When you start Riemann, it doesn't know what the previous state was. By default, `changed` will forward on the first event it receives. You can tell `changed` and `changed-state` to *assume* an initial state with the `:init` option:
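```clojure
(changed :state {:init "ok"}
  prn)

(changed-state {:init "ok"}
  (email "ops@example.com"))
```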
I tend to include this snippet in my riemann configurations so I can keep track of which services are starting, crashing, and gracefully restarting.
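(A sketch of such a snippet.)

```clojure
(changed-state {:init "ok"}
  #(info "state transition" (pr-str %)))
```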
Where you handle exceptions in your application, you can also submit an event to Riemann. I usually set the service to the application name, include the stacktrace as the description, and tag the event with both "exception" and the classname of the exception.
It's also easy to track the *rate* of exceptions per second, so you can graph how many unexpected failures occur per day, and determine which services need the most attention.
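For instance (assuming your apps tag these events with "exception"):

```clojure
(where (tagged "exception")
  ;; Each exception counts as 1; rate sums over 5 seconds
  ;; and emits events per second.
  (with {:service "exceptions/sec" :metric 1}
    (rate 5 index)))
```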
Sometimes, when things break, they submit a *ton* of events. Maybe you receive sixty thousand exceptions in five minutes. You need to know that a problem exists--but don't need to know about *every single failure*. Riemann has two streams for controlling the rate of events.
rollup will allow a few events to pass through readily. Then it starts to accumulate events, rolling them up into a list which is submitted at the end of a given time interval.
Let's define a new stream for alerting the operations team, which sends only five emails per hour (3600 seconds). We'll receive the first four events immediately--and at the end of the hour, a single email with a summary of all the rest.
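Something like:

```clojure
(rollup 5 3600
  (email "ops@example.com"))
```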
Rollup holds on to every event, which can use a lot of memory. Sometimes it's OK to drop some events instead of delivering them. That's where throttle comes in: it allows the first five events through, then ignores all the rest for an hour.
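```clojure
(throttle 5 3600
  (email "ops@example.com"))
```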
And naturally, you can combine throttle and rollup to preserve *some* events, but not allow unbounded memory use:
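(Numbers are illustrative: pass at most 1000 raw events per hour into the rollup, which compacts them into at most five emails.)

```clojure
(throttle 1000 3600
  (rollup 5 3600
    (email "ops@example.com")))
```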
Each event has a TTL, which states how many seconds the event is valid for after its given time. When a service crashes catastrophically, it will stop submitting events and its events in the index will become stale. Periodically, the index sweeps out events past their lifetime, deletes them from the index, and streams an event for that host and service with state "expired".
You can control how often the index scans for expired events with `periodically-expire`. Here, we check every ten seconds. If you *don't* want events to expire from the index, you can delete this line from your config:
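```clojure
(periodically-expire 10)
```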
If events have no TTL given, the index assumes their TTL is 60 seconds. You can provide a default TTL for any incoming events that don't provide one:
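```clojure
(streams
  (default :ttl 60
    index))
```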
Now, any services that fail to check in every ten seconds will expire (unless they give a different TTL). Since expired events have a different state, any (changed-state) streams will detect the transition and can alert you:
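```clojure
(changed-state {:init "ok"}
  (email "ops@example.com"))
```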
You can also explicitly filter expired events:
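```clojure
(expired
  #(info "expired" (pr-str %)))
```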
By default, Riemann copies `[:host :service]` onto expired events. You can control which keys from events are copied onto expired events by passing `:keep-keys` to `periodically-expire`:
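```clojure
(periodically-expire 10 {:keep-keys [:host :service :tags]})
```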
With that in place, you can filter expired events on tags. This way, your app can decide whether an event is worthy of an alert by tagging it:
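(The "notify" tag is an example.)

```clojure
(expired
  (where (tagged "notify")
    (email "ops@example.com")))
```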
You can add a new tag, or a set of tags, to events using the `tag` stream:
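```clojure
(tag "demo" index)

(tag ["demo" "experimental"] index)
```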
If you're worried about spikes or flapping, you can make use of the `stable` stream. `stable` detects stable streams of events over time: it takes a time period and a function of events, and only passes on events whose value for that function remains the same for the specified time period.
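For instance, to suppress flapping states (a sketch):

```clojure
(by [:host :service]
  (stable 60 :state
    ;; Only sees events whose state has held for a full minute.
    (changed :state
      (email "ops@example.com"))))
```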
Sometimes you want to ask a question about the last few minutes, or of groups of 10 consecutive events at a time.
Events arrive independently in Riemann, but each describes the state of a service on some host over some window of time. Often, you want to know how two services relate to one another right now. Riemann provides two streams for this type of aggregation: coalesce and project.
Use project when you want to combine a fixed number of events, identified by arbitrary predicates. For instance, you might want to know the ratio of used gigabytes to total disk capacity, or the difference between enqueues and dequeues in a queue.
Use coalesce to get a snapshot of the most recent event from every host and service that passes through that coalesce stream. Coalesce helps answer questions like "what fraction of my API servers are running the most recent version?", or "what's the median queue depth across all queues in a given broker?"
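A project sketch (service names are examples; `folds/quotient` divides the first event's metric by the second's):

```clojure
(project [(service "enqueues")
          (service "dequeues")]
  (smap folds/quotient
    (with :service "enqueues per dequeue"
      index)))
```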
You can test whether a field matches a regular expression by simply passing a regex to `where`. Clojure regular expressions look like `#"..."`.
Or you can transform a string using a regular expression and capture groups (also called "backreferences") with `clojure.string/replace`:
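(A sketch: collapse services like "web-003 cpu" into "web cpu", keeping the captured prefix.)

```clojure
(smap (fn [event]
        (assoc event :service
               (clojure.string/replace (:service event)
                                       #"^(\w+)-\d+ cpu$"
                                       "$1 cpu")))
  index)
```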
Use the scale stream. For instance, to convert bytes to gigabytes, scale by 1/1024^3.
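```clojure
;; Convert a metric in bytes to gigabytes.
(scale (/ 1 1024 1024 1024) index)
```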
If you're running riemann-health, all hosts will report events like `{:service "cpu" :host "foo" :metric 0.12}`. You want to know which host is under the heaviest CPU load. The `coalesce` stream remembers the last event from each host and service, and sends them all as a vector to its children. We can map that vector of events to a single event--the one with the largest metric--using `folds/maximum`. Then we just set the service and host, since this event pertains to the system as a whole:
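```clojure
(where (service "cpu")
  (coalesce
    (smap folds/maximum
      (with {:service "max cpu" :host nil}
        index))))
```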
Sometimes it is useful to know how many hosts are sending data to Riemann. This is especially useful in cloud environments where nodes are constantly scaling up and down. We'll assign every event the same service, use `coalesce` to combine the current set of hosts and services into a single map, `throttle` to cut down on the number of updates, and `folds/count` to count the distinct set of hosts. Finally we'll strip the host from that event, since it represents a property of the entire system, and index it:
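(A sketch of that pipeline; the throttle interval is an example.)

```clojure
(with :service "distinct hosts"
  (coalesce
    (throttle 1 5
      (smap folds/count
        (with :host nil
          index)))))
```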
Sometimes you'll have a service that fails occasionally. You might expect one or two failures, but if you get over a certain percentage of failures you want to be notified. In this case you can use a `fixed-time-window`:
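(One possible shape, assuming failures carry state "error"; service names are invented.)

```clojure
(fixed-time-window 60
  (smap (fn [events]
          (let [total  (count events)
                errors (count (filter #(= "error" (:state %)) events))]
            {:service "job failure ratio"
             :time    (unix-time)
             :metric  (if (pos? total) (/ errors total) 0)}))
    index))
```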
Since streams are just functions which take an event as their sole argument, you can do any computation over events that you like. Just write a function, either named or anonymous.
You can run any Clojure code you like here; using Clojure or Java libraries on the classpath, writing to the network, logging to disk, using existing variables from the config, and so on. It's up to you.
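For instance:

```clojure
(streams
  (fn [event]
    ;; Arbitrary Clojure: log suspiciously large metrics.
    (when (and (:metric event) (< 1e9 (:metric event)))
      (info "huge metric" (pr-str event)))))
```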
Riemann builtins like `rate`, `percentiles`, `smap`, and so on are all functions which take a variable number of child streams and return a function that takes an event and responds to it. You can build your own stream-generating functions too.
Let's create a trivial example.
Our stream returns a function that, when it receives an event, adds a new key-value pair via `assoc`, and then uses `call-rescue`, a standard library function, to pass the event on to all the child streams it was passed.
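A minimal sketch (`call-rescue` lives in `riemann.streams`, which the config namespace already refers):

```clojure
(defn hello-stream
  "Takes child streams, and returns a stream: a function which assoc's
  :hello :world onto each event and passes the result to every child."
  [& children]
  (fn [event]
    (let [event (assoc event :hello :world)]
      (call-rescue event children))))
```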
Add this new stream to the list of event streams.
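For instance:

```clojure
(streams
  (hello-stream prn))
```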
Now when you pass an event into Riemann, you should see the event printed out with an added key-value pair of `:hello :world`.
Streams all essentially work this way; take a look at the source of the streams namespace in `streams.clj`. You can find a few simple examples in the functions `match`, `expired`, `under`, and `over`, which filter whether the event is passed along the event stream.
To reinsert an event back into the current core, use `riemann.config/reinject`. Reinjected events flow through all top-level streams, just as if they had just arrived from the network. In this example, incoming "initial" events are remapped to "derivative" events and reinjected into the streaming system. Each event is printed twice: first as "initial", then as "derivative":
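```clojure
(streams
  prn
  (where (service "initial")
    (with :service "derivative"
      reinject)))
```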
Sometimes, you want the behavior of a stream to depend on the current state of the index. Say, for instance, that you wanted to use a maintenance-mode service to suppress alerting. To enter maintenance mode, we'll submit an event to Riemann which has a TTL as long as the maintenance window we want. Here's a ten minute window, sent from a Ruby client:
Next, we'll write a function which identifies whether maintenance mode is active. We could use `riemann.index/lookup` to find a particular host/service pair, or `riemann.index/search` for a more general query.
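A sketch using `riemann.index/lookup` (`core` is the var riemann.config binds to the current core; this assumes the maintenance event was submitted with a nil host):

```clojure
(defn maintenance-mode?
  "True while an un-expired maintenance-mode event sits in the index."
  []
  (let [event (riemann.index/lookup (:index @core) nil "maintenance-mode")]
    (= "active" (:state event))))
```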
Now we can use that function inside of a where predicate, to pass events on only when we're *not* in maintenance mode.
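```clojure
(where (and (state "error")
            (not (maintenance-mode?)))
  (email "ops@example.com"))
```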
Naturally, you could parameterize the maintenance-mode function, perhaps taking a host, tag, or service as an argument, and searching for particular matching events.
Streams are functions that take an event. There's a subtle distinction here: we use the term "stream" to describe both the stream itself, and the functions that generate those streams. Remember, in Lisp, `(fun arg1 arg2)` means "call `fun` with `arg1` and `arg2`".
Riemann usually calls your streams with events for you. That's what the `streams` function in `riemann.config` means: "here are some functions that you should call when new events arrive".
When a new event arrives, Riemann takes each function you've specified using `streams` and calls it with that event. Here's the source of `riemann.core/stream!`:
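In essence (a paraphrase; the real source also wraps this in instrumentation):

```clojure
(defn stream!
  "Applies an event to every top-level stream in this core."
  [core event]
  (doseq [stream (:streams core)]
    (stream event)))
```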
That's it. Riemann's entire streaming model is just a function call and some careful API design. All the functions in `riemann.streams`, like `where`, `rate`, `with`, etc., take streams as arguments and create new streams which call them, usually transforming the events they receive in some way.
New users sometimes have trouble understanding Riemann's execution model. For example, you might write a config like this, which is supposed to apply the local Riemann server's current time to every event.
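(A sketch of the problematic config.)

```clojure
;; Broken: stamps every event with the time the config was loaded.
(streams
  (with :time (unix-time)
    index))
```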
However, this code doesn't do what you might expect. It assigns the same time--the time that Riemann last reloaded its config--to every event. Why?
Riemann evaluates the config once at configuration time, building up a set of streams and setting up services like logging, the TCP server, and so on. Once the config is processed, Riemann begins passing events to those streams.
So, at config time, Riemann follows the normal rules of Clojure evaluation, and calls `(unix-time)` to obtain the current server clock--say, 12345.
Then it calls `(with :time 12345 index)` to build a stream that takes events, assigns the time 12345, and sends them to the index. Finally, Riemann calls `(streams)` with that function, telling Riemann to apply that stream to all incoming events: assigning the same time to every event.
To run code each time an event arrives, we need a function. We could write a new stream from scratch, or use a stream like `smap` that uses a function to transform each event before passing it on to children:
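```clojure
(streams
  (smap (fn [event] (assoc event :time (unix-time)))
    index))
```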
If you use Riemann's built-in stream-generating functions, you don't have to worry about calling streams yourself; Riemann does it for you. But sometimes, you want to write your own streams, and will need to call some child streams yourself.
You might try creating a new stream and calling it with the event, but this will probably behave weirdly. In this example, every time a new event arrives, we create a fresh rate stream, with completely new state, and send our event to it. We probably wanted to send each event to the same rate stream.
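(The antipattern looks like this.)

```clojure
;; Broken: builds a brand-new rate stream, with fresh state, per event.
(streams
  (fn [event]
    ((rate 5 index) event)))
```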
If we're writing a new function to generate streams, we might have our downstream children passed in as an argument; see any function in `riemann.streams` for an example. If we're just defining a one-off stream inline, we can bind the child to a variable using `let`:
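```clojure
(let [req-rate (rate 5 index)]
  (streams
    (fn [event]
      (req-rate event))))
```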
If we want to call multiple downstream children, use `sdo` to wrap them up into a single stream:
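```clojure
(let [downstream (sdo index prn)]
  (streams
    (fn [event]
      (downstream event))))
```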
Even easier, in this particular case, is to use a general-purpose transformation stream like `smap`.
There are times when you will want to generate new events from an existing stream. Most of the time, it is preferable to use `riemann.streams/with` to modify fields on an existing event, but there are cases where that would hinder readability, or is just not possible. In those cases, use `riemann.common/event`:
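(A sketch; `riemann.common/event` builds a fresh event from a map, defaulting the time to now.)

```clojure
(where (service "requests")
  (smap (fn [e]
          (riemann.common/event
            {:host    (:host e)
             :service "requests doubled"
             :metric  (* 2 (or (:metric e) 0))}))
    index))
```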
In Riemann, streams are expressed by function composition. Each stream is a function that takes an event and does something with it. Child streams are just functions too, so when an event propagates through different layers of streams, it's actually just a series of function calls. We do this for three reasons:
In practice, most Riemann streams are pure functions or only mutate in-memory state; Riemann is typically CPU-bound. We have a threadpool, on the order of num-processors, which runs an event through all streams in the core. This makes Riemann quite fast--but when you make a blocking call to a downstream service, sometimes the rules change. You want additional parallelism for IO, or an explicit queue in between two streams. In keeping with the Riemann philosophy of small, re-usable components, you can make any stream asynchronous, with its own bounded queue, worker threadpool, and performance metrics, via the `async-queue!` stream.
For instance, the `forward` stream forwards events to another Riemann server, and returns only when that server has acknowledged the message. This provides backpressure and acknowledgement: a client knows that its message was accepted not only by the local Riemann server, but also by the downstream Riemann server.
But if that link is slow, and we want to return immediately, we can defer execution of that stream onto a threadpool via a fixed-size queue. Because we acknowledge before the downstream server acknowledges, we might falsely acknowledge a message. The semantics of the stream have changed; we're gaining performance and insensitivity to downstream latency, but at the cost of safety.
We bound the number of in-flight operations to the downstream service by specifying how many threads are allowed, and bound memory consumption via the queue size. If the queue fills up, events passed to the async-queue stream will immediately throw an exception informing you that the queue is full; as with all exceptions in Riemann, you can explicitly catch them using an exception handler stream, or allow Riemann to log the failure. As with all Riemann streams, exceptions don't interfere with other child streams.
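For instance (the hostname and pool sizes are examples):

```clojure
(let [index (index)
      downstream (async-queue! :downstream {:queue-size     1000
                                            :core-pool-size 4
                                            :max-pool-size  32}
                   (forward
                     (riemann.client/tcp-client :host "riemann2.example.com")))]
  (streams
    index
    downstream))
```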
All async-queue services support hot config reloading (though they won't cleanly drain events between services if their parameters change), and come with internal metric events about their queue size, outstanding threads, etc. They'll alert you via state changes if they get too full, so make sure to monitor events like "riemann executor rejected rate" with `changed-state` so you'll know when they start dropping events.
Note the bang (`!`) at the end of `async-queue!`; this function is dangerous because it mutates state. `async-queue!` creates a threadpool and changes the Riemann core. You *don't* want to create these dynamically, or you could wind up with hundreds or thousands of queues and threadpools. Never put an async-queue inside of a `by`, for example: `by` would create a new async queue for each distinct host or service. For safety, define your async-queue streams in a let binding, and call them wherever needed in `streams`. That way you'll know it's always the same executor.
Remember, you can wrap any stream in this asynchronous layer. Combining batching and async-queue can dramatically improve throughput; sometimes by several orders of magnitude. Each has its cost: batching increases latency, and async-queue means dropping events if the queue fills up. As a transient event processor, Riemann emphasizes *dropping* old data rather than getting backed up or shutting down entirely. Think of async-queue like a safety valve; allowing you to still process requests by shunting some of the incoming stream aside. If you want guaranteed delivery over long timescales, use an on-disk queuing system like Kafka, either in front of or behind Riemann.
Riemann's config is a Clojure program; you can break up your config into smaller, testable, re-usable pieces using functions, and organize those functions into namespaces. For example, here's a config which computes rates and percentiles over two different services.
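Something like this (the service names and graphite host are examples):

```clojure
(let [index (index)
      graph (graphite {:host "graphite.example.com"})]
  (streams
    (where (service "api req latency")
      (rate 5 index graph)
      (percentiles 5 [0.5 0.95 0.99] index graph))
    (where (service "db query latency")
      (rate 5 index graph)
      (percentiles 5 [0.5 0.95 0.99] index graph))))
```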
Let's break that rate and percentile pattern out into its own function.
The two `where` expressions match different services, so we'll make the service name an argument to the function. We also need to send our rate and latency events to two child streams--the index, and the graphite client--so we'll make those the other two arguments. The function will construct a stream using `(where ...)` and return that to its caller:
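```clojure
(defn rate+percentiles
  "Computes a rate and percentiles for events with the given service,
  forwarding the results to index and graph."
  [svc index graph]
  (where (service svc)
    (rate 5 index graph)
    (percentiles 5 [0.5 0.95 0.99] index graph)))
```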
`(defn ...)` defines a new function, named `rate+percentiles`. We provide a docstring that describes the function, and a list of arguments in `[brackets]`. Then we create a stream using `where`. The final (in this case, only) expression in a `defn` is the function's return value.
Then we just call that function in our streams, filling in the service, index, and graphite client we'd like to send events to. Remember, in Clojure, `(fun a b)` means "call the function `fun` with arguments `a` and `b`":
These functions can, in turn, call other functions, allowing you to build up complex streams out of small, well-organized parts. Everything in Riemann is defined in terms of functions like this: the same tools we use to build Riemann's internals are in your hands too.
Use functions whenever you start nesting too deeply, or whenever you can put a name to a well-defined stream. Use functions to transform events! Use functions to figure out what time it is! There's no limit. For more on functions and Clojure in general, see Clojure from the Ground Up: Functions.
As your config grows, you'll have more and more functions--and more and more names to worry about. It helps to break things up into namespaces--typically one per file. Clojure provides a first-class namespace system, which you can use to organize your configuration. For instance, we might have a file in `/etc/riemann/mycorp/queues.clj` which knows how to handle all the events from a queuing system.
We start the file with a namespace declaration `ns`, and pull in some common Riemann functions using `:require`. If you need functions from other namespaces, you can add them as well. Then we define functions that help build queue-related streams.
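For example, `/etc/riemann/mycorp/queues.clj` might contain (a sketch; the threshold and service pattern are invented):

```clojure
(ns mycorp.queues
  "Streams for our queuing system."
  (:require [riemann.streams :refer :all]))

(defn queue-capacity
  "Warns via the given child stream when queue depth exceeds capacity."
  [child]
  (where (service #"^queue depth")
    (where (< 10000 metric)
      (with :state "warning"
        child))))
```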
In our top-level `riemann.config`, we don't want to worry about capacities or rates of growth or anything else. We'll just load every `.clj` file in the `mycorp` directory, and require the `mycorp.queues` namespace, giving it the short name `q`. Then we can call any function from that namespace using `q/some-fn`:
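```clojure
(require '[mycorp.queues :as q])

(def email (mailer {:from "riemann@example.com"}))

(streams
  (q/queue-capacity (email "ops@example.com")))
```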
Note that `require` has a slightly different shape in an `ns` declaration versus when it's called as a function: when called as a function, we put a single quote in front of the vector. There are Good Reasons (TM) for Clojure to work this way, but it's a long story and we won't dig into it here.
Some producers, like collectd, spew a huge volume of services into Riemann, which can make it tough to figure out which service to use. You can dump every service from the Riemann index with a client in your favorite language, though. Just issue a query (e.g. `true` for all events, though this might be slow), and extract the fields you're interested in.
Make sure to check the asynchronous streams docs; it may be appropriate to batch or defer events which flow to downstream systems.
`riemann.email` can send single events--or vectors of events--via email.
You can use any options for Postal.
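(Addresses are examples; any Postal option, like an SMTP relay `:host`, can go in the map.)

```clojure
(def email (mailer {:from "riemann@example.com"
                    :host "smtp.example.com"}))

(streams
  (changed-state {:init "ok"}
    (email "ops@example.com")))
```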
First, you define a client to graphite, which maintains a connection pool.
Then, use the client as a stream.
Or just graph everything.
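(The hostname is an example.)

```clojure
(def graph (graphite {:host "graphite.example.com"}))

(streams
  (where (service #"^api ")
    graph))

;; Or just graph everything:
;; (streams graph)
```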
Create a client for librato-metrics with your username and API key.
Then use it in streams.
You can submit events as annotations too. See the librato-metrics documentation.
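A sketch (credentials are placeholders; check `riemann.librato` for the exact API):

```clojure
(def librato (librato-metrics "user@example.com" "my-api-key"))

(streams
  (where (service "api req rate")
    (librato :gauges)))
```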
Create a client with your Pagerduty service key, then use `:trigger` and `:resolve` to open and close issues for a given host and service. You can also pass HTTP options using the `:options` key, and a custom formatter for the Pagerduty event with the `:formatter` key.
You can use the Pagerduty v2 API by setting the `:version` key to `:v2`. In v2, each event can contain a `:dedup-key` key to handle alert de-duplication.
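A sketch (the service key and service name are placeholders; the client returns a map of `:trigger` and `:resolve` streams):

```clojure
(let [pd (pagerduty {:service-key "my-pagerduty-service-key"})]
  (streams
    (where (service "app health")
      (changed :state
        (where (state "ok")
          (:resolve pd)
          (else (:trigger pd)))))))
```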
Create a Slack webhook for Riemann, then create a Slack client using the token from the last element of your webhook URL. For example, with a webhook URL of `https://hooks.slack.com/services/QWERSAFG0/AFOIUYTQ48/120984SAFJSFR`, your token would be `120984SAFJSFR`. Specify the user name of the Riemann bot, a channel to notify, and an icon for the notification.
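(Account, channel, and icon are examples.)

```clojure
(def credentials {:account "my-team", :token "120984SAFJSFR"})

(def slacker (slack credentials {:username "Riemann bot"
                                 :channel  "#ops"
                                 :icon     ":rotating_light:"}))

(streams
  (changed-state {:init "ok"}
    slacker))
```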
When you have *many* events, you can use multiple Riemann servers to scale out. You might, for instance, run one Riemann server per data center, and forward only state changes in each service to a master server for a birds-eye view.
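A sketch of a leaf node forwarding state changes to a master (the hostname is an example):

```clojure
(let [master (riemann.client/tcp-client :host "riemann-master.example.com")]
  (streams
    (changed-state {:init "ok"}
      (forward master))))
```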
Create an InfluxDB client for Riemann. Using `:version :new-stream` selects the new InfluxDB stream; otherwise, the old stream is used. You can then send events to InfluxDB, using `smap`, for example, to format events for the new stream.
For the old stream, you can override the `:tag-fields` and `:precision` options per event. For the new stream, you can override `:consistency`, `:db`, `:retention`, and `:precision` per event.
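A sketch (host and database are placeholders; see the InfluxDB stream docs for how events must be shaped for the new stream):

```clojure
(def influx (influxdb {:host    "influx.example.com"
                       :db      "riemann"
                       :version :new-stream}))

(streams
  (where (tagged "metrics")
    influx))
```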
If a field is a Clojure `Ratio`, it will be converted to a `Double`.
You can consume events from Kafka topics by configuring a Kafka consumer. For a full list of `:consumer.config` options, see the Kafka consumer docs. Note that the `:enable.auto.commit` option is ignored and defaults to true.
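A sketch (broker address, group id, and topic are placeholders):

```clojure
(kafka-consumer {:consumer.config {:bootstrap.servers "kafka.example.com:9092"
                                   :group.id          "riemann"}
                 :topics ["riemann-events"]})
```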
Create a Kafka client. For a complete list of producer configuration options, see the Kafka documentation. The Kafka stream uses kinsky under the hood, so you can import it and use its serializers. You can then call `kafka-output` in your streams with a topic name and an optional message key.
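(Broker address and topic name are examples.)

```clojure
(def kafka-output (kafka {:bootstrap.servers "kafka.example.com:9092"}))

(streams
  (kafka-output "riemann-events"))
```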
Create an Elasticsearch client. The elasticsearch stream accepts an optional second parameter specifying an event formatter (see the API documentation for the default value). You can also use the Elasticsearch bulk API, overriding the `default-bulk-formatter` options per event.
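A sketch (the endpoint is a placeholder; see `riemann.elasticsearch` for the full option list):

```clojure
(def elastic (elasticsearch {:es-endpoint "http://elasticsearch.example.com:9200"}))

(streams
  (where (tagged "archive")
    elastic))
```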
A TCP connection to Riemann is a stream of messages. Each message is a 4 byte network-endian integer *length*, followed by a Protocol Buffer Message of *length* bytes. See the protocol buffer definition for the details.
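As a Clojure sketch of the framing (raw Java IO; `encoded` is a serialized protobuf Msg, and the host and port are examples):

```clojure
(import '[java.io DataOutputStream]
        '[java.net Socket])

(defn send-framed
  "Writes a 4-byte big-endian length prefix, then the encoded Msg bytes."
  [host port ^bytes encoded]
  (with-open [sock (Socket. ^String host (int port))
              out  (DataOutputStream. (.getOutputStream sock))]
    (.writeInt out (alength encoded)) ; DataOutputStream writes big-endian
    (.write out encoded 0 (alength encoded))
    (.flush out)))
```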
Over UDP, the length header is omitted; just send the protobuf Message directly. By default, Riemann's UDP server accepts datagrams up to 16384 bytes; larger messages should go over TCP. This limit is configurable in both the client and server; client values *must* be smaller than the server's.
The server will accept a repeated list of Events, and respond with a confirmation message containing either an acknowledgement or an error. Check the `ok` boolean in the message; if false, `message.error` will be a descriptive string.
Because protocol buffers is a strongly typed protocol, the metric of an event is represented as one of metric_d (floating point 64-bit), metric_f (floating point 32-bit), or metric_sint64 (64-bit signed integer). Your client should emit and consume all of these types. For compatibility with older versions of Riemann, you may *also* emit a metric_f alongside the normal type; newer versions of Riemann will prefer the higher-resolution types.
Events are uniquely identified by host and service. Both allow null. Event.time is the time in unix epoch seconds, and is optional; the server will generate a time for each event on receipt if you do not provide one. Event.time_micros is the time in unix epoch microseconds, and is also optional. When both are present in a received message, Riemann prefers time_micros.
You can also query events from the index using a basic query language. Just submit a Message with your query in message.query.string. Search queries will return a message with repeated Events matching that expression. A null expression will return no states. For some example queries, see The query test suite.
You might find it useful to read the Ruby client source as a guide to writing your own client.
I try to keep master as clean and runnable as possible. Riemann has an exhaustive test suite, which helps ensure code quality. If you plan on changing the Riemann source, fork it on Github so you'll be able to send pull requests quickly. If you just want to run the latest version, go ahead and clone the official repo:
You'll also need a JVM, and leiningen 2--the Clojure build system.
To run the test suite, try `lein test`. To start Riemann, run `lein run`. Riemann will read the file `riemann.config` in the current directory. If you want to run a different config file, pass its path to `lein run`.
If you want a fat jar, run `lein uberjar` and copy `target/riemann-{version}-STANDALONE.jar`. To build tarball and Debian packages, use `lein pkg`; .debs and .tar.gz files, plus md5sums, will appear in `target/`.
The protocol buffer codec and clojure client live in riemann-clojure-client, which wraps the java protobuf code and java client in riemann-java-client. Both of these are available on clojars and most of the time you can ignore them. However, if you need to change the protocol or client, you can fork these projects and make your changes there.
You'll need maven, and the protocol buffers compiler (protoc) version 2.4.1.
When you've made changes to the java client, install it with `mvn install`; then test the clojure client and install it with `lein install`. Finally, you can run riemann itself. You may need to check that the client versions you're working with match up in the riemann and riemann-clojure-client project.clj files.
First, fork Riemann on github. Clone your fork and create a new topic branch for your fix:
Most of Riemann's source lives in `src/riemann/`. Corresponding tests live in `test/riemann/`. When you fix a bug or add a feature, make sure to add new tests that confirm its correctness! You can run the test suite with `lein test`.
Some tests for integrating with other services require a local sendmail, or graphite, or credentials for a web service. If you make changes that affect these systems, you can test them with special selectors like `lein test :graphite` or `lein test :email`. If you're working with a particular namespace, like `riemann.streams`, `lein test riemann.streams-test` runs only the tests for that namespace. Once your tests pass, commit your changes and push them to github.
If you change more than a few lines of whitespace, please make your formatting changes in a separate commit; it'll be easier for me to read and understand your changes. Please try to send me only a few commits where possible; use `rebase --interactive` to squash your small changes.
Riemann's web site and documentation are in the gh-pages branch of the riemann repository. Fork riemann on github, clone your fork, and check out the branch:
Pages are built with Jekyll. To see how your changes will appear on the site, build it with Jekyll and open `_site/howto.html` in a web browser. When you're satisfied with your changes, commit, push, and send me a pull request.