Source improvement for librato output (#1416)

* Source improvement for librato output

Build the source from the list of tag instead of a configuration specified
single tag

Graphite Serializer:
* make buildTags public
* make sure not to use empty tags

Librato output:
* Improve Error handling for librato API base on error or debug flag
* Send Metric per Batch (max 300)
* use Graphite BuildTag function to generate source

The change is made that it should be retro compatible

Metric sample:
server=127.0.0.1 port=80 state=leader env=test
measurement.metric_name value
service_n.metric_x

Metric before with source tags set as "server":
source=127.0.0.1
test.80.127_0_0_1.leader.measurement.metric_name
test.80.127_0_0_1.leader.service_n.metric_x

Metric now:
source=test.80.127.0.0.1.leader
measurement.metric_name
service_n.metric_x

As you can see the source in the "new" version is much more precise
That way when filter (only from source) you can filter by env or any other tags

* Using template to specify which tagsusing for source, default concat all
tags

* revert change in graphite serializer

* better documentation, change default for template

* fmt

* test passing with new host as default tags

* use host tag in api integration test

* Limit 80 char per line, change resolution to be a int in the sample

* fmt

* remove resolution, doc for template

* fmt
This commit is contained in:
tuier 2016-08-09 08:29:15 +01:00 committed by Aurélien Hébert
parent 9c1645f226
commit 416f0ded87
2 changed files with 234 additions and 129 deletions

View File

@ -7,6 +7,7 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"regexp"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -14,19 +15,22 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
) )
// Librato structure for configuration and client
type Librato struct { type Librato struct {
ApiUser string APIUser string
ApiToken string APIToken string
Debug bool Debug bool
NameFromTags bool SourceTag string // Deprecated, keeping for backward-compatibility
SourceTag string Timeout internal.Duration
Timeout internal.Duration Template string
Template string
apiUrl string APIUrl string
client *http.Client client *http.Client
} }
// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics
var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]")
var sampleConfig = ` var sampleConfig = `
## Librator API Docs ## Librator API Docs
## http://dev.librato.com/v1/metrics-authentication ## http://dev.librato.com/v1/metrics-authentication
@ -36,20 +40,21 @@ var sampleConfig = `
api_token = "my-secret-token" # required. api_token = "my-secret-token" # required.
## Debug ## Debug
# debug = false # debug = false
## Tag Field to populate source attribute (optional)
## This is typically the _hostname_ from which the metric was obtained.
source_tag = "host"
## Connection timeout. ## Connection timeout.
# timeout = "5s" # timeout = "5s"
## Output Name Template (same as graphite buckets) ## Output source Template (same as graphite buckets)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field" ## This template is used in librato's source (not metric's name)
template = "host"
` `
// LMetrics is the default struct for Librato's API fromat
type LMetrics struct { type LMetrics struct {
Gauges []*Gauge `json:"gauges"` Gauges []*Gauge `json:"gauges"`
} }
// Gauge is the gauge format for Librato's API fromat
type Gauge struct { type Gauge struct {
Name string `json:"name"` Name string `json:"name"`
Value float64 `json:"value"` Value float64 `json:"value"`
@ -57,17 +62,22 @@ type Gauge struct {
MeasureTime int64 `json:"measure_time"` MeasureTime int64 `json:"measure_time"`
} }
const librato_api = "https://metrics-api.librato.com/v1/metrics" const libratoAPI = "https://metrics-api.librato.com/v1/metrics"
func NewLibrato(apiUrl string) *Librato { // NewLibrato is the main constructor for librato output plugins
func NewLibrato(apiURL string) *Librato {
return &Librato{ return &Librato{
apiUrl: apiUrl, APIUrl: apiURL,
Template: "host",
} }
} }
// Connect is the default output plugin connection function who make sure it
// can connect to the endpoint
func (l *Librato) Connect() error { func (l *Librato) Connect() error {
if l.ApiUser == "" || l.ApiToken == "" { if l.APIUser == "" || l.APIToken == "" {
return fmt.Errorf("api_user and api_token are required fields for librato output") return fmt.Errorf(
"api_user and api_token are required fields for librato output")
} }
l.client = &http.Client{ l.client = &http.Client{
Timeout: l.Timeout.Duration, Timeout: l.Timeout.Duration,
@ -76,18 +86,23 @@ func (l *Librato) Connect() error {
} }
func (l *Librato) Write(metrics []telegraf.Metric) error { func (l *Librato) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 { if len(metrics) == 0 {
return nil return nil
} }
lmetrics := LMetrics{} if l.Template == "" {
l.Template = "host"
}
if l.SourceTag != "" {
l.Template = l.SourceTag
}
tempGauges := []*Gauge{} tempGauges := []*Gauge{}
metricCounter := 0
for _, m := range metrics { for _, m := range metrics {
if gauges, err := l.buildGauges(m); err == nil { if gauges, err := l.buildGauges(m); err == nil {
for _, gauge := range gauges { for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge) tempGauges = append(tempGauges, gauge)
metricCounter++
if l.Debug { if l.Debug {
log.Printf("[DEBUG] Got a gauge: %v\n", gauge) log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
} }
@ -100,81 +115,115 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
} }
} }
lmetrics.Gauges = make([]*Gauge, metricCounter) metricCounter := len(tempGauges)
copy(lmetrics.Gauges, tempGauges[0:]) // make sur we send a batch of maximum 300
metricsBytes, err := json.Marshal(lmetrics) sizeBatch := 300
if err != nil { for start := 0; start < metricCounter; start += sizeBatch {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) lmetrics := LMetrics{}
} else { end := start + sizeBatch
if end > metricCounter {
end = metricCounter
sizeBatch = end - start
}
lmetrics.Gauges = make([]*Gauge, sizeBatch)
copy(lmetrics.Gauges, tempGauges[start:end])
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
if l.Debug { if l.Debug {
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes)) log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
} }
}
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.ApiUser, l.ApiToken)
resp, err := l.client.Do(req) req, err := http.NewRequest(
if err != nil { "POST",
if l.Debug { l.APIUrl,
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error()) bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf(
"unable to create http.Request, %s\n",
err.Error())
} }
return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) req.Header.Add("Content-Type", "application/json")
} else { req.SetBasicAuth(l.APIUser, l.APIToken)
if l.Debug {
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
}
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != 200 || l.Debug {
htmlData, err := ioutil.ReadAll(resp.Body) htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err) log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
} else { }
if resp.StatusCode != 200 {
return fmt.Errorf(
"received bad status code, %d\n %s",
resp.StatusCode,
string(htmlData))
}
if l.Debug {
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData)) log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
} }
} }
} }
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}
return nil return nil
} }
// SampleConfig is function who return the default configuration for this
// output
func (l *Librato) SampleConfig() string { func (l *Librato) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Description is function who return the Description of this output
func (l *Librato) Description() string { func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to." return "Configuration for Librato API to send metrics to."
} }
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{} gauges := []*Gauge{}
bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "") if m.Time().Unix() == 0 {
return gauges, fmt.Errorf(
"Measure time must not be zero\n <%s> \n",
m.String())
}
metricSource := graphite.InsertField(
graphite.SerializeBucketName("", m.Tags(), l.Template, ""),
"value")
if metricSource == "" {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.Template)
}
for fieldName, value := range m.Fields() { for fieldName, value := range m.Fields() {
metricName := m.Name()
if fieldName != "value" {
metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName)
}
gauge := &Gauge{ gauge := &Gauge{
Name: graphite.InsertField(bucket, fieldName), Source: reUnacceptedChar.ReplaceAllString(metricSource, "-"),
Name: reUnacceptedChar.ReplaceAllString(metricName, "-"),
MeasureTime: m.Time().Unix(), MeasureTime: m.Time().Unix(),
} }
if !gauge.verifyValue(value) { if !verifyValue(value) {
continue continue
} }
if err := gauge.setValue(value); err != nil { if err := gauge.setValue(value); err != nil {
return gauges, fmt.Errorf("unable to extract value from Fields, %s\n", return gauges, fmt.Errorf(
"unable to extract value from Fields, %s\n",
err.Error()) err.Error())
} }
if l.SourceTag != "" {
if source, ok := m.Tags()[l.SourceTag]; ok {
gauge.Source = source
} else {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.SourceTag)
}
}
gauges = append(gauges, gauge) gauges = append(gauges, gauge)
} }
if l.Debug { if l.Debug {
@ -183,7 +232,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
return gauges, nil return gauges, nil
} }
func (g *Gauge) verifyValue(v interface{}) bool { func verifyValue(v interface{}) bool {
switch v.(type) { switch v.(type) {
case string: case string:
return false return false
@ -209,12 +258,13 @@ func (g *Gauge) setValue(v interface{}) error {
return nil return nil
} }
//Close is used to close the connection to librato Output
func (l *Librato) Close() error { func (l *Librato) Close() error {
return nil return nil
} }
func init() { func init() {
outputs.Add("librato", func() telegraf.Output { outputs.Add("librato", func() telegraf.Output {
return NewLibrato(librato_api) return NewLibrato(libratoAPI)
}) })
} }

View File

@ -1,7 +1,6 @@
package librato package librato
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -10,141 +9,137 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var ( var (
fakeUrl = "http://test.librato.com" fakeURL = "http://test.librato.com"
fakeUser = "telegraf@influxdb.com" fakeUser = "telegraf@influxdb.com"
fakeToken = "123456" fakeToken = "123456"
) )
func fakeLibrato() *Librato { func fakeLibrato() *Librato {
l := NewLibrato(fakeUrl) l := NewLibrato(fakeURL)
l.ApiUser = fakeUser l.APIUser = fakeUser
l.ApiToken = fakeToken l.APIToken = fakeToken
return l return l
} }
func BuildTags(t *testing.T) {
testMetric := testutil.TestMetric(0.0, "test1")
graphiteSerializer := graphite.GraphiteSerializer{}
tags, err := graphiteSerializer.Serialize(testMetric)
fmt.Printf("Tags: %v", tags)
require.NoError(t, err)
}
func TestUriOverride(t *testing.T) { func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(
w.WriteHeader(http.StatusOK) http.HandlerFunc(
})) func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close() defer ts.Close()
l := NewLibrato(ts.URL) l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com" l.APIUser = "telegraf@influxdb.com"
l.ApiToken = "123456" l.APIToken = "123456"
err := l.Connect() err := l.Connect()
require.NoError(t, err) require.NoError(t, err)
err = l.Write(testutil.MockMetrics()) err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")})
require.NoError(t, err) require.NoError(t, err)
} }
func TestBadStatusCode(t *testing.T) { func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(
w.WriteHeader(http.StatusServiceUnavailable) http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(`{ w.WriteHeader(http.StatusServiceUnavailable)
"errors": { }))
"system": [
"The API is currently down for maintenance. It'll be back shortly."
]
}
}`)
}))
defer ts.Close() defer ts.Close()
l := NewLibrato(ts.URL) l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com" l.APIUser = "telegraf@influxdb.com"
l.ApiToken = "123456" l.APIToken = "123456"
err := l.Connect() err := l.Connect()
require.NoError(t, err) require.NoError(t, err)
err = l.Write(testutil.MockMetrics()) err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")})
if err == nil { if err == nil {
t.Errorf("error expected but none returned") t.Errorf("error expected but none returned")
} else { } else {
require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error()) require.EqualError(
t,
fmt.Errorf("received bad status code, 503\n "), err.Error())
} }
} }
func TestBuildGauge(t *testing.T) { func TestBuildGauge(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()
var gaugeTests = []struct { var gaugeTests = []struct {
ptIn telegraf.Metric ptIn telegraf.Metric
outGauge *Gauge outGauge *Gauge
err error err error
}{ }{
{ {
testutil.TestMetric(0.0, "test1"), newHostMetric(0.0, "test1", "host1"),
&Gauge{ &Gauge{
Name: "value1.test1", Name: "test1",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime,
Value: 0.0, Value: 0.0,
Source: "host1",
}, },
nil, nil,
}, },
{ {
testutil.TestMetric(1.0, "test2"), newHostMetric(1.0, "test2", "host2"),
&Gauge{ &Gauge{
Name: "value1.test2", Name: "test2",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime,
Value: 1.0, Value: 1.0,
Source: "host2",
}, },
nil, nil,
}, },
{ {
testutil.TestMetric(10, "test3"), newHostMetric(10, "test3", "host3"),
&Gauge{ &Gauge{
Name: "value1.test3", Name: "test3",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime,
Value: 10.0, Value: 10.0,
Source: "host3",
}, },
nil, nil,
}, },
{ {
testutil.TestMetric(int32(112345), "test4"), newHostMetric(int32(112345), "test4", "host4"),
&Gauge{ &Gauge{
Name: "value1.test4", Name: "test4",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime,
Value: 112345.0, Value: 112345.0,
Source: "host4",
}, },
nil, nil,
}, },
{ {
testutil.TestMetric(int64(112345), "test5"), newHostMetric(int64(112345), "test5", "host5"),
&Gauge{ &Gauge{
Name: "value1.test5", Name: "test5",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime,
Value: 112345.0, Value: 112345.0,
Source: "host5",
}, },
nil, nil,
}, },
{ {
testutil.TestMetric(float32(11234.5), "test6"), newHostMetric(float32(11234.5), "test6", "host6"),
&Gauge{ &Gauge{
Name: "value1.test6", Name: "test6",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime,
Value: 11234.5, Value: 11234.5,
Source: "host6",
}, },
nil, nil,
}, },
{ {
testutil.TestMetric("11234.5", "test7"), newHostMetric("11234.5", "test7", "host7"),
nil, nil,
nil, nil,
}, },
} }
l := NewLibrato(fakeUrl) l := NewLibrato(fakeURL)
for _, gt := range gaugeTests { for _, gt := range gaugeTests {
gauges, err := l.buildGauges(gt.ptIn) gauges, err := l.buildGauges(gt.ptIn)
if err != nil && gt.err == nil { if err != nil && gt.err == nil {
@ -167,61 +162,121 @@ func TestBuildGauge(t *testing.T) {
} }
} }
func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(
name,
map[string]string{"host": host},
map[string]interface{}{"value": value},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
return
}
func TestBuildGaugeWithSource(t *testing.T) { func TestBuildGaugeWithSource(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
pt1, _ := telegraf.NewMetric( pt1, _ := telegraf.NewMetric(
"test1", "test1",
map[string]string{"hostname": "192.168.0.1", "tag1": "value1"}, map[string]string{"hostname": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 0.0}, map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), mtime,
) )
pt2, _ := telegraf.NewMetric( pt2, _ := telegraf.NewMetric(
"test2", "test2",
map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"}, map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), mtime,
)
pt3, _ := telegraf.NewMetric(
"test3",
map[string]string{
"hostname": "192.168.0.1",
"tag2": "value2",
"tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
)
pt4, _ := telegraf.NewMetric(
"test4",
map[string]string{
"hostname": "192.168.0.1",
"tag2": "value2",
"tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
) )
var gaugeTests = []struct { var gaugeTests = []struct {
ptIn telegraf.Metric ptIn telegraf.Metric
template string
outGauge *Gauge outGauge *Gauge
err error err error
}{ }{
{ {
pt1, pt1,
"hostname",
&Gauge{ &Gauge{
Name: "192_168_0_1.value1.test1", Name: "test1",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime.Unix(),
Value: 0.0, Value: 0.0,
Source: "192.168.0.1", Source: "192_168_0_1",
}, },
nil, nil,
}, },
{ {
pt2, pt2,
"hostname",
&Gauge{ &Gauge{
Name: "192_168_0_1.value1.test1", Name: "test2",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: mtime.Unix(),
Value: 1.0, Value: 1.0,
}, },
fmt.Errorf("undeterminable Source type from Field, hostname"), fmt.Errorf("undeterminable Source type from Field, hostname"),
}, },
{
pt3,
"tags",
&Gauge{
Name: "test3",
MeasureTime: mtime.Unix(),
Value: 1.0,
Source: "192_168_0_1.value1.value2",
},
nil,
},
{
pt4,
"hostname.tag2",
&Gauge{
Name: "test4",
MeasureTime: mtime.Unix(),
Value: 1.0,
Source: "192_168_0_1.value2",
},
nil,
},
} }
l := NewLibrato(fakeUrl) l := NewLibrato(fakeURL)
l.SourceTag = "hostname"
for _, gt := range gaugeTests { for _, gt := range gaugeTests {
l.Template = gt.template
gauges, err := l.buildGauges(gt.ptIn) gauges, err := l.buildGauges(gt.ptIn)
if err != nil && gt.err == nil { if err != nil && gt.err == nil {
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err) t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
} }
if gt.err != nil && err == nil { if gt.err != nil && err == nil {
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error()) t.Errorf(
"%s: expected an error (%s) but none returned",
gt.ptIn.Name(),
gt.err.Error())
} }
if len(gauges) == 0 { if len(gauges) == 0 {
continue continue
} }
if gt.err == nil && !reflect.DeepEqual(gauges[0], gt.outGauge) { if gt.err == nil && !reflect.DeepEqual(gauges[0], gt.outGauge) {
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauges[0]) t.Errorf(
"%s: \nexpected %+v\ngot %+v\n",
gt.ptIn.Name(),
gt.outGauge, gauges[0])
} }
} }
} }