From ceaf6fd67a51ac3f04fa51ffc76b102141788808 Mon Sep 17 00:00:00 2001 From: JP Date: Wed, 12 Aug 2015 15:15:34 -0500 Subject: [PATCH] add datadog output --- cmd/telegraf/telegraf.go | 8 +- config.go | 5 + outputs/all/all.go | 1 + outputs/datadog/datadog.go | 157 +++++++++++++++++++++++++ outputs/datadog/datadog_test.go | 196 ++++++++++++++++++++++++++++++++ 5 files changed, 366 insertions(+), 1 deletion(-) create mode 100644 outputs/datadog/datadog.go create mode 100644 outputs/datadog/datadog_test.go diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index d6fdd965b..6876724c9 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -8,8 +8,9 @@ import ( "os/signal" "strings" - "github.com/influxdb/telegraf" _ "github.com/influxdb/telegraf/plugins/all" + "github.com/jipperinbham/telegraf" + _ "github.com/jipperinbham/telegraf/outputs/all" ) var fDebug = flag.Bool("debug", false, "show metrics as they're generated to stdout") @@ -63,6 +64,10 @@ func main() { if err != nil { log.Fatal(err) } + if len(outputs) == 0 { + log.Printf("Error: no outputs found, did you provide a config file?") + os.Exit(1) + } plugins, err := ag.LoadPlugins(*fPLuginsFilter) if err != nil { @@ -111,6 +116,7 @@ func main() { log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n", ag.Interval, ag.Debug, ag.Hostname) } + log.Printf("Tags enabled: %v", config.ListTags()) if *fPidfile != "" { f, err := os.Create(*fPidfile) diff --git a/config.go b/config.go index db49dfa2f..cba6a9489 100644 --- a/config.go +++ b/config.go @@ -273,6 +273,7 @@ func LoadConfig(path string) (*Config, error) { } c := &Config{ + Tags: make(map[string]string), plugins: make(map[string]*ast.Table), outputs: make(map[string]*ast.Table), } @@ -286,6 +287,10 @@ func LoadConfig(path string) (*Config, error) { switch name { case "agent": c.agent = subtbl + case "tags": + if err := toml.UnmarshalTable(subtbl, c.Tags); err != nil { + return nil, errInvalidConfig + } case "outputs": for outputName, outputVal := range subtbl.Fields { outputSubtbl, ok := outputVal.(*ast.Table) diff --git a/outputs/all/all.go b/outputs/all/all.go index 2a8018674..fb5e9d451 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -2,4 +2,5 @@ package all import ( _ "github.com/influxdb/telegraf/outputs/influxdb" + _ "github.com/jipperinbham/telegraf/outputs/datadog" ) diff --git a/outputs/datadog/datadog.go b/outputs/datadog/datadog.go new file mode 100644 index 000000000..036f4c958 --- /dev/null +++ b/outputs/datadog/datadog.go @@ -0,0 +1,157 @@ +package datadog + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sort" + + "github.com/influxdb/influxdb/client" + t "github.com/influxdb/telegraf" + "github.com/influxdb/telegraf/outputs" +) + +type Datadog struct { + Apikey string + Timeout t.Duration + + apiUrl string + client *http.Client +} + +type TimeSeries struct { + Series []*Metric `json:"series"` +} + +type Metric struct { + Metric string `json:"metric"` + Points [1]Point `json:"points"` + Tags []string `json:"tags,omitempty"` +} + +type Point [2]float64 + +const datadog_api = "https://app.datadoghq.com/api/v1/series" + +func NewDatadog(apiUrl string) *Datadog { + return &Datadog{ + apiUrl: apiUrl, + } +} + +func (d *Datadog) Connect() error { + if d.Apikey == "" { + return fmt.Errorf("apikey is a required field for datadog output") + } + d.client = &http.Client{ + Timeout: d.Timeout.Duration, + } + return nil +} + +func (d *Datadog) Write(bp client.BatchPoints) error { + if len(bp.Points) == 0 { + return nil + } + ts := TimeSeries{ + Series: make([]*Metric, len(bp.Points)), + } + for index, pt := range bp.Points { + metric := &Metric{ + Metric: pt.Measurement, + Tags: buildTags(bp.Tags, pt.Tags), + } + if p, err := buildPoint(bp, pt); err == nil { + metric.Points[0] = p + } + ts.Series[index] = metric + } + tsBytes, err := json.Marshal(ts) + if err != nil { + return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) + } + req, err := http.NewRequest("POST", d.authenticatedUrl(), bytes.NewBuffer(tsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + + fmt.Printf("making POST call\n") + resp, err := d.client.Do(req) + fmt.Printf("POST call made, err is: %+v\n", err) + defer resp.Body.Close() + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + + if resp.StatusCode < 200 || resp.StatusCode > 209 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (d *Datadog) authenticatedUrl() string { + q := url.Values{ + "api_key": []string{d.Apikey}, + } + return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) +} + +func buildTags(bpTags map[string]string, ptTags map[string]string) []string { + tags := make([]string, (len(bpTags) + len(ptTags))) + index := 0 + for k, v := range bpTags { + tags[index] = fmt.Sprintf("%s:%s", k, v) + index += 1 + } + for k, v := range ptTags { + tags[index] = fmt.Sprintf("%s:%s", k, v) + index += 1 + } + sort.Strings(tags) + return tags +} + +func buildPoint(bp client.BatchPoints, pt client.Point) (Point, error) { + var p Point + if err := p.setValue(pt.Fields["value"]); err != nil { + return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + if pt.Time.IsZero() { + p[0] = float64(bp.Time.Unix()) + } else { + p[0] = float64(pt.Time.Unix()) + } + return p, nil +} + +func (p *Point) setValue(v interface{}) error { + switch d := v.(type) { + case int: + p[1] = float64(int(d)) + case int32: + p[1] = float64(int32(d)) + case int64: + p[1] = float64(int64(d)) + case float32: + p[1] = float64(d) + case float64: + p[1] = float64(d) + default: + return fmt.Errorf("undeterminable type") + } + return nil +} + +func (d *Datadog) Close() error { + return nil +} + +func init() { + outputs.Add("datadog", func() outputs.Output { + return NewDatadog(datadog_api) + }) +} diff --git a/outputs/datadog/datadog_test.go b/outputs/datadog/datadog_test.go new file mode 100644 index 000000000..8f637d03c --- /dev/null +++ b/outputs/datadog/datadog_test.go @@ -0,0 +1,196 @@ +package datadog + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/influxdb/influxdb/client" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + fakeUrl = "http://test.datadog.com" + fakeApiKey = "123456" +) + +func fakeDatadog() *Datadog { + d := NewDatadog(fakeUrl) + d.Apikey = fakeApiKey + return d +} + +func testData() client.BatchPoints { + var bp client.BatchPoints + bp.Time = time.Now() + bp.Tags = map[string]string{"tag1": "value1"} + bp.Points = []client.Point{ + Fields: map[string]interface{}{"value": 1.0}, + } + return bp +} + +func TestUriOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"status":"ok"}`) + })) + defer ts.Close() + + d := NewDatadog(ts.URL) + err := d.Write(testData()) + require.NoError(t, err) +} + +func TestBadStatusCode(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(`{ 'errors': [ + 'Something bad happened to the server.', + 'Your query made the server very sad.' + ] + }`) + })) + defer ts.Close() + + d := NewDatadog(ts.URL) + err := d.Write(testData()) + if err == nil { + t.Errorf("error expected but none returned") + } else { + require.EqualError(t, fmt.Errorf("error POSTing metrics, "), err.Error()) + } +} + +func TestAuthenticatedUrl(t *testing.T) { + d := fakeDatadog() + + authUrl := d.authenticatedUrl() + assert.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeUrl, fakeApiKey), authUrl) +} + +func TestBuildTags(t *testing.T) { + var tagtests = []struct { + bpIn map[string]string + ptIn map[string]string + outTags []string + }{ + { + map[string]string{"one": "two"}, + map[string]string{"three": "four"}, + []string{"one:two", "three:four"}, + }, + { + map[string]string{"aaa": "bbb"}, + map[string]string{}, + []string{"aaa:bbb"}, + }, + { + map[string]string{}, + map[string]string{}, + []string{}, + }, + } + for _, tt := range tagtests { + tags := buildTags(tt.bpIn, tt.ptIn) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) + } + } +} + +func TestBuildPoint(t *testing.T) { + var tagtests = []struct { + bpIn client.BatchPoints + ptIn client.Point + outPt Point + err error + }{ + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": 0.0}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0}, + nil, + }, + { + client.BatchPoints{}, + client.Point{ + Fields: map[string]interface{}{"value": 1.0}, + Time: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + }, + Point{float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": 10}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": int32(112345)}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": int64(112345)}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": float32(11234.5)}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5}, + nil, + }, + { + client.BatchPoints{ + Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + }, + client.Point{ + Fields: map[string]interface{}{"value": "11234.5"}, + }, + Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5}, + fmt.Errorf("unable to extract value from Fields, undeterminable type"), + }, + } + for _, tt := range tagtests { + pt, err := buildPoint(tt.bpIn, tt.ptIn) + if err != nil && tt.err == nil { + t.Errorf("unexpected error, %+v\n", err) + } + if tt.err != nil && err == nil { + t.Errorf("expected an error (%s) but none returned", tt.err.Error()) + } + if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outPt, pt) + } + } +}