diff --git a/CHANGELOG.md b/CHANGELOG.md index a8a75617e..63c4403cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features - [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek! +- [#1139](https://github.com/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs! ### Bugfixes diff --git a/README.md b/README.md index 8d325f6dd..b6cc91536 100644 --- a/README.md +++ b/README.md @@ -245,6 +245,7 @@ want to add support for another service or third-party API. * [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog) * [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file) * [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite) +* [instrumental](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/instrumental) * [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka) * [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato) * [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 18fb1c925..5b223529c 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -8,6 +8,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" + _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" _ "github.com/influxdata/telegraf/plugins/outputs/librato" diff --git a/plugins/outputs/instrumental/README.md b/plugins/outputs/instrumental/README.md new file mode 100644 index 000000000..128599ee8 --- /dev/null +++ b/plugins/outputs/instrumental/README.md @@ -0,0 +1,25 @@ +# Instrumental Output Plugin + +This plugin writes to the [Instrumental Collector API](https://instrumentalapp.com/docs/tcp-collector) +and requires a Project-specific API token. + +Instrumental accepts stats in a format very close to Graphite, with the only difference being that +the type of stat (gauge, increment) is the first token, separated from the metric itself +by whitespace. The `increment` type is only used if the metric comes in as a counter through `[[input.statsd]]`. + +## Configuration: + +```toml +[[outputs.instrumental]] + ## Project API Token (required) + api_token = "API Token" # required + ## Prefix the metrics with a given name + prefix = "" + ## Stats output template (Graphite formatting) + ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite + template = "host.tags.measurement.field" + ## Timeout in seconds to connect + timeout = "2s" + ## Debug true - Print communcation to Instrumental + debug = false +``` diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go new file mode 100644 index 000000000..461ba9d9e --- /dev/null +++ b/plugins/outputs/instrumental/instrumental.go @@ -0,0 +1,192 @@ +package instrumental + +import ( + "fmt" + "io" + "log" + "net" + "regexp" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/graphite" +) + +type Instrumental struct { + Host string + ApiToken string + Prefix string + DataFormat string + Template string + Timeout internal.Duration + Debug bool + + conn net.Conn +} + +const ( + DefaultHost = "collector.instrumentalapp.com" + AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n" +) + +var ( + StatIncludesBadChar = regexp.MustCompile("[^[:alnum:][:blank:]-_.]") +) + +var sampleConfig = ` + ## Project API Token (required) + api_token = "API Token" # required + ## Prefix the metrics with a given name + prefix = "" + ## Stats output template (Graphite formatting) + ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite + template = "host.tags.measurement.field" + ## Timeout in seconds to connect + timeout = "2s" + ## Display Communcation to Instrumental + debug = false +` + +func (i *Instrumental) Connect() error { + connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration) + if err != nil { + i.conn = nil + return err + } + + err = i.authenticate(connection) + if err != nil { + i.conn = nil + return err + } + + return nil +} + +func (i *Instrumental) Close() error { + i.conn.Close() + i.conn = nil + return nil +} + +func (i *Instrumental) Write(metrics []telegraf.Metric) error { + if i.conn == nil { + err := i.Connect() + if err != nil { + return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err) + } + } + + s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template) + if err != nil { + return err + } + + var points []string + var metricType string + var toSerialize telegraf.Metric + var newTags map[string]string + + for _, metric := range metrics { + // Pull the metric_type out of the metric's tags. We don't want the type + // to show up with the other tags pulled from the system, as they go in the + // beginning of the line instead. + // e.g we want: + // + // increment some_prefix.host.tag1.tag2.tag3.field value timestamp + // + // vs + // + // increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp + // + newTags = metric.Tags() + metricType = newTags["metric_type"] + delete(newTags, "metric_type") + + toSerialize, _ = telegraf.NewMetric( + metric.Name(), + newTags, + metric.Fields(), + metric.Time(), + ) + + stats, err := s.Serialize(toSerialize) + if err != nil { + log.Printf("Error serializing a metric to Instrumental: %s", err) + } + + switch metricType { + case "counter": + fallthrough + case "histogram": + metricType = "increment" + default: + metricType = "gauge" + } + + for _, stat := range stats { + if !StatIncludesBadChar.MatchString(stat) { + points = append(points, fmt.Sprintf("%s %s", metricType, stat)) + } else if i.Debug { + log.Printf("Unable to send bad stat: %s", stat) + } + } + } + + allPoints := strings.Join(points, "\n") + "\n" + _, err = fmt.Fprintf(i.conn, allPoints) + + if i.Debug { + log.Println(allPoints) + } + + if err != nil { + if err == io.EOF { + i.Close() + } + + return err + } + + return nil +} + +func (i *Instrumental) Description() string { + return "Configuration for sending metrics to an Instrumental project" +} + +func (i *Instrumental) SampleConfig() string { + return sampleConfig +} + +func (i *Instrumental) authenticate(conn net.Conn) error { + _, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken) + if err != nil { + return err + } + + // The response here will either be two "ok"s or an error message. + responses := make([]byte, 512) + if _, err = conn.Read(responses); err != nil { + return err + } + + if string(responses)[:6] != "ok\nok\n" { + return fmt.Errorf("Authentication failed: %s", responses) + } + + i.conn = conn + return nil +} + +func init() { + outputs.Add("instrumental", func() telegraf.Output { + return &Instrumental{ + Host: DefaultHost, + Template: graphite.DEFAULT_TEMPLATE, + } + }) +} diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go new file mode 100644 index 000000000..ceb53bac6 --- /dev/null +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -0,0 +1,114 @@ +package instrumental + +import ( + "bufio" + "net" + "net/textproto" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" +) + +func TestWrite(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + go TCPServer(t, &wg) + // Give the fake TCP server some time to start: + time.Sleep(time.Millisecond * 100) + + i := Instrumental{ + Host: "127.0.0.1", + ApiToken: "abc123token", + Prefix: "my.prefix", + } + i.Connect() + + // Default to gauge + m1, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"host": "192.168.0.1", "metric_type": "set"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Simulate a connection close and reconnect. + metrics := []telegraf.Metric{m1, m2} + i.Write(metrics) + i.Close() + + // Counter and Histogram are increments + m3, _ := telegraf.NewMetric( + "my_histogram", + map[string]string{"host": "192.168.0.1", "metric_type": "histogram"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + // We will drop metrics that simply won't be accepted by Instrumental + m4, _ := telegraf.NewMetric( + "bad_values", + map[string]string{"host": "192.168.0.1", "metric_type": "counter"}, + map[string]interface{}{"value": "\" 3:30\""}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m5, _ := telegraf.NewMetric( + "my_counter", + map[string]string{"host": "192.168.0.1", "metric_type": "counter"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + metrics = []telegraf.Metric{m3, m4, m5} + i.Write(metrics) + + wg.Wait() + i.Close() +} + +func TCPServer(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000") + defer wg.Done() + conn, _ := tcpServer.Accept() + conn.SetDeadline(time.Now().Add(1 * time.Second)) + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + + hello, _ := tp.ReadLine() + assert.Equal(t, "hello version go/telegraf/1.0", hello) + auth, _ := tp.ReadLine() + assert.Equal(t, "authenticate abc123token", auth) + + conn.Write([]byte("ok\nok\n")) + + data1, _ := tp.ReadLine() + assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + data2, _ := tp.ReadLine() + assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) + + conn, _ = tcpServer.Accept() + conn.SetDeadline(time.Now().Add(1 * time.Second)) + reader = bufio.NewReader(conn) + tp = textproto.NewReader(reader) + + hello, _ = tp.ReadLine() + assert.Equal(t, "hello version go/telegraf/1.0", hello) + auth, _ = tp.ReadLine() + assert.Equal(t, "authenticate abc123token", auth) + + conn.Write([]byte("ok\nok\n")) + + data3, _ := tp.ReadLine() + assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) + data4, _ := tp.ReadLine() + assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data4) + + conn.Close() +}