parent
1a05899be0
commit
1accab02ed
|
@ -13,6 +13,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and
|
||||||
- [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter!
|
- [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter!
|
||||||
- [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac!
|
- [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac!
|
||||||
- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello!
|
- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello!
|
||||||
|
- [#350](https://github.com/influxdb/telegraf/pull/350): Amon output.
|
||||||
- [#317](https://github.com/influxdb/telegraf/issues/317): ZFS plugin, thanks @cornerot!
|
- [#317](https://github.com/influxdb/telegraf/issues/317): ZFS plugin, thanks @cornerot!
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
|
@ -227,6 +227,7 @@ found by running `telegraf -sample-config`.
|
||||||
* mqtt
|
* mqtt
|
||||||
* librato
|
* librato
|
||||||
* prometheus
|
* prometheus
|
||||||
|
* amon
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package all
|
package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "github.com/influxdb/telegraf/outputs/amon"
|
||||||
_ "github.com/influxdb/telegraf/outputs/amqp"
|
_ "github.com/influxdb/telegraf/outputs/amqp"
|
||||||
_ "github.com/influxdb/telegraf/outputs/datadog"
|
_ "github.com/influxdb/telegraf/outputs/datadog"
|
||||||
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
# Amon Output Plugin
|
||||||
|
|
||||||
|
This plugin writes to [Amon](https://www.amon.cx)
|
||||||
|
and requires an `serverkey` and `amoninstance` URL which can be obtained [here](https://www.amon.cx/docs/monitoring/)
|
||||||
|
for the account.
|
||||||
|
|
||||||
|
If the point value being sent cannot be converted to a float64, the metric is skipped.
|
||||||
|
|
||||||
|
Metrics are grouped by converting any `_` characters to `.` in the Point Name.
|
|
@ -0,0 +1,148 @@
|
||||||
|
package amon
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client/v2"
|
||||||
|
"github.com/influxdb/telegraf/duration"
|
||||||
|
"github.com/influxdb/telegraf/outputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Amon struct {
|
||||||
|
ServerKey string
|
||||||
|
AmonInstance string
|
||||||
|
Timeout duration.Duration
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
# Amon Server Key
|
||||||
|
server_key = "my-server-key" # required.
|
||||||
|
|
||||||
|
# Amon Instance URL
|
||||||
|
amon_instance = "https://youramoninstance" # required
|
||||||
|
|
||||||
|
# Connection timeout.
|
||||||
|
# timeout = "5s"
|
||||||
|
`
|
||||||
|
|
||||||
|
type TimeSeries struct {
|
||||||
|
Series []*Metric `json:"series"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Metric struct {
|
||||||
|
Metric string `json:"metric"`
|
||||||
|
Points [1]Point `json:"points"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Point [2]float64
|
||||||
|
|
||||||
|
func (a *Amon) Connect() error {
|
||||||
|
if a.ServerKey == "" || a.AmonInstance == "" {
|
||||||
|
return fmt.Errorf("serverkey and amon_instance are required fields for amon output")
|
||||||
|
}
|
||||||
|
a.client = &http.Client{
|
||||||
|
Timeout: a.Timeout.Duration,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Amon) Write(points []*client.Point) error {
|
||||||
|
if len(points) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ts := TimeSeries{}
|
||||||
|
var tempSeries = make([]*Metric, len(points))
|
||||||
|
var acceptablePoints = 0
|
||||||
|
for _, pt := range points {
|
||||||
|
metric := &Metric{
|
||||||
|
Metric: strings.Replace(pt.Name(), "_", ".", -1),
|
||||||
|
}
|
||||||
|
if p, err := buildPoint(pt); err == nil {
|
||||||
|
metric.Points[0] = p
|
||||||
|
tempSeries[acceptablePoints] = metric
|
||||||
|
acceptablePoints += 1
|
||||||
|
} else {
|
||||||
|
log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ts.Series = make([]*Metric, acceptablePoints)
|
||||||
|
copy(ts.Series, tempSeries[0:])
|
||||||
|
tsBytes, err := json.Marshal(ts)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("POST", a.authenticatedUrl(), bytes.NewBuffer(tsBytes))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode > 209 {
|
||||||
|
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Amon) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Amon) Description() string {
|
||||||
|
return "Configuration for Amon Server to send metrics to."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Amon) authenticatedUrl() string {
|
||||||
|
|
||||||
|
return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildPoint(pt *client.Point) (Point, error) {
|
||||||
|
var p Point
|
||||||
|
if err := p.setValue(pt.Fields()["value"]); err != nil {
|
||||||
|
return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
||||||
|
}
|
||||||
|
p[0] = float64(pt.Time().Unix())
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Point) setValue(v interface{}) error {
|
||||||
|
switch d := v.(type) {
|
||||||
|
case int:
|
||||||
|
p[1] = float64(int(d))
|
||||||
|
case int32:
|
||||||
|
p[1] = float64(int32(d))
|
||||||
|
case int64:
|
||||||
|
p[1] = float64(int64(d))
|
||||||
|
case float32:
|
||||||
|
p[1] = float64(d)
|
||||||
|
case float64:
|
||||||
|
p[1] = float64(d)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("undeterminable type")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Amon) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("amon", func() outputs.Output {
|
||||||
|
return &Amon{}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,163 @@
|
||||||
|
package amon
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/testutil"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client/v2"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
fakeServerKey = "123456"
|
||||||
|
fakeAmonInstance = "https://demo.amon.cx"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUriOverride(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
json.NewEncoder(w).Encode(`{"status":"ok"}`)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
a := &Amon{
|
||||||
|
ServerKey: fakeServerKey,
|
||||||
|
AmonInstance: fakeAmonInstance,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := a.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = a.Write(testutil.MockBatchPoints().Points())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAuthenticatedUrl(t *testing.T) {
|
||||||
|
a := &Amon{
|
||||||
|
ServerKey: fakeServerKey,
|
||||||
|
AmonInstance: fakeAmonInstance,
|
||||||
|
}
|
||||||
|
|
||||||
|
authUrl := a.authenticatedUrl()
|
||||||
|
assert.EqualValues(t, fmt.Sprintf("%s/api/system/%s", fakeAmonInstance, fakeServerKey), authUrl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildPoint(t *testing.T) {
|
||||||
|
tags := make(map[string]string)
|
||||||
|
var tagtests = []struct {
|
||||||
|
ptIn *client.Point
|
||||||
|
outPt Point
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test1",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": 0.0},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
0.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test2",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
1.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test3",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": 10},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
10.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test4",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": int32(112345)},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
112345.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test5",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": int64(112345)},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
112345.0,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test6",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": float32(11234.5)},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
11234.5,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
client.NewPoint(
|
||||||
|
"test7",
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": "11234.5"},
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
),
|
||||||
|
Point{
|
||||||
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
|
11234.5,
|
||||||
|
},
|
||||||
|
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tagtests {
|
||||||
|
pt, err := buildPoint(tt.ptIn)
|
||||||
|
if err != nil && tt.err == nil {
|
||||||
|
t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err)
|
||||||
|
}
|
||||||
|
if tt.err != nil && err == nil {
|
||||||
|
t.Errorf("%s: expected an error (%s) but none returned", tt.ptIn.Name(), tt.err.Error())
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil {
|
||||||
|
t.Errorf("%s: \nexpected %+v\ngot %+v\n", tt.ptIn.Name(), tt.outPt, pt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue