Source improvement for librato output (#1416)

* Source improvement for librato output

Build the source from the list of tag instead of a configuration specified
single tag

Graphite Serializer:
* make buildTags public
* make sure not to use empty tags

Librato output:
* Improve Error handling for librato API base on error or debug flag
* Send Metric per Batch (max 300)
* use Graphite BuildTag function to generate source

The change is made that it should be retro compatible

Metric sample:
server=127.0.0.1 port=80 state=leader env=test
measurement.metric_name value
service_n.metric_x

Metric before with source tags set as "server":
source=127.0.0.1
test.80.127_0_0_1.leader.measurement.metric_name
test.80.127_0_0_1.leader.service_n.metric_x

Metric now:
source=test.80.127.0.0.1.leader
measurement.metric_name
service_n.metric_x

As you can see the source in the "new" version is much more precise
That way when filter (only from source) you can filter by env or any other tags

* Using template to specify which tagsusing for source, default concat all
tags

* revert change in graphite serializer

* better documentation, change default for template

* fmt

* test passing with new host as default tags

* use host tag in api integration test

* Limit 80 char per line, change resolution to be a int in the sample

* fmt

* remove resolution, doc for template

* fmt
This commit is contained in:
tuier 2016-08-09 08:29:15 +01:00 committed by Cameron Sparr
parent 3853d0d065
commit e457b7a8df
2 changed files with 234 additions and 129 deletions

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"net/http"
"regexp"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@ -14,19 +15,22 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
// Librato structure for configuration and client
type Librato struct {
ApiUser string
ApiToken string
Debug bool
NameFromTags bool
SourceTag string
Timeout internal.Duration
Template string
APIUser string
APIToken string
Debug bool
SourceTag string // Deprecated, keeping for backward-compatibility
Timeout internal.Duration
Template string
apiUrl string
APIUrl string
client *http.Client
}
// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics
var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]")
var sampleConfig = `
## Librator API Docs
## http://dev.librato.com/v1/metrics-authentication
@ -36,20 +40,21 @@ var sampleConfig = `
api_token = "my-secret-token" # required.
## Debug
# debug = false
## Tag Field to populate source attribute (optional)
## This is typically the _hostname_ from which the metric was obtained.
source_tag = "host"
## Connection timeout.
# timeout = "5s"
## Output Name Template (same as graphite buckets)
## Output source Template (same as graphite buckets)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## This template is used in librato's source (not metric's name)
template = "host"
`
// LMetrics is the default struct for Librato's API fromat
type LMetrics struct {
Gauges []*Gauge `json:"gauges"`
}
// Gauge is the gauge format for Librato's API fromat
type Gauge struct {
Name string `json:"name"`
Value float64 `json:"value"`
@ -57,17 +62,22 @@ type Gauge struct {
MeasureTime int64 `json:"measure_time"`
}
const librato_api = "https://metrics-api.librato.com/v1/metrics"
const libratoAPI = "https://metrics-api.librato.com/v1/metrics"
func NewLibrato(apiUrl string) *Librato {
// NewLibrato is the main constructor for librato output plugins
func NewLibrato(apiURL string) *Librato {
return &Librato{
apiUrl: apiUrl,
APIUrl: apiURL,
Template: "host",
}
}
// Connect is the default output plugin connection function who make sure it
// can connect to the endpoint
func (l *Librato) Connect() error {
if l.ApiUser == "" || l.ApiToken == "" {
return fmt.Errorf("api_user and api_token are required fields for librato output")
if l.APIUser == "" || l.APIToken == "" {
return fmt.Errorf(
"api_user and api_token are required fields for librato output")
}
l.client = &http.Client{
Timeout: l.Timeout.Duration,
@ -76,18 +86,23 @@ func (l *Librato) Connect() error {
}
func (l *Librato) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
lmetrics := LMetrics{}
if l.Template == "" {
l.Template = "host"
}
if l.SourceTag != "" {
l.Template = l.SourceTag
}
tempGauges := []*Gauge{}
metricCounter := 0
for _, m := range metrics {
if gauges, err := l.buildGauges(m); err == nil {
for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge)
metricCounter++
if l.Debug {
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
}
@ -100,81 +115,115 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
}
}
lmetrics.Gauges = make([]*Gauge, metricCounter)
copy(lmetrics.Gauges, tempGauges[0:])
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
} else {
metricCounter := len(tempGauges)
// make sur we send a batch of maximum 300
sizeBatch := 300
for start := 0; start < metricCounter; start += sizeBatch {
lmetrics := LMetrics{}
end := start + sizeBatch
if end > metricCounter {
end = metricCounter
sizeBatch = end - start
}
lmetrics.Gauges = make([]*Gauge, sizeBatch)
copy(lmetrics.Gauges, tempGauges[start:end])
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
if l.Debug {
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
}
}
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.ApiUser, l.ApiToken)
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
req, err := http.NewRequest(
"POST",
l.APIUrl,
bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf(
"unable to create http.Request, %s\n",
err.Error())
}
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
} else {
if l.Debug {
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.APIUser, l.APIToken)
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
}
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != 200 || l.Debug {
htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
} else {
}
if resp.StatusCode != 200 {
return fmt.Errorf(
"received bad status code, %d\n %s",
resp.StatusCode,
string(htmlData))
}
if l.Debug {
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
}
}
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}
return nil
}
// SampleConfig is function who return the default configuration for this
// output
func (l *Librato) SampleConfig() string {
return sampleConfig
}
// Description is function who return the Description of this output
func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to."
}
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{}
bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "")
if m.Time().Unix() == 0 {
return gauges, fmt.Errorf(
"Measure time must not be zero\n <%s> \n",
m.String())
}
metricSource := graphite.InsertField(
graphite.SerializeBucketName("", m.Tags(), l.Template, ""),
"value")
if metricSource == "" {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.Template)
}
for fieldName, value := range m.Fields() {
metricName := m.Name()
if fieldName != "value" {
metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName)
}
gauge := &Gauge{
Name: graphite.InsertField(bucket, fieldName),
Source: reUnacceptedChar.ReplaceAllString(metricSource, "-"),
Name: reUnacceptedChar.ReplaceAllString(metricName, "-"),
MeasureTime: m.Time().Unix(),
}
if !gauge.verifyValue(value) {
if !verifyValue(value) {
continue
}
if err := gauge.setValue(value); err != nil {
return gauges, fmt.Errorf("unable to extract value from Fields, %s\n",
return gauges, fmt.Errorf(
"unable to extract value from Fields, %s\n",
err.Error())
}
if l.SourceTag != "" {
if source, ok := m.Tags()[l.SourceTag]; ok {
gauge.Source = source
} else {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.SourceTag)
}
}
gauges = append(gauges, gauge)
}
if l.Debug {
@ -183,7 +232,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
return gauges, nil
}
func (g *Gauge) verifyValue(v interface{}) bool {
func verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
@ -209,12 +258,13 @@ func (g *Gauge) setValue(v interface{}) error {
return nil
}
//Close is used to close the connection to librato Output
func (l *Librato) Close() error {
return nil
}
func init() {
outputs.Add("librato", func() telegraf.Output {
return NewLibrato(librato_api)
return NewLibrato(libratoAPI)
})
}

View File

@ -1,7 +1,6 @@
package librato
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
@ -10,141 +9,137 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var (
fakeUrl = "http://test.librato.com"
fakeURL = "http://test.librato.com"
fakeUser = "telegraf@influxdb.com"
fakeToken = "123456"
)
func fakeLibrato() *Librato {
l := NewLibrato(fakeUrl)
l.ApiUser = fakeUser
l.ApiToken = fakeToken
l := NewLibrato(fakeURL)
l.APIUser = fakeUser
l.APIToken = fakeToken
return l
}
func BuildTags(t *testing.T) {
testMetric := testutil.TestMetric(0.0, "test1")
graphiteSerializer := graphite.GraphiteSerializer{}
tags, err := graphiteSerializer.Serialize(testMetric)
fmt.Printf("Tags: %v", tags)
require.NoError(t, err)
}
func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com"
l.ApiToken = "123456"
l.APIUser = "telegraf@influxdb.com"
l.APIToken = "123456"
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockMetrics())
err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")})
require.NoError(t, err)
}
func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(`{
"errors": {
"system": [
"The API is currently down for maintenance. It'll be back shortly."
]
}
}`)
}))
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer ts.Close()
l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com"
l.ApiToken = "123456"
l.APIUser = "telegraf@influxdb.com"
l.APIToken = "123456"
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockMetrics())
err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")})
if err == nil {
t.Errorf("error expected but none returned")
} else {
require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error())
require.EqualError(
t,
fmt.Errorf("received bad status code, 503\n "), err.Error())
}
}
func TestBuildGauge(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()
var gaugeTests = []struct {
ptIn telegraf.Metric
outGauge *Gauge
err error
}{
{
testutil.TestMetric(0.0, "test1"),
newHostMetric(0.0, "test1", "host1"),
&Gauge{
Name: "value1.test1",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test1",
MeasureTime: mtime,
Value: 0.0,
Source: "host1",
},
nil,
},
{
testutil.TestMetric(1.0, "test2"),
newHostMetric(1.0, "test2", "host2"),
&Gauge{
Name: "value1.test2",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test2",
MeasureTime: mtime,
Value: 1.0,
Source: "host2",
},
nil,
},
{
testutil.TestMetric(10, "test3"),
newHostMetric(10, "test3", "host3"),
&Gauge{
Name: "value1.test3",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test3",
MeasureTime: mtime,
Value: 10.0,
Source: "host3",
},
nil,
},
{
testutil.TestMetric(int32(112345), "test4"),
newHostMetric(int32(112345), "test4", "host4"),
&Gauge{
Name: "value1.test4",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test4",
MeasureTime: mtime,
Value: 112345.0,
Source: "host4",
},
nil,
},
{
testutil.TestMetric(int64(112345), "test5"),
newHostMetric(int64(112345), "test5", "host5"),
&Gauge{
Name: "value1.test5",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test5",
MeasureTime: mtime,
Value: 112345.0,
Source: "host5",
},
nil,
},
{
testutil.TestMetric(float32(11234.5), "test6"),
newHostMetric(float32(11234.5), "test6", "host6"),
&Gauge{
Name: "value1.test6",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test6",
MeasureTime: mtime,
Value: 11234.5,
Source: "host6",
},
nil,
},
{
testutil.TestMetric("11234.5", "test7"),
newHostMetric("11234.5", "test7", "host7"),
nil,
nil,
},
}
l := NewLibrato(fakeUrl)
l := NewLibrato(fakeURL)
for _, gt := range gaugeTests {
gauges, err := l.buildGauges(gt.ptIn)
if err != nil && gt.err == nil {
@ -167,61 +162,121 @@ func TestBuildGauge(t *testing.T) {
}
}
func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(
name,
map[string]string{"host": host},
map[string]interface{}{"value": value},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
return
}
func TestBuildGaugeWithSource(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
pt1, _ := telegraf.NewMetric(
"test1",
map[string]string{"hostname": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
mtime,
)
pt2, _ := telegraf.NewMetric(
"test2",
map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
mtime,
)
pt3, _ := telegraf.NewMetric(
"test3",
map[string]string{
"hostname": "192.168.0.1",
"tag2": "value2",
"tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
)
pt4, _ := telegraf.NewMetric(
"test4",
map[string]string{
"hostname": "192.168.0.1",
"tag2": "value2",
"tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
)
var gaugeTests = []struct {
ptIn telegraf.Metric
template string
outGauge *Gauge
err error
}{
{
pt1,
"hostname",
&Gauge{
Name: "192_168_0_1.value1.test1",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test1",
MeasureTime: mtime.Unix(),
Value: 0.0,
Source: "192.168.0.1",
Source: "192_168_0_1",
},
nil,
},
{
pt2,
"hostname",
&Gauge{
Name: "192_168_0_1.value1.test1",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test2",
MeasureTime: mtime.Unix(),
Value: 1.0,
},
fmt.Errorf("undeterminable Source type from Field, hostname"),
},
{
pt3,
"tags",
&Gauge{
Name: "test3",
MeasureTime: mtime.Unix(),
Value: 1.0,
Source: "192_168_0_1.value1.value2",
},
nil,
},
{
pt4,
"hostname.tag2",
&Gauge{
Name: "test4",
MeasureTime: mtime.Unix(),
Value: 1.0,
Source: "192_168_0_1.value2",
},
nil,
},
}
l := NewLibrato(fakeUrl)
l.SourceTag = "hostname"
l := NewLibrato(fakeURL)
for _, gt := range gaugeTests {
l.Template = gt.template
gauges, err := l.buildGauges(gt.ptIn)
if err != nil && gt.err == nil {
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
}
if gt.err != nil && err == nil {
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error())
t.Errorf(
"%s: expected an error (%s) but none returned",
gt.ptIn.Name(),
gt.err.Error())
}
if len(gauges) == 0 {
continue
}
if gt.err == nil && !reflect.DeepEqual(gauges[0], gt.outGauge) {
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauges[0])
t.Errorf(
"%s: \nexpected %+v\ngot %+v\n",
gt.ptIn.Name(),
gt.outGauge, gauges[0])
}
}
}