From 6b1cc67664fd9b37515edf20b113d1f40fd3df98 Mon Sep 17 00:00:00 2001 From: Paulo Pires Date: Thu, 1 Sep 2016 13:11:42 +0100 Subject: [PATCH] Add NATS output plugin. Added NATS server container needed for tests. Added NATS output plug-in. Fixes #1487 NATS output plug-in use internal.GetTLSConfig to instrument TLS configuration. Added NATS output plug-in to changelog. closes #1487 closes #1697 --- CHANGELOG.md | 1 + Godeps | 4 +- Makefile | 6 +- README.md | 1 + plugins/outputs/all/all.go | 1 + plugins/outputs/nats/README.md | 37 +++++++++ plugins/outputs/nats/nats.go | 129 ++++++++++++++++++++++++++++++ plugins/outputs/nats/nats_test.go | 31 +++++++ 8 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 plugins/outputs/nats/README.md create mode 100644 plugins/outputs/nats/nats.go create mode 100644 plugins/outputs/nats/nats_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ed7a62a38..fcab261df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#1471](https://github.com/influxdata/telegraf/pull/1471): iptables input plugin. - [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin. - [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements. +- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin. ### Bugfixes diff --git a/Godeps b/Godeps index fc94b59c0..3a4e9fb1b 100644 --- a/Godeps +++ b/Godeps @@ -37,8 +37,8 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3 github.com/miekg/dns cce6c130cdb92c752850880fd285bea1d64439dd github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b -github.com/nats-io/nats b13fc9d12b0b123ebc374e6b808c6228ae4234a3 -github.com/nats-io/nuid 4f84f5f3b2786224e336af2e13dba0a0a80b76fa +github.com/nats-io/nats ea8b4fd12ebb823073c0004b9f09ac8748f4f165 +github.com/nats-io/nuid a5152d67cf63cbfb5d992a395458722a45194715 github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980 github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831 diff --git a/Makefile b/Makefile index 10ddbef6b..0a3be166b 100644 --- a/Makefile +++ b/Makefile @@ -56,6 +56,7 @@ docker-run: docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann + docker run --name nats -p "4222:4222" -d nats # Run docker containers necessary for CircleCI unit tests docker-run-circle: @@ -68,11 +69,12 @@ docker-run-circle: docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann + docker run --name nats -p "4222:4222" -d nats # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann - -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann + -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats + -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/README.md b/README.md index 8b46534e0..0415d89f7 100644 --- a/README.md +++ b/README.md @@ -245,6 +245,7 @@ want to add support for another service or third-party API. * [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka) * [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato) * [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt) +* [nats](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/nats) * [nsq](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/nsq) * [opentsdb](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb) * [prometheus](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/prometheus_client) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 27f8958fe..28354e7e4 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" _ "github.com/influxdata/telegraf/plugins/outputs/librato" _ "github.com/influxdata/telegraf/plugins/outputs/mqtt" + _ "github.com/influxdata/telegraf/plugins/outputs/nats" _ "github.com/influxdata/telegraf/plugins/outputs/nsq" _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" diff --git a/plugins/outputs/nats/README.md b/plugins/outputs/nats/README.md new file mode 100644 index 000000000..501bd377c --- /dev/null +++ b/plugins/outputs/nats/README.md @@ -0,0 +1,37 @@ +# NATS Output Plugin + +This plugin writes to a (list of) specified NATS instance(s). + +``` +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + ## Optional credentials + # username = "" + # password = "" + ## NATS subject for producer messages + subject = "telegraf" + ## Optional TLS Config + ## CA certificate used to self-sign NATS server(s) TLS certificate(s) + # tls_ca = "/etc/telegraf/ca.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +``` + +### Required parameters: + +* `servers`: List of strings, this is for NATS clustering support. Each URL should start with `nats://`. +* `subject`: The NATS subject to publish to. + +### Optional parameters: + +* `username`: Username for NATS +* `password`: Password for NATS +* `tls_ca`: TLS CA +* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go new file mode 100644 index 000000000..4b0b94d7a --- /dev/null +++ b/plugins/outputs/nats/nats.go @@ -0,0 +1,129 @@ +package nats + +import ( + "fmt" + + nats_client "github.com/nats-io/nats" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +type NATS struct { + // Servers is the NATS server pool to connect to + Servers []string + // Credentials + Username string + Password string + // NATS subject to publish metrics to + Subject string + + // Path to CA file + CAFile string `toml:"tls_ca"` + + // Skip SSL verification + InsecureSkipVerify bool + + conn *nats_client.Conn + serializer serializers.Serializer +} + +var sampleConfig = ` + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + ## Optional credentials + # username = "" + # password = "" + ## NATS subject for producer messages + subject = "telegraf" + ## Optional TLS Config + ## CA certificate used to self-sign NATS server(s) TLS certificate(s) + # tls_ca = "/etc/telegraf/ca.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + +func (n *NATS) SetSerializer(serializer serializers.Serializer) { + n.serializer = serializer +} + +func (n *NATS) Connect() error { + var err error + // set NATS connection options + opts := nats_client.DefaultOptions + opts.Servers = n.Servers + if n.Username != "" { + opts.User = n.Username + opts.Password = n.Password + } + + // is TLS enabled? + tlsConfig, err := internal.GetTLSConfig( + "", "", n.CAFile, n.InsecureSkipVerify) + if err != nil { + return err + } + if tlsConfig != nil { + // set NATS connection TLS options + opts.Secure = true + opts.TLSConfig = tlsConfig + } + + // try and connect + n.conn, err = opts.Connect() + + return err +} + +func (n *NATS) Close() error { + n.conn.Close() + return nil +} + +func (n *NATS) SampleConfig() string { + return sampleConfig +} + +func (n *NATS) Description() string { + return "Send telegraf measurements to NATS" +} + +func (n *NATS) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + values, err := n.serializer.Serialize(metric) + if err != nil { + return err + } + + var pubErr error + for _, value := range values { + err = n.conn.Publish(n.Subject, []byte(value)) + if err != nil { + pubErr = err + } + } + + if pubErr != nil { + return fmt.Errorf("FAILED to send NATS message: %s", err) + } + } + return nil +} + +func init() { + outputs.Add("nats", func() telegraf.Output { + return &NATS{} + }) +} diff --git a/plugins/outputs/nats/nats_test.go b/plugins/outputs/nats/nats_test.go new file mode 100644 index 000000000..773dbaa6e --- /dev/null +++ b/plugins/outputs/nats/nats_test.go @@ -0,0 +1,31 @@ +package nats + +import ( + "testing" + + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + server := []string{"nats://" + testutil.GetLocalHost() + ":4222"} + s, _ := serializers.NewInfluxSerializer() + n := &NATS{ + Servers: server, + Subject: "telegraf", + serializer: s, + } + + // Verify that we can connect to the NATS daemon + err := n.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to the NATS daemon + err = n.Write(testutil.MockMetrics()) + require.NoError(t, err) +}