add librato output plugin, update datadog plugin to skip non-number metrics
This commit is contained in:
parent
cb951ebd28
commit
8df0c630d2
|
@ -5,6 +5,7 @@ import (
|
||||||
_ "github.com/influxdb/telegraf/outputs/datadog"
|
_ "github.com/influxdb/telegraf/outputs/datadog"
|
||||||
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
||||||
_ "github.com/influxdb/telegraf/outputs/kafka"
|
_ "github.com/influxdb/telegraf/outputs/kafka"
|
||||||
|
_ "github.com/influxdb/telegraf/outputs/librato"
|
||||||
_ "github.com/influxdb/telegraf/outputs/mqtt"
|
_ "github.com/influxdb/telegraf/outputs/mqtt"
|
||||||
_ "github.com/influxdb/telegraf/outputs/opentsdb"
|
_ "github.com/influxdb/telegraf/outputs/opentsdb"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
# Datadog Output Plugin
|
||||||
|
|
||||||
|
This plugin writes to the [Datadog Metrics API](http://docs.datadoghq.com/api/#metrics)
|
||||||
|
and requires an `apikey` which can be obtained [here](https://app.datadoghq.com/account/settings#api)
|
||||||
|
for the account.
|
||||||
|
|
||||||
|
If the point value being sent cannot be converted to a float64, the metric is skipped.
|
||||||
|
|
||||||
|
Metrics are grouped by converting any `_` characters to `.` in the Point Name.
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -65,10 +66,10 @@ func (d *Datadog) Write(points []*client.Point) error {
|
||||||
if len(points) == 0 {
|
if len(points) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ts := TimeSeries{
|
ts := TimeSeries{}
|
||||||
Series: make([]*Metric, len(points)),
|
var tempSeries = make([]*Metric, len(points))
|
||||||
}
|
var acceptablePoints = 0
|
||||||
for index, pt := range points {
|
for _, pt := range points {
|
||||||
metric := &Metric{
|
metric := &Metric{
|
||||||
Metric: strings.Replace(pt.Name(), "_", ".", -1),
|
Metric: strings.Replace(pt.Name(), "_", ".", -1),
|
||||||
Tags: buildTags(pt.Tags()),
|
Tags: buildTags(pt.Tags()),
|
||||||
|
@ -76,9 +77,14 @@ func (d *Datadog) Write(points []*client.Point) error {
|
||||||
}
|
}
|
||||||
if p, err := buildPoint(pt); err == nil {
|
if p, err := buildPoint(pt); err == nil {
|
||||||
metric.Points[0] = p
|
metric.Points[0] = p
|
||||||
|
tempSeries[acceptablePoints] = metric
|
||||||
|
acceptablePoints += 1
|
||||||
|
} else {
|
||||||
|
log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
|
||||||
}
|
}
|
||||||
ts.Series[index] = metric
|
|
||||||
}
|
}
|
||||||
|
ts.Series = make([]*Metric, acceptablePoints)
|
||||||
|
copy(ts.Series, tempSeries[0:])
|
||||||
tsBytes, err := json.Marshal(ts)
|
tsBytes, err := json.Marshal(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
|
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
# Librato Output Plugin
|
||||||
|
|
||||||
|
This plugin writes to the [Librato Metrics API](http://dev.librato.com/v1/metrics#metrics)
|
||||||
|
and requires an `api_user` and `api_token` which can be obtained [here](https://metrics.librato.com/account/api_tokens)
|
||||||
|
for the account.
|
||||||
|
|
||||||
|
The `source_tag` option in the Configuration file is used to send contextual information from
|
||||||
|
Point Tags to the API.
|
||||||
|
|
||||||
|
If the point value being sent cannot be converted to a float64, the metric is skipped.
|
||||||
|
|
||||||
|
Currently, the plugin does not send any associated Point Tags.
|
|
@ -0,0 +1,165 @@
|
||||||
|
package librato
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client/v2"
|
||||||
|
"github.com/influxdb/telegraf/duration"
|
||||||
|
"github.com/influxdb/telegraf/outputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Librato struct {
|
||||||
|
ApiUser string
|
||||||
|
ApiToken string
|
||||||
|
SourceTag string
|
||||||
|
Timeout duration.Duration
|
||||||
|
|
||||||
|
apiUrl string
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
# Librator API Docs
|
||||||
|
# http://dev.librato.com/v1/metrics-authentication
|
||||||
|
|
||||||
|
# Librato API user
|
||||||
|
api_user = "telegraf@influxdb.com" # required.
|
||||||
|
|
||||||
|
# Librato API token
|
||||||
|
api_token = "my-secret-token" # required.
|
||||||
|
|
||||||
|
# Tag Field to populate source attribute (optional)
|
||||||
|
# This is typically the _hostname_ from which the metric was obtained.
|
||||||
|
source_tag = "hostname"
|
||||||
|
|
||||||
|
# Connection timeout.
|
||||||
|
# timeout = "5s"
|
||||||
|
`
|
||||||
|
|
||||||
|
type Metrics struct {
|
||||||
|
Gauges []*Gauge `json:"gauges"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Gauge struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
Source string `json:"source"`
|
||||||
|
MeasureTime int64 `json:"measure_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const librato_api = "https://metrics-api.librato.com/v1/metrics"
|
||||||
|
|
||||||
|
func NewLibrato(apiUrl string) *Librato {
|
||||||
|
return &Librato{
|
||||||
|
apiUrl: apiUrl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Librato) Connect() error {
|
||||||
|
if l.ApiUser == "" || l.ApiToken == "" {
|
||||||
|
return fmt.Errorf("api_user and api_token are required fields for librato output")
|
||||||
|
}
|
||||||
|
l.client = &http.Client{
|
||||||
|
Timeout: l.Timeout.Duration,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Librato) Write(points []*client.Point) error {
|
||||||
|
if len(points) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
metrics := Metrics{}
|
||||||
|
var tempGauges = make([]*Gauge, len(points))
|
||||||
|
var acceptablePoints = 0
|
||||||
|
for _, pt := range points {
|
||||||
|
if gauge, err := l.buildGauge(pt); err == nil {
|
||||||
|
tempGauges[acceptablePoints] = gauge
|
||||||
|
acceptablePoints += 1
|
||||||
|
} else {
|
||||||
|
log.Printf("unable to build Gauge for %s, skipping\n", pt.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
metrics.Gauges = make([]*Gauge, acceptablePoints)
|
||||||
|
copy(metrics.Gauges, tempGauges[0:])
|
||||||
|
metricsBytes, err := json.Marshal(metrics)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
req.SetBasicAuth(l.ApiUser, l.ApiToken)
|
||||||
|
|
||||||
|
resp, err := l.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Librato) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Librato) Description() string {
|
||||||
|
return "Configuration for Librato API to send metrics to."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) {
|
||||||
|
gauge := &Gauge{
|
||||||
|
Name: pt.Name(),
|
||||||
|
MeasureTime: pt.Time().Unix(),
|
||||||
|
}
|
||||||
|
if err := gauge.setValue(pt.Fields()["value"]); err != nil {
|
||||||
|
return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
if l.SourceTag != "" {
|
||||||
|
if source, ok := pt.Tags()[l.SourceTag]; ok {
|
||||||
|
gauge.Source = source
|
||||||
|
} else {
|
||||||
|
return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return gauge, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Gauge) setValue(v interface{}) error {
|
||||||
|
switch d := v.(type) {
|
||||||
|
case int:
|
||||||
|
g.Value = float64(int(d))
|
||||||
|
case int32:
|
||||||
|
g.Value = float64(int32(d))
|
||||||
|
case int64:
|
||||||
|
g.Value = float64(int64(d))
|
||||||
|
case float32:
|
||||||
|
g.Value = float64(d)
|
||||||
|
case float64:
|
||||||
|
g.Value = float64(d)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("undeterminable type %+v", d)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Librato) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("librato", func() outputs.Output {
|
||||||
|
return NewLibrato(librato_api)
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,245 @@
|
||||||
|
package librato
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/testutil"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client/v2"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
fakeUrl = "http://test.librato.com"
|
||||||
|
fakeUser = "telegraf@influxdb.com"
|
||||||
|
fakeToken = "123456"
|
||||||
|
)
|
||||||
|
|
||||||
|
func fakeLibrato() *Librato {
|
||||||
|
l := NewLibrato(fakeUrl)
|
||||||
|
l.ApiUser = fakeUser
|
||||||
|
l.ApiToken = fakeToken
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUriOverride(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
l := NewLibrato(ts.URL)
|
||||||
|
l.ApiUser = "telegraf@influxdb.com"
|
||||||
|
l.ApiToken = "123456"
|
||||||
|
err := l.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = l.Write(testutil.MockBatchPoints().Points())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBadStatusCode(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
json.NewEncoder(w).Encode(`{
|
||||||
|
"errors": {
|
||||||
|
"system": [
|
||||||
|
"The API is currently down for maintenance. It'll be back shortly."
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
l := NewLibrato(ts.URL)
|
||||||
|
l.ApiUser = "telegraf@influxdb.com"
|
||||||
|
l.ApiToken = "123456"
|
||||||
|
err := l.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = l.Write(testutil.MockBatchPoints().Points())
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("error expected but none returned")
|
||||||
|
} else {
|
||||||
|
require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildGauge(t *testing.T) {
|
||||||
|
tags := make(map[string]string)
|
||||||
|
var gaugeTests = []struct {
|
||||||
|
ptIn *client.Point
|
||||||
|
outGauge *Gauge
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test1",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": 0.0},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test1",
|
||||||
|
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 0.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test2",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test2",
|
||||||
|
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 1.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test3",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": 10},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test3",
|
||||||
|
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 10.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test4",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": int32(112345)},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test4",
|
||||||
|
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 112345.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test5",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": int64(112345)},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test5",
|
||||||
|
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 112345.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test6",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": float32(11234.5)},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test6",
|
||||||
|
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 11234.5,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test7",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": "11234.5"},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test7",
|
||||||
|
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 11234.5,
|
||||||
|
},
|
||||||
|
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
l := NewLibrato(fakeUrl)
|
||||||
|
for _, gt := range gaugeTests {
|
||||||
|
gauge, err := l.buildGauge(gt.ptIn)
|
||||||
|
if err != nil && gt.err == nil {
|
||||||
|
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
|
||||||
|
}
|
||||||
|
if gt.err != nil && err == nil {
|
||||||
|
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error())
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil {
|
||||||
|
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildGaugeWithSource(t *testing.T) {
|
||||||
|
var gaugeTests = []struct {
|
||||||
|
ptIn *client.Point
|
||||||
|
outGauge *Gauge
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test1",
|
||||||
|
map[string]string{"hostname": "192.168.0.1"},
|
||||||
|
map[string]interface{}{"value": 0.0},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test1",
|
||||||
|
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 0.0,
|
||||||
|
Source: "192.168.0.1",
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test2",
|
||||||
|
map[string]string{"hostnam": "192.168.0.1"},
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
&Gauge{
|
||||||
|
Name: "test2",
|
||||||
|
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||||
|
Value: 1.0,
|
||||||
|
},
|
||||||
|
fmt.Errorf("undeterminable Source type from Field, hostname"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
l := NewLibrato(fakeUrl)
|
||||||
|
l.SourceTag = "hostname"
|
||||||
|
for _, gt := range gaugeTests {
|
||||||
|
gauge, err := l.buildGauge(gt.ptIn)
|
||||||
|
if err != nil && gt.err == nil {
|
||||||
|
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
|
||||||
|
}
|
||||||
|
if gt.err != nil && err == nil {
|
||||||
|
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error())
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil {
|
||||||
|
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue