diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a4a34ace..eb22d1896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ of metrics collected and from how many plugins. - [#301](https://github.com/influxdb/telegraf/issues/301): Collect on even intervals - [#298](https://github.com/influxdb/telegraf/pull/298): Support retrying output writes - [#300](https://github.com/influxdb/telegraf/issues/300): aerospike plugin. Thanks @oldmantaiter! +- [#322](https://github.com/influxdb/telegraf/issues/322): Librato output. Thanks @jipperinbham! ### Bugfixes - [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini! diff --git a/README.md b/README.md index 5db1dcb07..6e5a3f171 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,7 @@ found by running `telegraf -sample-config`. * opentsdb * amqp (rabbitmq) * mqtt +* librato ## Contributing diff --git a/outputs/all/all.go b/outputs/all/all.go index 6538af0d2..4967cf850 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/kafka" + _ "github.com/influxdb/telegraf/outputs/librato" _ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/opentsdb" ) diff --git a/outputs/datadog/README.md b/outputs/datadog/README.md new file mode 100644 index 000000000..0563d6444 --- /dev/null +++ b/outputs/datadog/README.md @@ -0,0 +1,9 @@ +# Datadog Output Plugin + +This plugin writes to the [Datadog Metrics API](http://docs.datadoghq.com/api/#metrics) +and requires an `apikey` which can be obtained [here](https://app.datadoghq.com/account/settings#api) +for the account. + +If the point value being sent cannot be converted to a float64, the metric is skipped. + +Metrics are grouped by converting any `_` characters to `.` in the Point Name. \ No newline at end of file diff --git a/outputs/datadog/datadog.go b/outputs/datadog/datadog.go index 7b87a43e1..338861b45 100644 --- a/outputs/datadog/datadog.go +++ b/outputs/datadog/datadog.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "log" "net/http" "net/url" "sort" @@ -65,10 +66,10 @@ func (d *Datadog) Write(points []*client.Point) error { if len(points) == 0 { return nil } - ts := TimeSeries{ - Series: make([]*Metric, len(points)), - } - for index, pt := range points { + ts := TimeSeries{} + var tempSeries = make([]*Metric, len(points)) + var acceptablePoints = 0 + for _, pt := range points { metric := &Metric{ Metric: strings.Replace(pt.Name(), "_", ".", -1), Tags: buildTags(pt.Tags()), @@ -76,9 +77,14 @@ func (d *Datadog) Write(points []*client.Point) error { } if p, err := buildPoint(pt); err == nil { metric.Points[0] = p + tempSeries[acceptablePoints] = metric + acceptablePoints += 1 + } else { + log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) } - ts.Series[index] = metric } + ts.Series = make([]*Metric, acceptablePoints) + copy(ts.Series, tempSeries[0:]) tsBytes, err := json.Marshal(ts) if err != nil { return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) diff --git a/outputs/librato/README.md b/outputs/librato/README.md new file mode 100644 index 000000000..731b9dbd2 --- /dev/null +++ b/outputs/librato/README.md @@ -0,0 +1,12 @@ +# Librato Output Plugin + +This plugin writes to the [Librato Metrics API](http://dev.librato.com/v1/metrics#metrics) +and requires an `api_user` and `api_token` which can be obtained [here](https://metrics.librato.com/account/api_tokens) +for the account. + +The `source_tag` option in the Configuration file is used to send contextual information from +Point Tags to the API. + +If the point value being sent cannot be converted to a float64, the metric is skipped. + +Currently, the plugin does not send any associated Point Tags. \ No newline at end of file diff --git a/outputs/librato/librato.go b/outputs/librato/librato.go new file mode 100644 index 000000000..b0fa8efb0 --- /dev/null +++ b/outputs/librato/librato.go @@ -0,0 +1,165 @@ +package librato + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/duration" + "github.com/influxdb/telegraf/outputs" +) + +type Librato struct { + ApiUser string + ApiToken string + SourceTag string + Timeout duration.Duration + + apiUrl string + client *http.Client +} + +var sampleConfig = ` + # Librator API Docs + # http://dev.librato.com/v1/metrics-authentication + + # Librato API user + api_user = "telegraf@influxdb.com" # required. + + # Librato API token + api_token = "my-secret-token" # required. + + # Tag Field to populate source attribute (optional) + # This is typically the _hostname_ from which the metric was obtained. + source_tag = "hostname" + + # Connection timeout. + # timeout = "5s" +` + +type Metrics struct { + Gauges []*Gauge `json:"gauges"` +} + +type Gauge struct { + Name string `json:"name"` + Value float64 `json:"value"` + Source string `json:"source"` + MeasureTime int64 `json:"measure_time"` +} + +const librato_api = "https://metrics-api.librato.com/v1/metrics" + +func NewLibrato(apiUrl string) *Librato { + return &Librato{ + apiUrl: apiUrl, + } +} + +func (l *Librato) Connect() error { + if l.ApiUser == "" || l.ApiToken == "" { + return fmt.Errorf("api_user and api_token are required fields for librato output") + } + l.client = &http.Client{ + Timeout: l.Timeout.Duration, + } + return nil +} + +func (l *Librato) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + metrics := Metrics{} + var tempGauges = make([]*Gauge, len(points)) + var acceptablePoints = 0 + for _, pt := range points { + if gauge, err := l.buildGauge(pt); err == nil { + tempGauges[acceptablePoints] = gauge + acceptablePoints += 1 + } else { + log.Printf("unable to build Gauge for %s, skipping\n", pt.Name()) + } + } + metrics.Gauges = make([]*Gauge, acceptablePoints) + copy(metrics.Gauges, tempGauges[0:]) + metricsBytes, err := json.Marshal(metrics) + if err != nil { + return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) + } + req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + req.SetBasicAuth(l.ApiUser, l.ApiToken) + + resp, err := l.client.Do(req) + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (l *Librato) SampleConfig() string { + return sampleConfig +} + +func (l *Librato) Description() string { + return "Configuration for Librato API to send metrics to." +} + +func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) { + gauge := &Gauge{ + Name: pt.Name(), + MeasureTime: pt.Time().Unix(), + } + if err := gauge.setValue(pt.Fields()["value"]); err != nil { + return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error()) + } + if l.SourceTag != "" { + if source, ok := pt.Tags()[l.SourceTag]; ok { + gauge.Source = source + } else { + return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag) + } + } + return gauge, nil +} + +func (g *Gauge) setValue(v interface{}) error { + switch d := v.(type) { + case int: + g.Value = float64(int(d)) + case int32: + g.Value = float64(int32(d)) + case int64: + g.Value = float64(int64(d)) + case float32: + g.Value = float64(d) + case float64: + g.Value = float64(d) + default: + return fmt.Errorf("undeterminable type %+v", d) + } + return nil +} + +func (l *Librato) Close() error { + return nil +} + +func init() { + outputs.Add("librato", func() outputs.Output { + return NewLibrato(librato_api) + }) +} diff --git a/outputs/librato/librato_test.go b/outputs/librato/librato_test.go new file mode 100644 index 000000000..d4aa571bf --- /dev/null +++ b/outputs/librato/librato_test.go @@ -0,0 +1,245 @@ +package librato + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/influxdb/telegraf/testutil" + + "github.com/influxdb/influxdb/client/v2" + "github.com/stretchr/testify/require" +) + +var ( + fakeUrl = "http://test.librato.com" + fakeUser = "telegraf@influxdb.com" + fakeToken = "123456" +) + +func fakeLibrato() *Librato { + l := NewLibrato(fakeUrl) + l.ApiUser = fakeUser + l.ApiToken = fakeToken + return l +} + +func TestUriOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + l := NewLibrato(ts.URL) + l.ApiUser = "telegraf@influxdb.com" + l.ApiToken = "123456" + err := l.Connect() + require.NoError(t, err) + err = l.Write(testutil.MockBatchPoints().Points()) + require.NoError(t, err) +} + +func TestBadStatusCode(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(`{ + "errors": { + "system": [ + "The API is currently down for maintenance. It'll be back shortly." + ] + } + }`) + })) + defer ts.Close() + + l := NewLibrato(ts.URL) + l.ApiUser = "telegraf@influxdb.com" + l.ApiToken = "123456" + err := l.Connect() + require.NoError(t, err) + err = l.Write(testutil.MockBatchPoints().Points()) + if err == nil { + t.Errorf("error expected but none returned") + } else { + require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error()) + } +} + +func TestBuildGauge(t *testing.T) { + tags := make(map[string]string) + var gaugeTests = []struct { + ptIn *client.Point + outGauge *Gauge + err error + }{ + { + client.NewPoint( + "test1", + tags, + map[string]interface{}{"value": 0.0}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test1", + MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 0.0, + }, + nil, + }, + { + client.NewPoint( + "test2", + tags, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test2", + MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 1.0, + }, + nil, + }, + { + client.NewPoint( + "test3", + tags, + map[string]interface{}{"value": 10}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test3", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 10.0, + }, + nil, + }, + { + client.NewPoint( + "test4", + tags, + map[string]interface{}{"value": int32(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test4", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test5", + tags, + map[string]interface{}{"value": int64(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test5", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test6", + tags, + map[string]interface{}{"value": float32(11234.5)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test6", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 11234.5, + }, + nil, + }, + { + client.NewPoint( + "test7", + tags, + map[string]interface{}{"value": "11234.5"}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test7", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 11234.5, + }, + fmt.Errorf("unable to extract value from Fields, undeterminable type"), + }, + } + + l := NewLibrato(fakeUrl) + for _, gt := range gaugeTests { + gauge, err := l.buildGauge(gt.ptIn) + if err != nil && gt.err == nil { + t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err) + } + if gt.err != nil && err == nil { + t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error()) + } + if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil { + t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge) + } + } +} + +func TestBuildGaugeWithSource(t *testing.T) { + var gaugeTests = []struct { + ptIn *client.Point + outGauge *Gauge + err error + }{ + { + client.NewPoint( + "test1", + map[string]string{"hostname": "192.168.0.1"}, + map[string]interface{}{"value": 0.0}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test1", + MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 0.0, + Source: "192.168.0.1", + }, + nil, + }, + { + client.NewPoint( + "test2", + map[string]string{"hostnam": "192.168.0.1"}, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test2", + MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 1.0, + }, + fmt.Errorf("undeterminable Source type from Field, hostname"), + }, + } + + l := NewLibrato(fakeUrl) + l.SourceTag = "hostname" + for _, gt := range gaugeTests { + gauge, err := l.buildGauge(gt.ptIn) + if err != nil && gt.err == nil { + t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err) + } + if gt.err != nil && err == nil { + t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error()) + } + if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil { + t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge) + } + } +}