From cabcb432284baf9d4d04b7098b6e9b3931d481d9 Mon Sep 17 00:00:00 2001 From: Paulo Pires Date: Thu, 1 Sep 2016 14:20:55 +0100 Subject: [PATCH] Added NATS output plug-in. Fixes #1487 --- README.md | 1 + plugins/outputs/all/all.go | 1 + plugins/outputs/nats/README.md | 37 ++++++++ plugins/outputs/nats/nats.go | 139 ++++++++++++++++++++++++++++++ plugins/outputs/nats/nats_test.go | 31 +++++++ 5 files changed, 209 insertions(+) create mode 100644 plugins/outputs/nats/README.md create mode 100644 plugins/outputs/nats/nats.go create mode 100644 plugins/outputs/nats/nats_test.go diff --git a/README.md b/README.md index 8b46534e0..0415d89f7 100644 --- a/README.md +++ b/README.md @@ -245,6 +245,7 @@ want to add support for another service or third-party API. * [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka) * [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato) * [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt) +* [nats](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/nats) * [nsq](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/nsq) * [opentsdb](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb) * [prometheus](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/prometheus_client) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 27f8958fe..28354e7e4 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" _ "github.com/influxdata/telegraf/plugins/outputs/librato" _ "github.com/influxdata/telegraf/plugins/outputs/mqtt" + _ "github.com/influxdata/telegraf/plugins/outputs/nats" _ "github.com/influxdata/telegraf/plugins/outputs/nsq" _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" diff --git a/plugins/outputs/nats/README.md b/plugins/outputs/nats/README.md new file mode 100644 index 000000000..501bd377c --- /dev/null +++ b/plugins/outputs/nats/README.md @@ -0,0 +1,37 @@ +# NATS Output Plugin + +This plugin writes to a (list of) specified NATS instance(s). + +``` +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + ## Optional credentials + # username = "" + # password = "" + ## NATS subject for producer messages + subject = "telegraf" + ## Optional TLS Config + ## CA certificate used to self-sign NATS server(s) TLS certificate(s) + # tls_ca = "/etc/telegraf/ca.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +``` + +### Required parameters: + +* `servers`: List of strings, this is for NATS clustering support. Each URL should start with `nats://`. +* `subject`: The NATS subject to publish to. + +### Optional parameters: + +* `username`: Username for NATS +* `password`: Password for NATS +* `tls_ca`: TLS CA +* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go new file mode 100644 index 000000000..64ed4c556 --- /dev/null +++ b/plugins/outputs/nats/nats.go @@ -0,0 +1,139 @@ +package nats + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + + nats_client "github.com/nats-io/nats" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +type NATS struct { + // Servers is the NATS server pool to connect to + Servers []string + // Credentials + Username string + Password string + // NATS subject to publish metrics to + Subject string + + // Path to CA file + CAFile string `toml:"tls_ca"` + + // Skip SSL verification + InsecureSkipVerify bool + + conn *nats_client.Conn + serializer serializers.Serializer +} + +var sampleConfig = ` + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + ## Optional credentials + # username = "" + # password = "" + ## NATS subject for producer messages + subject = "telegraf" + ## Optional TLS Config + ## CA certificate used to self-sign NATS server(s) TLS certificate(s) + # tls_ca = "/etc/telegraf/ca.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + +func (n *NATS) SetSerializer(serializer serializers.Serializer) { + n.serializer = serializer +} + +func (n *NATS) Connect() error { + // set NATS connection options + opts := nats_client.DefaultOptions + opts.Servers = n.Servers + if n.Username != "" { + opts.User = n.Username + opts.Password = n.Password + } + + // is TLS enabled? + var tlsConfig tls.Config + tlsConfig.InsecureSkipVerify = n.InsecureSkipVerify + if n.CAFile != "" { + rootPEM, err := ioutil.ReadFile(n.CAFile) + if err != nil || rootPEM == nil { + return fmt.Errorf("FAILED to connect to NATS (can't read root certificate): %s", err) + } + pool := x509.NewCertPool() + ok := pool.AppendCertsFromPEM([]byte(rootPEM)) + if !ok { + return fmt.Errorf("FAILED to connect to NATS (can't parse root certificate): %s", err) + } + tlsConfig.RootCAs = pool + + // set NATS connection TLS options + opts.Secure = true + opts.TLSConfig = &tlsConfig + } + + // try and connect + var err error + n.conn, err = opts.Connect() + + return err +} + +func (n *NATS) Close() error { + n.conn.Close() + return nil +} + +func (n *NATS) SampleConfig() string { + return sampleConfig +} + +func (n *NATS) Description() string { + return "Send telegraf measurements to NATS" +} + +func (n *NATS) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + values, err := n.serializer.Serialize(metric) + if err != nil { + return err + } + + var pubErr error + for _, value := range values { + err = n.conn.Publish(n.Subject, []byte(value)) + if err != nil { + pubErr = err + } + } + + if pubErr != nil { + return fmt.Errorf("FAILED to send NATS message: %s", err) + } + } + return nil +} + +func init() { + outputs.Add("nats", func() telegraf.Output { + return &NATS{} + }) +} diff --git a/plugins/outputs/nats/nats_test.go b/plugins/outputs/nats/nats_test.go new file mode 100644 index 000000000..773dbaa6e --- /dev/null +++ b/plugins/outputs/nats/nats_test.go @@ -0,0 +1,31 @@ +package nats + +import ( + "testing" + + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + server := []string{"nats://" + testutil.GetLocalHost() + ":4222"} + s, _ := serializers.NewInfluxSerializer() + n := &NATS{ + Servers: server, + Subject: "telegraf", + serializer: s, + } + + // Verify that we can connect to the NATS daemon + err := n.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to the NATS daemon + err = n.Write(testutil.MockMetrics()) + require.NoError(t, err) +}