From 1accab02edc72d5aead0b06c8ab523d4c961f4ee Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 11:27:17 +0200 Subject: [PATCH] Amon output closes #350 --- CHANGELOG.md | 1 + README.md | 1 + outputs/all/all.go | 1 + outputs/amon/README.md | 9 +++ outputs/amon/amon.go | 148 ++++++++++++++++++++++++++++++++++ outputs/amon/amon_test.go | 163 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 323 insertions(+) create mode 100644 outputs/amon/README.md create mode 100644 outputs/amon/amon.go create mode 100644 outputs/amon/amon_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 53246705c..f5853ac28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! - [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello! +- [#350](https://github.com/influxdb/telegraf/pull/350): Amon output. - [#317](https://github.com/influxdb/telegraf/issues/317): ZFS plugin, thanks @cornerot! ### Bugfixes diff --git a/README.md b/README.md index f85a4b1e4..69378adbb 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,7 @@ found by running `telegraf -sample-config`. * mqtt * librato * prometheus +* amon ## Contributing diff --git a/outputs/all/all.go b/outputs/all/all.go index c51a24c59..be9c4cf42 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/outputs/amon" _ "github.com/influxdb/telegraf/outputs/amqp" _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" diff --git a/outputs/amon/README.md b/outputs/amon/README.md new file mode 100644 index 000000000..3860e4371 --- /dev/null +++ b/outputs/amon/README.md @@ -0,0 +1,9 @@ +# Amon Output Plugin + +This plugin writes to [Amon](https://www.amon.cx) +and requires an `serverkey` and `amoninstance` URL which can be obtained [here](https://www.amon.cx/docs/monitoring/) +for the account. + +If the point value being sent cannot be converted to a float64, the metric is skipped. + +Metrics are grouped by converting any `_` characters to `.` in the Point Name. \ No newline at end of file diff --git a/outputs/amon/amon.go b/outputs/amon/amon.go new file mode 100644 index 000000000..08275f52d --- /dev/null +++ b/outputs/amon/amon.go @@ -0,0 +1,148 @@ +package amon + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/duration" + "github.com/influxdb/telegraf/outputs" +) + +type Amon struct { + ServerKey string + AmonInstance string + Timeout duration.Duration + + client *http.Client +} + +var sampleConfig = ` + # Amon Server Key + server_key = "my-server-key" # required. + + # Amon Instance URL + amon_instance = "https://youramoninstance" # required + + # Connection timeout. + # timeout = "5s" +` + +type TimeSeries struct { + Series []*Metric `json:"series"` +} + +type Metric struct { + Metric string `json:"metric"` + Points [1]Point `json:"points"` +} + +type Point [2]float64 + +func (a *Amon) Connect() error { + if a.ServerKey == "" || a.AmonInstance == "" { + return fmt.Errorf("serverkey and amon_instance are required fields for amon output") + } + a.client = &http.Client{ + Timeout: a.Timeout.Duration, + } + return nil +} + +func (a *Amon) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + ts := TimeSeries{} + var tempSeries = make([]*Metric, len(points)) + var acceptablePoints = 0 + for _, pt := range points { + metric := &Metric{ + Metric: strings.Replace(pt.Name(), "_", ".", -1), + } + if p, err := buildPoint(pt); err == nil { + metric.Points[0] = p + tempSeries[acceptablePoints] = metric + acceptablePoints += 1 + } else { + log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) + } + } + ts.Series = make([]*Metric, acceptablePoints) + copy(ts.Series, tempSeries[0:]) + tsBytes, err := json.Marshal(ts) + if err != nil { + return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) + } + req, err := http.NewRequest("POST", a.authenticatedUrl(), bytes.NewBuffer(tsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + + resp, err := a.client.Do(req) + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 209 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (a *Amon) SampleConfig() string { + return sampleConfig +} + +func (a *Amon) Description() string { + return "Configuration for Amon Server to send metrics to." +} + +func (a *Amon) authenticatedUrl() string { + + return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey) +} + +func buildPoint(pt *client.Point) (Point, error) { + var p Point + if err := p.setValue(pt.Fields()["value"]); err != nil { + return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + p[0] = float64(pt.Time().Unix()) + return p, nil +} + +func (p *Point) setValue(v interface{}) error { + switch d := v.(type) { + case int: + p[1] = float64(int(d)) + case int32: + p[1] = float64(int32(d)) + case int64: + p[1] = float64(int64(d)) + case float32: + p[1] = float64(d) + case float64: + p[1] = float64(d) + default: + return fmt.Errorf("undeterminable type") + } + return nil +} + +func (a *Amon) Close() error { + return nil +} + +func init() { + outputs.Add("amon", func() outputs.Output { + return &Amon{} + }) +} diff --git a/outputs/amon/amon_test.go b/outputs/amon/amon_test.go new file mode 100644 index 000000000..7856d4540 --- /dev/null +++ b/outputs/amon/amon_test.go @@ -0,0 +1,163 @@ +package amon + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/influxdb/telegraf/testutil" + + "github.com/influxdb/influxdb/client/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + fakeServerKey = "123456" + fakeAmonInstance = "https://demo.amon.cx" +) + +func TestUriOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"status":"ok"}`) + })) + defer ts.Close() + + a := &Amon{ + ServerKey: fakeServerKey, + AmonInstance: fakeAmonInstance, + } + + err := a.Connect() + require.NoError(t, err) + err = a.Write(testutil.MockBatchPoints().Points()) + require.NoError(t, err) +} + +func TestAuthenticatedUrl(t *testing.T) { + a := &Amon{ + ServerKey: fakeServerKey, + AmonInstance: fakeAmonInstance, + } + + authUrl := a.authenticatedUrl() + assert.EqualValues(t, fmt.Sprintf("%s/api/system/%s", fakeAmonInstance, fakeServerKey), authUrl) +} + +func TestBuildPoint(t *testing.T) { + tags := make(map[string]string) + var tagtests = []struct { + ptIn *client.Point + outPt Point + err error + }{ + { + client.NewPoint( + "test1", + tags, + map[string]interface{}{"value": 0.0}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 0.0, + }, + nil, + }, + { + client.NewPoint( + "test2", + tags, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), + 1.0, + }, + nil, + }, + { + client.NewPoint( + "test3", + tags, + map[string]interface{}{"value": 10}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 10.0, + }, + nil, + }, + { + client.NewPoint( + "test4", + tags, + map[string]interface{}{"value": int32(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test5", + tags, + map[string]interface{}{"value": int64(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test6", + tags, + map[string]interface{}{"value": float32(11234.5)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 11234.5, + }, + nil, + }, + { + client.NewPoint( + "test7", + tags, + map[string]interface{}{"value": "11234.5"}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 11234.5, + }, + fmt.Errorf("unable to extract value from Fields, undeterminable type"), + }, + } + for _, tt := range tagtests { + pt, err := buildPoint(tt.ptIn) + if err != nil && tt.err == nil { + t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err) + } + if tt.err != nil && err == nil { + t.Errorf("%s: expected an error (%s) but none returned", tt.ptIn.Name(), tt.err.Error()) + } + if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil { + t.Errorf("%s: \nexpected %+v\ngot %+v\n", tt.ptIn.Name(), tt.outPt, pt) + } + } +}