Files
telegraf/plugins/outputs/librato/librato.go
tuier e457b7a8df Source improvement for librato output (#1416)
* Source improvement for librato output

Build the source from the list of tags instead of from a single tag
specified in the configuration

Graphite Serializer:
* make buildTags public
* make sure not to use empty tags

Librato output:
* Improve error handling for the Librato API based on the error or debug flag
* Send Metric per Batch (max 300)
* use Graphite BuildTag function to generate source

The change is made so that it remains backward compatible

Metric sample:
server=127.0.0.1 port=80 state=leader env=test
measurement.metric_name value
service_n.metric_x

Metric before with source tags set as "server":
source=127.0.0.1
test.80.127_0_0_1.leader.measurement.metric_name
test.80.127_0_0_1.leader.service_n.metric_x

Metric now:
source=test.80.127.0.0.1.leader
measurement.metric_name
service_n.metric_x

As you can see, the source in the "new" version is much more precise.
That way, when filtering (by source only), you can filter by env or any other tag

* Use a template to specify which tags are used for the source; by default
all tags are concatenated

* revert change in graphite serializer

* better documentation, change default for template

* fmt

* test passing with new host as default tags

* use host tag in api integration test

* Limit 80 char per line, change resolution to be a int in the sample

* fmt

* remove resolution, doc for template

* fmt
2016-08-09 08:29:15 +01:00

271 lines
6.4 KiB
Go

package librato
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
// Librato holds the configuration of the Librato output plugin together with
// the HTTP client it uses to send metrics.
type Librato struct {
// APIUser and APIToken are the credentials sent as HTTP basic auth; Connect
// returns an error when either of them is empty.
APIUser string
APIToken string
// Debug enables the additional "[DEBUG]" log lines emitted while building
// and sending gauges.
Debug bool
// SourceTag, when non-empty, replaces Template on every call to Write.
SourceTag string // Deprecated, keeping for backward-compatibility
// Timeout is used as the timeout of the HTTP client created by Connect.
Timeout internal.Duration
// Template is the template used to build each gauge's source from the
// metric's tags; Write falls back to "host" when it is empty.
Template string
// APIUrl is the URL the gauges are POSTed to.
APIUrl string
// client is created by Connect and used by Write.
client *http.Client
}
// reUnacceptedChar matches every character that is not a letter, a digit,
// '.', '_' or '-'. buildGauges replaces each match with "-" in gauge names and
// sources. For the naming limitations see
// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics
var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]")
// sampleConfig is the configuration snippet returned by SampleConfig.
var sampleConfig = `
## Librator API Docs
## http://dev.librato.com/v1/metrics-authentication
## Librato API user
api_user = "telegraf@influxdb.com" # required.
## Librato API token
api_token = "my-secret-token" # required.
## Debug
# debug = false
## Connection timeout.
# timeout = "5s"
## Output source Template (same as graphite buckets)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
## This template is used in librato's source (not metric's name)
template = "host"
`
// LMetrics is the request payload sent to Librato's API: a JSON object whose
// "gauges" key holds the list of gauges.
type LMetrics struct {
Gauges []*Gauge `json:"gauges"`
}
// Gauge is a single gauge measurement in the format of Librato's API.
type Gauge struct {
// Name is the metric name: the measurement name, suffixed with
// ".<field>" for every field other than "value".
Name string `json:"name"`
// Value is the field value converted to float64.
Value float64 `json:"value"`
// Source is built from the metric's tags using the configured template.
Source string `json:"source"`
// MeasureTime is the metric's timestamp in Unix seconds.
MeasureTime int64 `json:"measure_time"`
}
// libratoAPI is the API URL used by the plugin instance registered in init.
const libratoAPI = "https://metrics-api.librato.com/v1/metrics"
// NewLibrato returns a Librato output that sends its metrics to apiURL and
// uses "host" as the source template.
func NewLibrato(apiURL string) *Librato {
	output := new(Librato)
	output.APIUrl = apiURL
	output.Template = "host"
	return output
}
// Connect validates that the API credentials are configured and prepares the
// HTTP client used by Write. It does not issue any request itself.
func (l *Librato) Connect() error {
	credentialsMissing := l.APIUser == "" || l.APIToken == ""
	if credentialsMissing {
		return fmt.Errorf(
			"api_user and api_token are required fields for librato output")
	}

	httpClient := new(http.Client)
	httpClient.Timeout = l.Timeout.Duration
	l.client = httpClient
	return nil
}
// Write converts metrics into Librato gauges and POSTs them to the API in
// batches of at most 300 gauges.
//
// Metrics for which no gauges can be built are logged and skipped. The first
// batch that fails (marshalling, request creation, transport error or a
// non-200 response) aborts the write and its error is returned; batches sent
// before it are not rolled back.
func (l *Librato) Write(metrics []telegraf.Metric) error {
	if len(metrics) == 0 {
		return nil
	}
	if l.Template == "" {
		l.Template = "host"
	}
	// The deprecated source_tag setting, when present, overrides the template.
	if l.SourceTag != "" {
		l.Template = l.SourceTag
	}

	var tempGauges []*Gauge
	for _, m := range metrics {
		gauges, err := l.buildGauges(m)
		if err != nil {
			log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
			if l.Debug {
				log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
			}
			continue
		}
		for _, gauge := range gauges {
			tempGauges = append(tempGauges, gauge)
			if l.Debug {
				log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
			}
		}
	}

	// Make sure we send batches of at most 300 gauges.
	const sizeBatch = 300
	for start := 0; start < len(tempGauges); start += sizeBatch {
		end := start + sizeBatch
		if end > len(tempGauges) {
			end = len(tempGauges)
		}
		if err := l.writeBatch(tempGauges[start:end]); err != nil {
			return err
		}
	}
	return nil
}

// writeBatch POSTs one batch of gauges and checks the response status. It is
// a separate function so that the response body is closed as soon as the
// batch is done rather than when Write returns.
func (l *Librato) writeBatch(gauges []*Gauge) error {
	metricsBytes, err := json.Marshal(LMetrics{Gauges: gauges})
	if err != nil {
		return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
	}
	if l.Debug {
		log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
	}

	req, err := http.NewRequest(
		"POST",
		l.APIUrl,
		bytes.NewBuffer(metricsBytes))
	if err != nil {
		return fmt.Errorf(
			"unable to create http.Request, %s\n",
			err.Error())
	}
	req.Header.Add("Content-Type", "application/json")
	req.SetBasicAuth(l.APIUser, l.APIToken)

	resp, err := l.client.Do(req)
	if err != nil {
		if l.Debug {
			log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
		}
		return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
	}
	defer resp.Body.Close()

	// Always read the body to the end, even when its content is not needed:
	// a body that is closed without being fully read can prevent the
	// transport from reusing the connection for the next batch.
	htmlData, readErr := ioutil.ReadAll(resp.Body)
	if resp.StatusCode != 200 || l.Debug {
		if readErr != nil {
			log.Printf("[DEBUG] Couldn't get response! (%v)\n", readErr)
		}
		if resp.StatusCode != 200 {
			return fmt.Errorf(
				"received bad status code, %d\n %s",
				resp.StatusCode,
				string(htmlData))
		}
		// Reaching this point with a 200 status implies Debug is set.
		log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
	}
	return nil
}
// SampleConfig returns the sample configuration for this output.
func (l *Librato) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line description of this output.
func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to."
}
// buildGauges converts a single metric into Librato gauges, one per field.
//
// The gauge source is derived from the metric's tags through the graphite
// serializer helpers using l.Template. The gauge name is the measurement name,
// suffixed with ".<field>" for every field other than "value". Characters not
// accepted by Librato are replaced with "-" in both. Fields rejected by
// verifyValue (strings) are skipped silently.
//
// An error is returned, with no gauges, when the metric's Unix timestamp is
// zero, when no source can be derived from the template, or when a field value
// cannot be converted by setValue.
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
	measureTime := m.Time().Unix()
	if measureTime == 0 {
		return nil, fmt.Errorf(
			"Measure time must not be zero\n <%s> \n",
			m.String())
	}

	metricSource := graphite.InsertField(
		graphite.SerializeBucketName("", m.Tags(), l.Template, ""),
		"value")
	if metricSource == "" {
		return nil,
			fmt.Errorf("undeterminable Source type from Field, %s\n",
				l.Template)
	}
	// The source is the same for every field, so sanitize it only once.
	source := reUnacceptedChar.ReplaceAllString(metricSource, "-")

	fields := m.Fields()
	gauges := make([]*Gauge, 0, len(fields))
	for fieldName, value := range fields {
		// Skip unsupported values before doing any work for the field.
		if !verifyValue(value) {
			continue
		}

		metricName := m.Name()
		if fieldName != "value" {
			metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName)
		}
		gauge := &Gauge{
			Source:      source,
			Name:        reUnacceptedChar.ReplaceAllString(metricName, "-"),
			MeasureTime: measureTime,
		}
		if err := gauge.setValue(value); err != nil {
			return nil, fmt.Errorf(
				"unable to extract value from Fields, %s\n",
				err.Error())
		}
		gauges = append(gauges, gauge)
	}

	if l.Debug {
		// Use the logger, like every other debug message of this plugin,
		// instead of writing to stdout.
		log.Printf("[DEBUG] Built gauges: %v\n", gauges)
	}
	return gauges, nil
}
// verifyValue reports whether v may be used as a gauge value. Strings are the
// only values rejected; anything else, including nil, is accepted.
func verifyValue(v interface{}) bool {
	_, isString := v.(string)
	return !isString
}
// setValue stores v in g.Value as a float64.
//
// Every signed and unsigned integer width as well as float32 and float64 is
// accepted. For any other type g.Value is left untouched and an error naming
// the offending type is returned.
func (g *Gauge) setValue(v interface{}) error {
	switch d := v.(type) {
	case int:
		g.Value = float64(d)
	case int8:
		g.Value = float64(d)
	case int16:
		g.Value = float64(d)
	case int32:
		g.Value = float64(d)
	case int64:
		g.Value = float64(d)
	case uint:
		g.Value = float64(d)
	case uint8:
		g.Value = float64(d)
	case uint16:
		g.Value = float64(d)
	case uint32:
		g.Value = float64(d)
	case uint64:
		g.Value = float64(d)
	case float32:
		g.Value = float64(d)
	case float64:
		g.Value = d
	default:
		// Report the type, as the message says, rather than the value.
		return fmt.Errorf("undeterminable type %T", v)
	}
	return nil
}
// Close does nothing and always returns nil.
func (l *Librato) Close() error {
return nil
}
func init() {
outputs.Add("librato", func() telegraf.Output {
return NewLibrato(libratoAPI)
})
}