riemann.kafka

Receives events from and forwards events to Kafka.

json-deserializer

(json-deserializer)

Deserialize JSON. Let bad payload not break the consumption.

kafka

(kafka)(kafka opts)

Returns a function that is invoked with a topic name and an optional message key and returns a stream. That stream is a function which takes an event or a sequence of events and sends them to Kafka.

(def kafka-output (kafka))

(changed :state
  (kafka-output "mytopic"))

Options:

For a complete list of producer configuration options see https://kafka.apache.org/documentation/#producerconfigs

  • :bootstrap.servers Bootstrap configuration, default is “localhost:9092”.
  • :value.serializer Value serializer, default is json-serializer.

Example with SSL enabled:

(def kafka-output (kafka {:bootstrap.servers "kafka.example.com:9092"
                          :security.protocol "SSL"
                          :ssl.truststore.location "/path/to/my/truststore.jks"
                          :ssl.truststore.password "mypassword"}))

kafka-consumer

(kafka-consumer opts)

Yield a kafka consumption service

start-kafka-thread

(start-kafka-thread running? core opts)

Start a kafka thread which will pop messages off the queue as long as running? is true