diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index 3897e0b4f..ed15350fc 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -4,19 +4,24 @@ import ( "bytes" "encoding/json" "fmt" + "io/ioutil" "log" "net/http" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers/graphite" ) type Librato struct { - ApiUser string - ApiToken string - SourceTag string - Timeout internal.Duration + ApiUser string + ApiToken string + Debug bool + NameFromTags bool + SourceTag string + Timeout internal.Duration apiUrl string client *http.Client @@ -32,9 +37,12 @@ var sampleConfig = ` ## Librato API token api_token = "my-secret-token" # required. - ## Tag Field to populate source attribute (optional) - ## This is typically the _hostname_ from which the metric was obtained. - source_tag = "hostname" + ### Debug + # debug = false + + ### Tag Field to populate source attribute (optional) + ### This is typically the _hostname_ from which the metric was obtained. + source_tag = "host" ## Connection timeout. # timeout = "5s" @@ -82,17 +90,27 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { for _, gauge := range gauges { tempGauges = append(tempGauges, gauge) metricCounter++ + if l.Debug { + log.Printf("[DEBUG] Got a gauge: %v\n", gauge) + } } } else { log.Printf("unable to build Gauge for %s, skipping\n", m.Name()) + if l.Debug { + log.Printf("[DEBUG] Couldn't build gauge: %v\n", err) + } } } lmetrics.Gauges = make([]*Gauge, metricCounter) copy(lmetrics.Gauges, tempGauges[0:]) - metricsBytes, err := json.Marshal(metrics) + metricsBytes, err := json.Marshal(lmetrics) if err != nil { return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) + } else { + if l.Debug { + log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes)) + } } req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes)) if err != nil { @@ -103,8 +121,21 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { resp, err := l.client.Do(req) if err != nil { + if l.Debug { + log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error()) + } return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } else { + if l.Debug { + htmlData, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("[DEBUG] Couldn't get response! (%v)\n", err) + } else { + log.Printf("[DEBUG] Librato response: %v\n", string(htmlData)) + } + } } + defer resp.Body.Close() if resp.StatusCode != 200 { @@ -122,11 +153,20 @@ func (l *Librato) Description() string { return "Configuration for Librato API to send metrics to." } +func (l *Librato) buildGaugeName(m telegraf.Metric, fieldName string) string { + // Use the GraphiteSerializer + graphiteSerializer := graphite.GraphiteSerializer{} + serializedMetric := graphiteSerializer.SerializeBucketName(m, fieldName) + + // Deal with slash characters: + return strings.Replace(serializedMetric, "/", "-", -1) +} + func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { gauges := []*Gauge{} for fieldName, value := range m.Fields() { gauge := &Gauge{ - Name: m.Name() + "_" + fieldName, + Name: l.buildGaugeName(m, fieldName), MeasureTime: m.Time().Unix(), } if err := gauge.setValue(value); err != nil { @@ -142,6 +182,10 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { l.SourceTag) } } + gauges = append(gauges, gauge) + } + if l.Debug { + fmt.Printf("[DEBUG] Built gauges: %v\n", gauges) } return gauges, nil } diff --git a/plugins/outputs/librato/librato_test.go b/plugins/outputs/librato/librato_test.go index c0b6ba021..ae08793e0 100644 --- a/plugins/outputs/librato/librato_test.go +++ b/plugins/outputs/librato/librato_test.go @@ -9,9 +9,9 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" - "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/graphite" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -28,6 +28,14 @@ func fakeLibrato() *Librato { return l } +func BuildTags(t *testing.T) { + testMetric := testutil.TestMetric(0.0, "test1") + graphiteSerializer := graphite.GraphiteSerializer{} + tags, err := graphiteSerializer.Serialize(testMetric) + fmt.Printf("Tags: %v", tags) + require.NoError(t, err) +} + func TestUriOverride(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -78,7 +86,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric(0.0, "test1"), &Gauge{ - Name: "test1", + Name: "value1.test1.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 0.0, }, @@ -87,7 +95,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric(1.0, "test2"), &Gauge{ - Name: "test2", + Name: "value1.test2.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 1.0, }, @@ -96,7 +104,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric(10, "test3"), &Gauge{ - Name: "test3", + Name: "value1.test3.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 10.0, }, @@ -105,7 +113,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric(int32(112345), "test4"), &Gauge{ - Name: "test4", + Name: "value1.test4.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 112345.0, }, @@ -114,7 +122,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric(int64(112345), "test5"), &Gauge{ - Name: "test5", + Name: "value1.test5.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 112345.0, }, @@ -123,7 +131,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric(float32(11234.5), "test6"), &Gauge{ - Name: "test6", + Name: "value1.test6.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 11234.5, }, @@ -132,7 +140,7 @@ func TestBuildGauge(t *testing.T) { { testutil.TestMetric("11234.5", "test7"), &Gauge{ - Name: "test7", + Name: "value1.test7.value", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 11234.5, }, @@ -163,13 +171,13 @@ func TestBuildGauge(t *testing.T) { func TestBuildGaugeWithSource(t *testing.T) { pt1, _ := telegraf.NewMetric( "test1", - map[string]string{"hostname": "192.168.0.1"}, + map[string]string{"hostname": "192.168.0.1", "tag1": "value1"}, map[string]interface{}{"value": 0.0}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) pt2, _ := telegraf.NewMetric( "test2", - map[string]string{"hostnam": "192.168.0.1"}, + map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"}, map[string]interface{}{"value": 1.0}, time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), ) @@ -182,7 +190,7 @@ func TestBuildGaugeWithSource(t *testing.T) { { pt1, &Gauge{ - Name: "test1", + Name: "192_168_0_1.value1.test1.value", MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 0.0, Source: "192.168.0.1", @@ -192,7 +200,7 @@ func TestBuildGaugeWithSource(t *testing.T) { { pt2, &Gauge{ - Name: "test2", + Name: "192_168_0_1.value1.test1.value", MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), Value: 1.0, }, diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index d04f756c1..908dce8fa 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -14,39 +14,49 @@ type GraphiteSerializer struct { func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { out := []string{} - // Get name - name := metric.Name() + // Convert UnixNano to Unix timestamps timestamp := metric.UnixNano() / 1000000000 - tag_str := buildTags(metric) for field_name, value := range metric.Fields() { // Convert value value_str := fmt.Sprintf("%#v", value) // Write graphite metric var graphitePoint string - if name == field_name { - graphitePoint = fmt.Sprintf("%s.%s %s %d", - tag_str, - strings.Replace(name, ".", "_", -1), - value_str, - timestamp) - } else { - graphitePoint = fmt.Sprintf("%s.%s.%s %s %d", - tag_str, - strings.Replace(name, ".", "_", -1), - strings.Replace(field_name, ".", "_", -1), - value_str, - timestamp) - } - if s.Prefix != "" { - graphitePoint = fmt.Sprintf("%s.%s", s.Prefix, graphitePoint) - } + graphitePoint = fmt.Sprintf("%s %s %d", + s.SerializeBucketName(metric, field_name), + value_str, + timestamp) out = append(out, graphitePoint) } return out, nil } +func (s *GraphiteSerializer) SerializeBucketName(metric telegraf.Metric, field_name string) string { + // Get the metric name + name := metric.Name() + + // Convert UnixNano to Unix timestamps + tag_str := buildTags(metric) + + // Write graphite metric + var serializedBucketName string + if name == field_name { + serializedBucketName = fmt.Sprintf("%s.%s", + tag_str, + strings.Replace(name, ".", "_", -1)) + } else { + serializedBucketName = fmt.Sprintf("%s.%s.%s", + tag_str, + strings.Replace(name, ".", "_", -1), + strings.Replace(field_name, ".", "_", -1)) + } + if s.Prefix != "" { + serializedBucketName = fmt.Sprintf("%s.%s", s.Prefix, serializedBucketName) + } + return serializedBucketName +} + func buildTags(metric telegraf.Metric) string { var keys []string tags := metric.Tags() diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index 72b203b7a..8d25bf937 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -119,3 +119,62 @@ func TestSerializeMetricPrefix(t *testing.T) { sort.Strings(expS) assert.Equal(t, expS, mS) } + +func TestSerializeBucketNameNoHost(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{} + mS := s.SerializeBucketName(m, "usage_idle") + + expS := fmt.Sprintf("cpu0.us-west-2.cpu.usage_idle") + assert.Equal(t, expS, mS) +} + +func TestSerializeBucketNameHost(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{} + mS := s.SerializeBucketName(m, "usage_idle") + + expS := fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle") + assert.Equal(t, expS, mS) +} + +func TestSerializeBucketNamePrefix(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{Prefix: "prefix"} + mS := s.SerializeBucketName(m, "usage_idle") + + expS := fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle") + assert.Equal(t, expS, mS) +}