riemann.config
Riemann config files are eval’d in the context of this namespace. Includes streams, client, email, logging, and graphite; the common functions used in config. Provides a default core and functions ((tcp|udp)-server, streams, index, reinject) which operate on that core.
apply!
(apply!)
Applies pending changes to the core. Transitions the current core to the next one, and resets the next core.
async-queue!
(async-queue! name threadpool-service-opts & children)
A stream which registers (using service!) a new threadpool-service with the next core, and returns a stream which accepts events and applies those events to child streams via the threadpool service.
WARNING: this function is not intended for dynamic use. It creates a new executor service for every invocation, and it will not start that executor service until the current configuration is applied. Use it sparingly and only at configuration time, preferably once for each distinct IO-bound asynchronous service.
See riemann.service/threadpool-service for options.
Example:
(let [downstream (batch 100 1/10
                   (async-queue! :agg {:queue-size     1e3
                                       :core-pool-size 4
                                       :max-pool-size  32}
                     (forward
                       (riemann.client/tcp-client
                         :host "127.0.0.1"))))]
  (streams
    ...
    ; Forward all events downstream to the aggregator.
    (where (service #"^riemann.*")
      downstream)))
config-file-path
(config-file-path path)
Computes the full path to a config file. Absolute paths are returned unchanged. Relative paths are expanded relative to config-file. Returns a string.
delete-from-index
(delete-from-index)
(delete-from-index fields)
Deletes any events that pass through from the index. By default, deletes events with the same host and service. If a field, or a list of fields, is given, deletes any events with matching values for all of those fields.
; Delete all events in the index with the same host as the incoming event
(delete-from-index :host)
; Delete all events in the index with the same host and state as the incoming event
(delete-from-index [:host :state])
depend
macro
(depend plugin artifact version options)
Pull in specified dependencies. This combines pulling dependencies with aether and loading a plugin.
The options map is passed to com.cemerick.pomegranate/add-dependencies. It also accepts an optional :exit-on-failure key, defaulting to true, which indicates whether Riemann should bail out when it fails to load a plugin, and an optional :alias key, which is forwarded to load-plugin.
To prefer https, only the clojars repository is registered by default.
graphite-server
(graphite-server & opts)
Add a new Graphite TCP server with opts to the default core.
(graphite-server {:port 2222})
include
(include path)
Include another config file or directory. If the path points to a directory, all files with names ending in .config or .clj within it will be loaded recursively.
; Relative to the current config file, or cwd
(include "foo.clj")
; Absolute path
(include "/foo/bar.clj")
instrumentation
(instrumentation & opts)
Replaces the default core’s instrumentation service with a new one, using the given options. If you prefer not to receive any events about Riemann’s well-being, you can pass :enabled? false.
(instrumentation {:interval 5 :enabled? false})
kafka-consumer
(kafka-consumer & opts)
Add a new kafka consumer with opts to the default core.
(kafka-consumer {:consumer.config {:bootstrap.servers "localhost:9092"
                                   :group.id "riemann"}
                 :topics ["riemann"]})
Options:
For a full list of :consumer.config options see the Kafka consumer docs. NOTE: The :enable.auto.commit option is ignored and defaults to true.
- :consumer.config Consumer configuration
  - :bootstrap.servers Bootstrap configuration, default is "localhost:9092"
  - :group.id Consumer group id, default is "riemann"
- :topics Topics to consume from, default is "riemann"
- :key.deserializer Key deserializer function, defaults to the keyword-deserializer.
- :value.deserializer Value deserializer function, defaults to json-deserializer.
- :poll.timeout.ms Polling timeout, default is 100.
kwargs-or-map
(kwargs-or-map opts)
Takes a sequence of arguments, such as [{:port 5555}] or [:port 5555], as would be passed to a function taking either kwargs or an options map, and returns an options map.
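The behaviour described above can be sketched in plain Clojure. This is an illustrative stand-in only: opts->map and server-opts are hypothetical names, and the real kwargs-or-map may be implemented differently.

```clojure
; Hypothetical stand-in for kwargs-or-map, written only to illustrate
; the behaviour described above.
(defn opts->map
  [opts]
  (if (and (= 1 (count opts)) (map? (first opts)))
    (first opts)              ; a single options map: use it directly
    (apply hash-map opts)))   ; otherwise treat the args as key/value pairs

; A hypothetical function that accepts either calling style.
(defn server-opts
  [& opts]
  (opts->map opts))

(server-opts {:port 5555})    ; options-map style
(server-opts :port 5555)      ; kwargs style
```

Both calls yield the same options map, which is what lets functions such as tcp-server accept either style.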
local-repo
(local-repo path)
Sets the location of the local Maven repository used by depend to load plugins.
opentsdb-server
(opentsdb-server & opts)
Add a new OpenTSDB TCP server with opts to the default core.
(opentsdb-server {:port 4242})
periodically-expire
(periodically-expire)
(periodically-expire & args)
Sets up a reaper for this core. See riemann.core/reaper.
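A typical call might look like the following config fragment. It assumes the arguments are passed through to riemann.core/reaper as an interval in seconds followed by an options map; check that function's documentation for the options it actually accepts.

```clojure
; Assumption: the first argument is the scan interval in seconds, and
; :keep-keys lists the event keys preserved on expired events.
(periodically-expire 10 {:keep-keys [:host :service :tags]})
```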
publish
(publish channel)
Returns a stream which publishes events to the given channel. Uses this core’s pubsub registry.
rabbitmq-transport
(rabbitmq-transport & opts)
Add a new RabbitMQ transport to the default core.
(rabbitmq-transport {:host "localhost"
                     :port 5672
                     :riemann.exchange-settings {:name "riemann"
                                                 :auto-delete true}
                     :riemann.routing-key "riemann.events"})
read-strings
(read-strings string)
(read-strings forms reader)
Returns a sequence of forms read from string.
reinject
(reinject event)
A stream which applies any events it receives back into the current core. You almost never need this: it makes it easy to create infinite loops, and it’s rarely the case that you need top-level recursion. Where possible, prefer a stream that passes events to children.
(with :metric 1 reinject)
service!
(service! service)
Ensures that a given service, or its equivalent, is in the next core. If the current core includes an equivalent service, uses that service instead. Returns the service which will be used in the final core.
This allows configuration to specify and use services in a way which can, where possible, re-use existing services without interruption, e.g., when reloading. For example, say you want to use a threadpool executor:
(let [executor (service! (ThreadPoolExecutor. 1 2 ...))]
  (where (service "graphite")
    (on executor
      graph)))
If you reload this config, the old executor is busily processing messages from the old set of streams. When the new config evaluates (service! …) it creates a new ThreadPoolExecutor and compares it to the existing core’s services. If it’s equivalent, service! will re-use the existing executor, which prevents having to shut down the old executor.
But if you change the dynamics of the new executor somehow, maybe by adjusting a queue depth or max pool size, they won’t compare as equivalent. When the core transitions, the old executor will be shut down, and the new one used to handle any further graphite events.
Note: Yeah, this does duplicate some of the work done in core/transition!. No, I’m not really sure what to do about it. Maybe we need a named service registry so all lookups are dynamic. :-/
sse-server
(sse-server & opts)
Add a new SSE channel server with opts to the default core.
(sse-server {:port 5556})
subscribe
(subscribe channel f)
Subscribes to the given channel with f, which will receive events. Uses the current core’s pubsub registry always, because the next core’s registry will be discarded by core/transition.
Returns a single-arity function that does nothing with its inputs and, when invoked, returns the subscription you created. Why do this weird thing? So you can pretend (subscribe …) is a stream, and use it in the same context as your other streams, like (publish).
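As a rough sketch of how publish and subscribe might be paired inside a config, the fragment below prints every critical event. The channel name "alerts" and the use of prn as the subscriber are illustrative assumptions, not values taken from Riemann itself.

```clojure
(streams
  ; Publish critical events to a hypothetical "alerts" channel.
  (where (state "critical")
    (publish "alerts"))
  ; Because (subscribe ...) returns a function that ignores its input,
  ; it can sit alongside other streams; prn receives each published event.
  (subscribe "alerts" prn))
```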
tcp-server
(tcp-server & opts)
Add a new TCP server with opts to the default core.
(tcp-server {:host "localhost" :port 5555})
udp-server
(udp-server & opts)
Add a new UDP server with opts to the default core.
(udp-server {:port 5555})
update-index
(update-index index)
Updates the given index with all events received. Also publishes to the index pubsub channel.
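One way this might appear in a config is sketched below. It assumes the (index) function mentioned in the namespace overview returns the core's index, and that the default stream from riemann.streams is available to set a TTL; the 300-second value is arbitrary.

```clojure
; Assumption: (index) creates or returns the index for the default core.
(let [index (default :ttl 300 (update-index (index)))]
  (streams
    ; Every event gets a default TTL and is then written to the index.
    index))
```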
ws-server
(ws-server & opts)
Add a new websockets server with opts to the default core.
(ws-server {:port 5556})
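Taken together, the server functions are usually called near the top of a config file. The fragment below is a minimal sketch that reuses the ports from the examples above; the bind address is an assumption to adjust for your environment.

```clojure
; Bind all three listeners to the same (assumed) address.
(let [host "127.0.0.1"]
  (tcp-server {:host host :port 5555})
  (udp-server {:host host :port 5555})
  (ws-server  {:host host :port 5556}))
```

Each call accepts either an options map, as here, or keyword arguments, since the signatures take & opts.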