add librato output plugin, update datadog plugin to skip non-number metrics

closes #322
This commit is contained in:
JP 2015-10-26 13:31:21 -05:00 committed by Cameron Sparr
parent ccbfb038ee
commit 7d15061984
8 changed files with 445 additions and 5 deletions

View File

@ -39,6 +39,7 @@ of metrics collected and from how many plugins.
- [#301](https://github.com/influxdb/telegraf/issues/301): Collect on even intervals
- [#298](https://github.com/influxdb/telegraf/pull/298): Support retrying output writes
- [#300](https://github.com/influxdb/telegraf/issues/300): aerospike plugin. Thanks @oldmantaiter!
- [#322](https://github.com/influxdb/telegraf/issues/322): Librato output. Thanks @jipperinbham!
### Bugfixes
- [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini!

View File

@ -222,6 +222,7 @@ found by running `telegraf -sample-config`.
* opentsdb
* amqp (rabbitmq)
* mqtt
* librato
## Contributing

View File

@ -5,6 +5,7 @@ import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
_ "github.com/influxdb/telegraf/outputs/librato"
_ "github.com/influxdb/telegraf/outputs/mqtt"
_ "github.com/influxdb/telegraf/outputs/opentsdb"
)

View File

@ -0,0 +1,9 @@
# Datadog Output Plugin
This plugin writes to the [Datadog Metrics API](http://docs.datadoghq.com/api/#metrics)
and requires an `apikey` which can be obtained [here](https://app.datadoghq.com/account/settings#api)
for the account.
If the point value being sent cannot be converted to a float64, the metric is skipped.
Metrics are grouped by converting any `_` characters to `.` in the Point Name.

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"sort"
@ -65,10 +66,10 @@ func (d *Datadog) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
ts := TimeSeries{
Series: make([]*Metric, len(points)),
}
for index, pt := range points {
ts := TimeSeries{}
var tempSeries = make([]*Metric, len(points))
var acceptablePoints = 0
for _, pt := range points {
metric := &Metric{
Metric: strings.Replace(pt.Name(), "_", ".", -1),
Tags: buildTags(pt.Tags()),
@ -76,9 +77,14 @@ func (d *Datadog) Write(points []*client.Point) error {
}
if p, err := buildPoint(pt); err == nil {
metric.Points[0] = p
tempSeries[acceptablePoints] = metric
acceptablePoints += 1
} else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
}
ts.Series[index] = metric
}
ts.Series = make([]*Metric, acceptablePoints)
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())

12
outputs/librato/README.md Normal file
View File

@ -0,0 +1,12 @@
# Librato Output Plugin
This plugin writes to the [Librato Metrics API](http://dev.librato.com/v1/metrics#metrics)
and requires an `api_user` and `api_token` which can be obtained [here](https://metrics.librato.com/account/api_tokens)
for the account.
The `source_tag` option in the Configuration file is used to send contextual information from
Point Tags to the API.
If the point value being sent cannot be converted to a float64, the metric is skipped.
Currently, the plugin does not send any associated Point Tags.

165
outputs/librato/librato.go Normal file
View File

@ -0,0 +1,165 @@
package librato
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/duration"
"github.com/influxdb/telegraf/outputs"
)
type Librato struct {
ApiUser string
ApiToken string
SourceTag string
Timeout duration.Duration
apiUrl string
client *http.Client
}
var sampleConfig = `
# Librator API Docs
# http://dev.librato.com/v1/metrics-authentication
# Librato API user
api_user = "telegraf@influxdb.com" # required.
# Librato API token
api_token = "my-secret-token" # required.
# Tag Field to populate source attribute (optional)
# This is typically the _hostname_ from which the metric was obtained.
source_tag = "hostname"
# Connection timeout.
# timeout = "5s"
`
type Metrics struct {
Gauges []*Gauge `json:"gauges"`
}
type Gauge struct {
Name string `json:"name"`
Value float64 `json:"value"`
Source string `json:"source"`
MeasureTime int64 `json:"measure_time"`
}
const librato_api = "https://metrics-api.librato.com/v1/metrics"
func NewLibrato(apiUrl string) *Librato {
return &Librato{
apiUrl: apiUrl,
}
}
func (l *Librato) Connect() error {
if l.ApiUser == "" || l.ApiToken == "" {
return fmt.Errorf("api_user and api_token are required fields for librato output")
}
l.client = &http.Client{
Timeout: l.Timeout.Duration,
}
return nil
}
func (l *Librato) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
metrics := Metrics{}
var tempGauges = make([]*Gauge, len(points))
var acceptablePoints = 0
for _, pt := range points {
if gauge, err := l.buildGauge(pt); err == nil {
tempGauges[acceptablePoints] = gauge
acceptablePoints += 1
} else {
log.Printf("unable to build Gauge for %s, skipping\n", pt.Name())
}
}
metrics.Gauges = make([]*Gauge, acceptablePoints)
copy(metrics.Gauges, tempGauges[0:])
metricsBytes, err := json.Marshal(metrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.ApiUser, l.ApiToken)
resp, err := l.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}
return nil
}
func (l *Librato) SampleConfig() string {
return sampleConfig
}
func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to."
}
func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) {
gauge := &Gauge{
Name: pt.Name(),
MeasureTime: pt.Time().Unix(),
}
if err := gauge.setValue(pt.Fields()["value"]); err != nil {
return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error())
}
if l.SourceTag != "" {
if source, ok := pt.Tags()[l.SourceTag]; ok {
gauge.Source = source
} else {
return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag)
}
}
return gauge, nil
}
func (g *Gauge) setValue(v interface{}) error {
switch d := v.(type) {
case int:
g.Value = float64(int(d))
case int32:
g.Value = float64(int32(d))
case int64:
g.Value = float64(int64(d))
case float32:
g.Value = float64(d)
case float64:
g.Value = float64(d)
default:
return fmt.Errorf("undeterminable type %+v", d)
}
return nil
}
func (l *Librato) Close() error {
return nil
}
func init() {
outputs.Add("librato", func() outputs.Output {
return NewLibrato(librato_api)
})
}

View File

@ -0,0 +1,245 @@
package librato
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/influxdb/telegraf/testutil"
"github.com/influxdb/influxdb/client/v2"
"github.com/stretchr/testify/require"
)
var (
fakeUrl = "http://test.librato.com"
fakeUser = "telegraf@influxdb.com"
fakeToken = "123456"
)
func fakeLibrato() *Librato {
l := NewLibrato(fakeUrl)
l.ApiUser = fakeUser
l.ApiToken = fakeToken
return l
}
func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com"
l.ApiToken = "123456"
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockBatchPoints().Points())
require.NoError(t, err)
}
func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(`{
"errors": {
"system": [
"The API is currently down for maintenance. It'll be back shortly."
]
}
}`)
}))
defer ts.Close()
l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com"
l.ApiToken = "123456"
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockBatchPoints().Points())
if err == nil {
t.Errorf("error expected but none returned")
} else {
require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error())
}
}
func TestBuildGauge(t *testing.T) {
tags := make(map[string]string)
var gaugeTests = []struct {
ptIn *client.Point
outGauge *Gauge
err error
}{
{
client.NewPoint(
"test1",
tags,
map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test1",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 0.0,
},
nil,
},
{
client.NewPoint(
"test2",
tags,
map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test2",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 1.0,
},
nil,
},
{
client.NewPoint(
"test3",
tags,
map[string]interface{}{"value": 10},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test3",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 10.0,
},
nil,
},
{
client.NewPoint(
"test4",
tags,
map[string]interface{}{"value": int32(112345)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test4",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 112345.0,
},
nil,
},
{
client.NewPoint(
"test5",
tags,
map[string]interface{}{"value": int64(112345)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test5",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 112345.0,
},
nil,
},
{
client.NewPoint(
"test6",
tags,
map[string]interface{}{"value": float32(11234.5)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test6",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 11234.5,
},
nil,
},
{
client.NewPoint(
"test7",
tags,
map[string]interface{}{"value": "11234.5"},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test7",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 11234.5,
},
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
},
}
l := NewLibrato(fakeUrl)
for _, gt := range gaugeTests {
gauge, err := l.buildGauge(gt.ptIn)
if err != nil && gt.err == nil {
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
}
if gt.err != nil && err == nil {
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error())
}
if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil {
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge)
}
}
}
func TestBuildGaugeWithSource(t *testing.T) {
var gaugeTests = []struct {
ptIn *client.Point
outGauge *Gauge
err error
}{
{
client.NewPoint(
"test1",
map[string]string{"hostname": "192.168.0.1"},
map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test1",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 0.0,
Source: "192.168.0.1",
},
nil,
},
{
client.NewPoint(
"test2",
map[string]string{"hostnam": "192.168.0.1"},
map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
),
&Gauge{
Name: "test2",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 1.0,
},
fmt.Errorf("undeterminable Source type from Field, hostname"),
},
}
l := NewLibrato(fakeUrl)
l.SourceTag = "hostname"
for _, gt := range gaugeTests {
gauge, err := l.buildGauge(gt.ptIn)
if err != nil && gt.err == nil {
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
}
if gt.err != nil && err == nil {
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error())
}
if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil {
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge)
}
}
}