telegraf/outputs/amon/amon.go

158 lines
3.3 KiB
Go

package amon
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/outputs"
)
type Amon struct {
ServerKey string
AmonInstance string
Timeout internal.Duration
client *http.Client
}
var sampleConfig = `
# Amon Server Key
server_key = "my-server-key" # required.
# Amon Instance URL
amon_instance = "https://youramoninstance" # required
# Connection timeout.
# timeout = "5s"
`
type TimeSeries struct {
Series []*Metric `json:"series"`
}
type Metric struct {
Metric string `json:"metric"`
Points [1]Point `json:"points"`
}
type Point [2]float64
func (a *Amon) Connect() error {
if a.ServerKey == "" || a.AmonInstance == "" {
return fmt.Errorf("serverkey and amon_instance are required fields for amon output")
}
a.client = &http.Client{
Timeout: a.Timeout.Duration,
}
return nil
}
func (a *Amon) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
ts := TimeSeries{}
var tempSeries = make([]*Metric, len(points))
var acceptablePoints = 0
for _, pt := range points {
mname := strings.Replace(pt.Name(), "_", ".", -1)
if amonPts, err := buildPoints(pt); err == nil {
for fieldName, amonPt := range amonPts {
metric := &Metric{
Metric: mname + "_" + strings.Replace(fieldName, "_", ".", -1),
}
metric.Points[0] = amonPt
tempSeries[acceptablePoints] = metric
acceptablePoints += 1
}
} else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
}
}
ts.Series = make([]*Metric, acceptablePoints)
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
}
req, err := http.NewRequest("POST", a.authenticatedUrl(), bytes.NewBuffer(tsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
resp, err := a.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}
return nil
}
func (a *Amon) SampleConfig() string {
return sampleConfig
}
func (a *Amon) Description() string {
return "Configuration for Amon Server to send metrics to."
}
func (a *Amon) authenticatedUrl() string {
return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey)
}
func buildPoints(pt *client.Point) (map[string]Point, error) {
pts := make(map[string]Point)
for k, v := range pt.Fields() {
var p Point
if err := p.setValue(v); err != nil {
return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
p[0] = float64(pt.Time().Unix())
pts[k] = p
}
return pts, nil
}
func (p *Point) setValue(v interface{}) error {
switch d := v.(type) {
case int:
p[1] = float64(int(d))
case int32:
p[1] = float64(int32(d))
case int64:
p[1] = float64(int64(d))
case float32:
p[1] = float64(d)
case float64:
p[1] = float64(d)
default:
return fmt.Errorf("undeterminable type")
}
return nil
}
func (a *Amon) Close() error {
return nil
}
func init() {
outputs.Add("amon", func() outputs.Output {
return &Amon{}
})
}