diff --git a/CHANGELOG.md b/CHANGELOG.md index f6fdf638b..6cd184d14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v0.1.6 [unreleased] ### Features +[#112](://github.com/influxdb/telegraf/pull/112): Datadog output. Thanks @jipperinbham! ### Bugfixes diff --git a/outputs/all/all.go b/outputs/all/all.go index 2a8018674..0fb5f3723 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -1,5 +1,6 @@ package all import ( + _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" ) diff --git a/outputs/datadog/datadog.go b/outputs/datadog/datadog.go new file mode 100644 index 000000000..b79d1c828 --- /dev/null +++ b/outputs/datadog/datadog.go @@ -0,0 +1,155 @@ +package datadog + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sort" + + "github.com/influxdb/influxdb/client" + t "github.com/influxdb/telegraf" + "github.com/influxdb/telegraf/outputs" +) + +type Datadog struct { + Apikey string + Timeout t.Duration + + apiUrl string + client *http.Client +} + +type TimeSeries struct { + Series []*Metric `json:"series"` +} + +type Metric struct { + Metric string `json:"metric"` + Points [1]Point `json:"points"` + Tags []string `json:"tags,omitempty"` +} + +type Point [2]float64 + +const datadog_api = "https://app.datadoghq.com/api/v1/series" + +func NewDatadog(apiUrl string) *Datadog { + return &Datadog{ + apiUrl: apiUrl, + } +} + +func (d *Datadog) Connect() error { + if d.Apikey == "" { + return fmt.Errorf("apikey is a required field for datadog output") + } + d.client = &http.Client{ + Timeout: d.Timeout.Duration, + } + return nil +} + +func (d *Datadog) Write(bp client.BatchPoints) error { + if len(bp.Points) == 0 { + return nil + } + ts := TimeSeries{ + Series: make([]*Metric, len(bp.Points)), + } + for index, pt := range bp.Points { + metric := &Metric{ + Metric: pt.Measurement, + Tags: buildTags(bp.Tags, pt.Tags), + } + if p, err := buildPoint(bp, pt); err == nil { + metric.Points[0] = p + } + ts.Series[index] = metric + } + tsBytes, err := json.Marshal(ts) + if err != nil { + return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) + } + req, err := http.NewRequest("POST", d.authenticatedUrl(), bytes.NewBuffer(tsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + + resp, err := d.client.Do(req) + defer resp.Body.Close() + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + + if resp.StatusCode < 200 || resp.StatusCode > 209 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (d *Datadog) authenticatedUrl() string { + q := url.Values{ + "api_key": []string{d.Apikey}, + } + return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) +} + +func buildTags(bpTags map[string]string, ptTags map[string]string) []string { + tags := make([]string, (len(bpTags) + len(ptTags))) + index := 0 + for k, v := range bpTags { + tags[index] = fmt.Sprintf("%s:%s", k, v) + index += 1 + } + for k, v := range ptTags { + tags[index] = fmt.Sprintf("%s:%s", k, v) + index += 1 + } + sort.Strings(tags) + return tags +} + +func buildPoint(bp client.BatchPoints, pt client.Point) (Point, error) { + var p Point + if err := p.setValue(pt.Fields["value"]); err != nil { + return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + if pt.Time.IsZero() { + p[0] = float64(bp.Time.Unix()) + } else { + p[0] = float64(pt.Time.Unix()) + } + return p, nil +} + +func (p *Point) setValue(v interface{}) error { + switch d := v.(type) { + case int: + p[1] = float64(int(d)) + case int32: + p[1] = float64(int32(d)) + case int64: + p[1] = float64(int64(d)) + case float32: + p[1] = float64(d) + case float64: + p[1] = float64(d) + default: + return fmt.Errorf("undeterminable type") + } + return nil +} + +func (d *Datadog) Close() error { + return nil +} + +func init() { + outputs.Add("datadog", func() outputs.Output { + return NewDatadog(datadog_api) + }) +} diff --git a/outputs/datadog/datadog_test.go b/outputs/datadog/datadog_test.go new file mode 100644 index 000000000..744afc99b --- /dev/null +++ b/outputs/datadog/datadog_test.go @@ -0,0 +1,204 @@ +package datadog + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/influxdb/influxdb/client" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + fakeUrl = "http://test.datadog.com" + fakeApiKey = "123456" +) + +func fakeDatadog() *Datadog { + d := NewDatadog(fakeUrl) + d.Apikey = fakeApiKey + return d +} + +func testData() client.BatchPoints { + var bp client.BatchPoints + bp.Time = time.Now() + bp.Tags = map[string]string{"tag1": "value1"} + bp.Points = []client.Point{ + { + Fields: map[string]interface{}{"value": 1.0}, + }, + } + return bp +} + +func TestUriOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"status":"ok"}`) + })) + defer ts.Close() + + d := NewDatadog(ts.URL) + d.Apikey = "123456" + err := d.Connect() + require.NoError(t, err) + err = d.Write(testData()) + require.NoError(t, err) +} + +func TestBadStatusCode(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(`{ 'errors': [ + 'Something bad happened to the server.', + 'Your query made the server very sad.' + ] + }`) + })) + defer ts.Close() + + d := NewDatadog(ts.URL) + d.Apikey = "123456" + err := d.Connect() + require.NoError(t, err) + err = d.Write(testData()) + if err == nil { + t.Errorf("error expected but none returned") + } else { + require.EqualError(t, fmt.Errorf("received bad status code, 500\n"), err.Error()) + } +} + +func TestAuthenticatedUrl(t *testing.T) { + d := fakeDatadog() + + authUrl := d.authenticatedUrl() + assert.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeUrl, fakeApiKey), authUrl) +} + +func TestBuildTags(t *testing.T) { + var tagtests = []struct { + bpIn map[string]string + ptIn map[string]string + outTags []string + }{ + { + map[string]string{"one": "two"}, + map[string]string{"three": "four"}, + []string{"one:two", "three:four"}, + }, + { + map[string]string{"aaa": "bbb"}, + map[string]string{}, + []string{"aaa:bbb"}, + }, + { + map[string]string{}, + map[string]string{}, + []string{}, + }, + } + for _, tt := range tagtests { + tags := buildTags(tt.bpIn, tt.ptIn) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) + } + } +} + +func TestBuildPoint(t *testing.T) { + var tagtests = []struct { + bpIn client.BatchPoints + ptIn client.Point + outPt Point + err error + }{ + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": 0.0}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0}, + nil, + }, + { + client.BatchPoints{}, + client.Point{ + Fields: map[string]interface{}{"value": 1.0}, + Time: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + }, + Point{float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": 10}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": int32(112345)}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": int64(112345)}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": float32(11234.5)}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": "11234.5"}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5}, + fmt.Errorf("unable to extract value from Fields, undeterminable type"), + }, + } + for _, tt := range tagtests { + pt, err := buildPoint(tt.bpIn, tt.ptIn) + if err != nil && tt.err == nil { + t.Errorf("unexpected error, %+v\n", err) + } + if tt.err != nil && err == nil { + t.Errorf("expected an error (%s) but none returned", tt.err.Error()) + } + if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outPt, pt) + } + } +}