Merge branch 'jipperinbham-datadog-output'
This commit is contained in:
commit
cad0a762a0
|
@ -1,6 +1,7 @@
|
||||||
## v0.1.6 [unreleased]
|
## v0.1.6 [unreleased]
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
[#112](://github.com/influxdb/telegraf/pull/112): Datadog output. Thanks @jipperinbham!
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package all
|
package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "github.com/influxdb/telegraf/outputs/datadog"
|
||||||
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
package datadog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client"
|
||||||
|
t "github.com/influxdb/telegraf"
|
||||||
|
"github.com/influxdb/telegraf/outputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Datadog struct {
|
||||||
|
Apikey string
|
||||||
|
Timeout t.Duration
|
||||||
|
|
||||||
|
apiUrl string
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
type TimeSeries struct {
|
||||||
|
Series []*Metric `json:"series"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Metric struct {
|
||||||
|
Metric string `json:"metric"`
|
||||||
|
Points [1]Point `json:"points"`
|
||||||
|
Tags []string `json:"tags,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Point [2]float64
|
||||||
|
|
||||||
|
const datadog_api = "https://app.datadoghq.com/api/v1/series"
|
||||||
|
|
||||||
|
func NewDatadog(apiUrl string) *Datadog {
|
||||||
|
return &Datadog{
|
||||||
|
apiUrl: apiUrl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Datadog) Connect() error {
|
||||||
|
if d.Apikey == "" {
|
||||||
|
return fmt.Errorf("apikey is a required field for datadog output")
|
||||||
|
}
|
||||||
|
d.client = &http.Client{
|
||||||
|
Timeout: d.Timeout.Duration,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Datadog) Write(bp client.BatchPoints) error {
|
||||||
|
if len(bp.Points) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ts := TimeSeries{
|
||||||
|
Series: make([]*Metric, len(bp.Points)),
|
||||||
|
}
|
||||||
|
for index, pt := range bp.Points {
|
||||||
|
metric := &Metric{
|
||||||
|
Metric: pt.Measurement,
|
||||||
|
Tags: buildTags(bp.Tags, pt.Tags),
|
||||||
|
}
|
||||||
|
if p, err := buildPoint(bp, pt); err == nil {
|
||||||
|
metric.Points[0] = p
|
||||||
|
}
|
||||||
|
ts.Series[index] = metric
|
||||||
|
}
|
||||||
|
tsBytes, err := json.Marshal(ts)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("POST", d.authenticatedUrl(), bytes.NewBuffer(tsBytes))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
resp, err := d.client.Do(req)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode > 209 {
|
||||||
|
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Datadog) authenticatedUrl() string {
|
||||||
|
q := url.Values{
|
||||||
|
"api_key": []string{d.Apikey},
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode())
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildTags(bpTags map[string]string, ptTags map[string]string) []string {
|
||||||
|
tags := make([]string, (len(bpTags) + len(ptTags)))
|
||||||
|
index := 0
|
||||||
|
for k, v := range bpTags {
|
||||||
|
tags[index] = fmt.Sprintf("%s:%s", k, v)
|
||||||
|
index += 1
|
||||||
|
}
|
||||||
|
for k, v := range ptTags {
|
||||||
|
tags[index] = fmt.Sprintf("%s:%s", k, v)
|
||||||
|
index += 1
|
||||||
|
}
|
||||||
|
sort.Strings(tags)
|
||||||
|
return tags
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildPoint(bp client.BatchPoints, pt client.Point) (Point, error) {
|
||||||
|
var p Point
|
||||||
|
if err := p.setValue(pt.Fields["value"]); err != nil {
|
||||||
|
return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
||||||
|
}
|
||||||
|
if pt.Time.IsZero() {
|
||||||
|
p[0] = float64(bp.Time.Unix())
|
||||||
|
} else {
|
||||||
|
p[0] = float64(pt.Time.Unix())
|
||||||
|
}
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Point) setValue(v interface{}) error {
|
||||||
|
switch d := v.(type) {
|
||||||
|
case int:
|
||||||
|
p[1] = float64(int(d))
|
||||||
|
case int32:
|
||||||
|
p[1] = float64(int32(d))
|
||||||
|
case int64:
|
||||||
|
p[1] = float64(int64(d))
|
||||||
|
case float32:
|
||||||
|
p[1] = float64(d)
|
||||||
|
case float64:
|
||||||
|
p[1] = float64(d)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("undeterminable type")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Datadog) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("datadog", func() outputs.Output {
|
||||||
|
return NewDatadog(datadog_api)
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,204 @@
|
||||||
|
package datadog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
fakeUrl = "http://test.datadog.com"
|
||||||
|
fakeApiKey = "123456"
|
||||||
|
)
|
||||||
|
|
||||||
|
func fakeDatadog() *Datadog {
|
||||||
|
d := NewDatadog(fakeUrl)
|
||||||
|
d.Apikey = fakeApiKey
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
func testData() client.BatchPoints {
|
||||||
|
var bp client.BatchPoints
|
||||||
|
bp.Time = time.Now()
|
||||||
|
bp.Tags = map[string]string{"tag1": "value1"}
|
||||||
|
bp.Points = []client.Point{
|
||||||
|
{
|
||||||
|
Fields: map[string]interface{}{"value": 1.0},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return bp
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUriOverride(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
json.NewEncoder(w).Encode(`{"status":"ok"}`)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
d := NewDatadog(ts.URL)
|
||||||
|
d.Apikey = "123456"
|
||||||
|
err := d.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = d.Write(testData())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBadStatusCode(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
json.NewEncoder(w).Encode(`{ 'errors': [
|
||||||
|
'Something bad happened to the server.',
|
||||||
|
'Your query made the server very sad.'
|
||||||
|
]
|
||||||
|
}`)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
d := NewDatadog(ts.URL)
|
||||||
|
d.Apikey = "123456"
|
||||||
|
err := d.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = d.Write(testData())
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("error expected but none returned")
|
||||||
|
} else {
|
||||||
|
require.EqualError(t, fmt.Errorf("received bad status code, 500\n"), err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAuthenticatedUrl(t *testing.T) {
|
||||||
|
d := fakeDatadog()
|
||||||
|
|
||||||
|
authUrl := d.authenticatedUrl()
|
||||||
|
assert.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeUrl, fakeApiKey), authUrl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildTags(t *testing.T) {
|
||||||
|
var tagtests = []struct {
|
||||||
|
bpIn map[string]string
|
||||||
|
ptIn map[string]string
|
||||||
|
outTags []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
map[string]string{"one": "two"},
|
||||||
|
map[string]string{"three": "four"},
|
||||||
|
[]string{"one:two", "three:four"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
map[string]string{"aaa": "bbb"},
|
||||||
|
map[string]string{},
|
||||||
|
[]string{"aaa:bbb"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
map[string]string{},
|
||||||
|
map[string]string{},
|
||||||
|
[]string{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tagtests {
|
||||||
|
tags := buildTags(tt.bpIn, tt.ptIn)
|
||||||
|
if !reflect.DeepEqual(tags, tt.outTags) {
|
||||||
|
t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildPoint(t *testing.T) {
|
||||||
|
var tagtests = []struct {
|
||||||
|
bpIn client.BatchPoints
|
||||||
|
ptIn client.Point
|
||||||
|
outPt Point
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
client.BatchPoints{
|
||||||
|
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": 0.0},
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.BatchPoints{},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": 1.0},
|
||||||
|
Time: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.BatchPoints{
|
||||||
|
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": 10},
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.BatchPoints{
|
||||||
|
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": int32(112345)},
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.BatchPoints{
|
||||||
|
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": int64(112345)},
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.BatchPoints{
|
||||||
|
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": float32(11234.5)},
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.BatchPoints{
|
||||||
|
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
|
client.Point{
|
||||||
|
Fields: map[string]interface{}{"value": "11234.5"},
|
||||||
|
},
|
||||||
|
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5},
|
||||||
|
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tagtests {
|
||||||
|
pt, err := buildPoint(tt.bpIn, tt.ptIn)
|
||||||
|
if err != nil && tt.err == nil {
|
||||||
|
t.Errorf("unexpected error, %+v\n", err)
|
||||||
|
}
|
||||||
|
if tt.err != nil && err == nil {
|
||||||
|
t.Errorf("expected an error (%s) but none returned", tt.err.Error())
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil {
|
||||||
|
t.Errorf("\nexpected %+v\ngot %+v\n", tt.outPt, pt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue