diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index ccb2acd9a..17d0d4c6a 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "log" "net/http" + "regexp" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -14,19 +15,22 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/graphite" ) +// Librato structure for configuration and client type Librato struct { - ApiUser string - ApiToken string - Debug bool - NameFromTags bool - SourceTag string - Timeout internal.Duration - Template string + APIUser string + APIToken string + Debug bool + SourceTag string // Deprecated, keeping for backward-compatibility + Timeout internal.Duration + Template string - apiUrl string + APIUrl string client *http.Client } +// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics +var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]") + var sampleConfig = ` ## Librator API Docs ## http://dev.librato.com/v1/metrics-authentication @@ -36,20 +40,21 @@ var sampleConfig = ` api_token = "my-secret-token" # required. ## Debug # debug = false - ## Tag Field to populate source attribute (optional) - ## This is typically the _hostname_ from which the metric was obtained. - source_tag = "host" ## Connection timeout. # timeout = "5s" - ## Output Name Template (same as graphite buckets) + ## Output source Template (same as graphite buckets) ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite - template = "host.tags.measurement.field" + ## This template is used in librato's source (not metric's name) + template = "host" + ` +// LMetrics is the default struct for Librato's API fromat type LMetrics struct { Gauges []*Gauge `json:"gauges"` } +// Gauge is the gauge format for Librato's API fromat type Gauge struct { Name string `json:"name"` Value float64 `json:"value"` @@ -57,17 +62,22 @@ type Gauge struct { MeasureTime int64 `json:"measure_time"` } -const librato_api = "https://metrics-api.librato.com/v1/metrics" +const libratoAPI = "https://metrics-api.librato.com/v1/metrics" -func NewLibrato(apiUrl string) *Librato { +// NewLibrato is the main constructor for librato output plugins +func NewLibrato(apiURL string) *Librato { return &Librato{ - apiUrl: apiUrl, + APIUrl: apiURL, + Template: "host", } } +// Connect is the default output plugin connection function who make sure it +// can connect to the endpoint func (l *Librato) Connect() error { - if l.ApiUser == "" || l.ApiToken == "" { - return fmt.Errorf("api_user and api_token are required fields for librato output") + if l.APIUser == "" || l.APIToken == "" { + return fmt.Errorf( + "api_user and api_token are required fields for librato output") } l.client = &http.Client{ Timeout: l.Timeout.Duration, @@ -76,18 +86,23 @@ func (l *Librato) Connect() error { } func (l *Librato) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } - lmetrics := LMetrics{} + if l.Template == "" { + l.Template = "host" + } + if l.SourceTag != "" { + l.Template = l.SourceTag + } + tempGauges := []*Gauge{} - metricCounter := 0 for _, m := range metrics { if gauges, err := l.buildGauges(m); err == nil { for _, gauge := range gauges { tempGauges = append(tempGauges, gauge) - metricCounter++ if l.Debug { log.Printf("[DEBUG] Got a gauge: %v\n", gauge) } @@ -100,81 +115,115 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { } } - lmetrics.Gauges = make([]*Gauge, metricCounter) - copy(lmetrics.Gauges, tempGauges[0:]) - metricsBytes, err := json.Marshal(lmetrics) - if err != nil { - return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) - } else { + metricCounter := len(tempGauges) + // make sur we send a batch of maximum 300 + sizeBatch := 300 + for start := 0; start < metricCounter; start += sizeBatch { + lmetrics := LMetrics{} + end := start + sizeBatch + if end > metricCounter { + end = metricCounter + sizeBatch = end - start + } + lmetrics.Gauges = make([]*Gauge, sizeBatch) + copy(lmetrics.Gauges, tempGauges[start:end]) + metricsBytes, err := json.Marshal(lmetrics) + if err != nil { + return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) + } + if l.Debug { log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes)) } - } - req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes)) - if err != nil { - return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) - } - req.Header.Add("Content-Type", "application/json") - req.SetBasicAuth(l.ApiUser, l.ApiToken) - resp, err := l.client.Do(req) - if err != nil { - if l.Debug { - log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error()) + req, err := http.NewRequest( + "POST", + l.APIUrl, + bytes.NewBuffer(metricsBytes)) + if err != nil { + return fmt.Errorf( + "unable to create http.Request, %s\n", + err.Error()) } - return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) - } else { - if l.Debug { + req.Header.Add("Content-Type", "application/json") + req.SetBasicAuth(l.APIUser, l.APIToken) + + resp, err := l.client.Do(req) + if err != nil { + if l.Debug { + log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error()) + } + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 || l.Debug { htmlData, err := ioutil.ReadAll(resp.Body) if err != nil { log.Printf("[DEBUG] Couldn't get response! (%v)\n", err) - } else { + } + if resp.StatusCode != 200 { + return fmt.Errorf( + "received bad status code, %d\n %s", + resp.StatusCode, + string(htmlData)) + } + if l.Debug { log.Printf("[DEBUG] Librato response: %v\n", string(htmlData)) } } } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) - } - return nil } +// SampleConfig is function who return the default configuration for this +// output func (l *Librato) SampleConfig() string { return sampleConfig } +// Description is function who return the Description of this output func (l *Librato) Description() string { return "Configuration for Librato API to send metrics to." } func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { + gauges := []*Gauge{} - bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "") + if m.Time().Unix() == 0 { + return gauges, fmt.Errorf( + "Measure time must not be zero\n <%s> \n", + m.String()) + } + metricSource := graphite.InsertField( + graphite.SerializeBucketName("", m.Tags(), l.Template, ""), + "value") + if metricSource == "" { + return gauges, + fmt.Errorf("undeterminable Source type from Field, %s\n", + l.Template) + } for fieldName, value := range m.Fields() { + + metricName := m.Name() + if fieldName != "value" { + metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName) + } + gauge := &Gauge{ - Name: graphite.InsertField(bucket, fieldName), + Source: reUnacceptedChar.ReplaceAllString(metricSource, "-"), + Name: reUnacceptedChar.ReplaceAllString(metricName, "-"), MeasureTime: m.Time().Unix(), } - if !gauge.verifyValue(value) { + if !verifyValue(value) { continue } if err := gauge.setValue(value); err != nil { - return gauges, fmt.Errorf("unable to extract value from Fields, %s\n", + return gauges, fmt.Errorf( + "unable to extract value from Fields, %s\n", err.Error()) } - if l.SourceTag != "" { - if source, ok := m.Tags()[l.SourceTag]; ok { - gauge.Source = source - } else { - return gauges, - fmt.Errorf("undeterminable Source type from Field, %s\n", - l.SourceTag) - } - } gauges = append(gauges, gauge) } if l.Debug { @@ -183,7 +232,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { return gauges, nil } -func (g *Gauge) verifyValue(v interface{}) bool { +func verifyValue(v interface{}) bool { switch v.(type) { case string: return false @@ -209,12 +258,13 @@ func (g *Gauge) setValue(v interface{}) error { return nil } +//Close is used to close the connection to librato Output func (l *Librato) Close() error { return nil } func init() { outputs.Add("librato", func() telegraf.Output { - return NewLibrato(librato_api) + return NewLibrato(libratoAPI) }) } diff --git a/plugins/outputs/librato/librato_test.go b/plugins/outputs/librato/librato_test.go index e90339928..dd5755a8c 100644 --- a/plugins/outputs/librato/librato_test.go +++ b/plugins/outputs/librato/librato_test.go @@ -1,7 +1,6 @@ package librato import ( - "encoding/json" "fmt" "net/http" "net/http/httptest" @@ -10,141 +9,137 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/serializers/graphite" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) var ( - fakeUrl = "http://test.librato.com" + fakeURL = "http://test.librato.com" fakeUser = "telegraf@influxdb.com" fakeToken = "123456" ) func fakeLibrato() *Librato { - l := NewLibrato(fakeUrl) - l.ApiUser = fakeUser - l.ApiToken = fakeToken + l := NewLibrato(fakeURL) + l.APIUser = fakeUser + l.APIToken = fakeToken return l } -func BuildTags(t *testing.T) { - testMetric := testutil.TestMetric(0.0, "test1") - graphiteSerializer := graphite.GraphiteSerializer{} - tags, err := graphiteSerializer.Serialize(testMetric) - fmt.Printf("Tags: %v", tags) - require.NoError(t, err) -} - func TestUriOverride(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) + ts := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) defer ts.Close() l := NewLibrato(ts.URL) - l.ApiUser = "telegraf@influxdb.com" - l.ApiToken = "123456" + l.APIUser = "telegraf@influxdb.com" + l.APIToken = "123456" err := l.Connect() require.NoError(t, err) - err = l.Write(testutil.MockMetrics()) + err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")}) require.NoError(t, err) } func TestBadStatusCode(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusServiceUnavailable) - json.NewEncoder(w).Encode(`{ - "errors": { - "system": [ - "The API is currently down for maintenance. It'll be back shortly." - ] - } - }`) - })) + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) defer ts.Close() l := NewLibrato(ts.URL) - l.ApiUser = "telegraf@influxdb.com" - l.ApiToken = "123456" + l.APIUser = "telegraf@influxdb.com" + l.APIToken = "123456" err := l.Connect() require.NoError(t, err) - err = l.Write(testutil.MockMetrics()) + err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")}) if err == nil { t.Errorf("error expected but none returned") } else { - require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error()) + require.EqualError( + t, + fmt.Errorf("received bad status code, 503\n "), err.Error()) } } func TestBuildGauge(t *testing.T) { + + mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix() var gaugeTests = []struct { ptIn telegraf.Metric outGauge *Gauge err error }{ { - testutil.TestMetric(0.0, "test1"), + newHostMetric(0.0, "test1", "host1"), &Gauge{ - Name: "value1.test1", - MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test1", + MeasureTime: mtime, Value: 0.0, + Source: "host1", }, nil, }, { - testutil.TestMetric(1.0, "test2"), + newHostMetric(1.0, "test2", "host2"), &Gauge{ - Name: "value1.test2", - MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test2", + MeasureTime: mtime, Value: 1.0, + Source: "host2", }, nil, }, { - testutil.TestMetric(10, "test3"), + newHostMetric(10, "test3", "host3"), &Gauge{ - Name: "value1.test3", - MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test3", + MeasureTime: mtime, Value: 10.0, + Source: "host3", }, nil, }, { - testutil.TestMetric(int32(112345), "test4"), + newHostMetric(int32(112345), "test4", "host4"), &Gauge{ - Name: "value1.test4", - MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test4", + MeasureTime: mtime, Value: 112345.0, + Source: "host4", }, nil, }, { - testutil.TestMetric(int64(112345), "test5"), + newHostMetric(int64(112345), "test5", "host5"), &Gauge{ - Name: "value1.test5", - MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test5", + MeasureTime: mtime, Value: 112345.0, + Source: "host5", }, nil, }, { - testutil.TestMetric(float32(11234.5), "test6"), + newHostMetric(float32(11234.5), "test6", "host6"), &Gauge{ - Name: "value1.test6", - MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test6", + MeasureTime: mtime, Value: 11234.5, + Source: "host6", }, nil, }, { - testutil.TestMetric("11234.5", "test7"), + newHostMetric("11234.5", "test7", "host7"), nil, nil, }, } - l := NewLibrato(fakeUrl) + l := NewLibrato(fakeURL) for _, gt := range gaugeTests { gauges, err := l.buildGauges(gt.ptIn) if err != nil && gt.err == nil { @@ -167,61 +162,121 @@ func TestBuildGauge(t *testing.T) { } } +func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric) { + metric, _ = telegraf.NewMetric( + name, + map[string]string{"host": host}, + map[string]interface{}{"value": value}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + return +} + func TestBuildGaugeWithSource(t *testing.T) { + mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) pt1, _ := telegraf.NewMetric( "test1", map[string]string{"hostname": "192.168.0.1", "tag1": "value1"}, map[string]interface{}{"value": 0.0}, - time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + mtime, ) pt2, _ := telegraf.NewMetric( "test2", map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"}, map[string]interface{}{"value": 1.0}, - time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + mtime, + ) + pt3, _ := telegraf.NewMetric( + "test3", + map[string]string{ + "hostname": "192.168.0.1", + "tag2": "value2", + "tag1": "value1"}, + map[string]interface{}{"value": 1.0}, + mtime, + ) + pt4, _ := telegraf.NewMetric( + "test4", + map[string]string{ + "hostname": "192.168.0.1", + "tag2": "value2", + "tag1": "value1"}, + map[string]interface{}{"value": 1.0}, + mtime, ) var gaugeTests = []struct { ptIn telegraf.Metric + template string outGauge *Gauge err error }{ { pt1, + "hostname", &Gauge{ - Name: "192_168_0_1.value1.test1", - MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test1", + MeasureTime: mtime.Unix(), Value: 0.0, - Source: "192.168.0.1", + Source: "192_168_0_1", }, nil, }, { pt2, + "hostname", &Gauge{ - Name: "192_168_0_1.value1.test1", - MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), + Name: "test2", + MeasureTime: mtime.Unix(), Value: 1.0, }, fmt.Errorf("undeterminable Source type from Field, hostname"), }, + { + pt3, + "tags", + &Gauge{ + Name: "test3", + MeasureTime: mtime.Unix(), + Value: 1.0, + Source: "192_168_0_1.value1.value2", + }, + nil, + }, + { + pt4, + "hostname.tag2", + &Gauge{ + Name: "test4", + MeasureTime: mtime.Unix(), + Value: 1.0, + Source: "192_168_0_1.value2", + }, + nil, + }, } - l := NewLibrato(fakeUrl) - l.SourceTag = "hostname" + l := NewLibrato(fakeURL) for _, gt := range gaugeTests { + l.Template = gt.template gauges, err := l.buildGauges(gt.ptIn) if err != nil && gt.err == nil { t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err) } if gt.err != nil && err == nil { - t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error()) + t.Errorf( + "%s: expected an error (%s) but none returned", + gt.ptIn.Name(), + gt.err.Error()) } if len(gauges) == 0 { continue } if gt.err == nil && !reflect.DeepEqual(gauges[0], gt.outGauge) { - t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauges[0]) + t.Errorf( + "%s: \nexpected %+v\ngot %+v\n", + gt.ptIn.Name(), + gt.outGauge, gauges[0]) } } }