Fixing Librato plugin
This commit is contained in:
parent
6284e2011c
commit
7eaee5f2d3
|
@ -4,19 +4,24 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
||||
)
|
||||
|
||||
type Librato struct {
|
||||
ApiUser string
|
||||
ApiToken string
|
||||
SourceTag string
|
||||
Timeout internal.Duration
|
||||
ApiUser string
|
||||
ApiToken string
|
||||
Debug bool
|
||||
NameFromTags bool
|
||||
SourceTag string
|
||||
Timeout internal.Duration
|
||||
|
||||
apiUrl string
|
||||
client *http.Client
|
||||
|
@ -32,9 +37,12 @@ var sampleConfig = `
|
|||
## Librato API token
|
||||
api_token = "my-secret-token" # required.
|
||||
|
||||
## Tag Field to populate source attribute (optional)
|
||||
## This is typically the _hostname_ from which the metric was obtained.
|
||||
source_tag = "hostname"
|
||||
### Debug
|
||||
# debug = false
|
||||
|
||||
### Tag Field to populate source attribute (optional)
|
||||
### This is typically the _hostname_ from which the metric was obtained.
|
||||
source_tag = "host"
|
||||
|
||||
## Connection timeout.
|
||||
# timeout = "5s"
|
||||
|
@ -82,17 +90,27 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
|
|||
for _, gauge := range gauges {
|
||||
tempGauges = append(tempGauges, gauge)
|
||||
metricCounter++
|
||||
if l.Debug {
|
||||
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
|
||||
if l.Debug {
|
||||
log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lmetrics.Gauges = make([]*Gauge, metricCounter)
|
||||
copy(lmetrics.Gauges, tempGauges[0:])
|
||||
metricsBytes, err := json.Marshal(metrics)
|
||||
metricsBytes, err := json.Marshal(lmetrics)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
|
||||
} else {
|
||||
if l.Debug {
|
||||
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
|
||||
}
|
||||
}
|
||||
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
|
||||
if err != nil {
|
||||
|
@ -103,8 +121,21 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
resp, err := l.client.Do(req)
|
||||
if err != nil {
|
||||
if l.Debug {
|
||||
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
|
||||
}
|
||||
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||
} else {
|
||||
if l.Debug {
|
||||
htmlData, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
|
||||
} else {
|
||||
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
|
@ -122,11 +153,20 @@ func (l *Librato) Description() string {
|
|||
return "Configuration for Librato API to send metrics to."
|
||||
}
|
||||
|
||||
func (l *Librato) buildGaugeName(m telegraf.Metric, fieldName string) string {
|
||||
// Use the GraphiteSerializer
|
||||
graphiteSerializer := graphite.GraphiteSerializer{}
|
||||
serializedMetric := graphiteSerializer.SerializeBucketName(m, fieldName)
|
||||
|
||||
// Deal with slash characters:
|
||||
return strings.Replace(serializedMetric, "/", "-", -1)
|
||||
}
|
||||
|
||||
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
||||
gauges := []*Gauge{}
|
||||
for fieldName, value := range m.Fields() {
|
||||
gauge := &Gauge{
|
||||
Name: m.Name() + "_" + fieldName,
|
||||
Name: l.buildGaugeName(m, fieldName),
|
||||
MeasureTime: m.Time().Unix(),
|
||||
}
|
||||
if err := gauge.setValue(value); err != nil {
|
||||
|
@ -142,6 +182,10 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
|||
l.SourceTag)
|
||||
}
|
||||
}
|
||||
gauges = append(gauges, gauge)
|
||||
}
|
||||
if l.Debug {
|
||||
fmt.Printf("[DEBUG] Built gauges: %v\n", gauges)
|
||||
}
|
||||
return gauges, nil
|
||||
}
|
||||
|
|
|
@ -9,9 +9,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -28,6 +28,14 @@ func fakeLibrato() *Librato {
|
|||
return l
|
||||
}
|
||||
|
||||
func BuildTags(t *testing.T) {
|
||||
testMetric := testutil.TestMetric(0.0, "test1")
|
||||
graphiteSerializer := graphite.GraphiteSerializer{}
|
||||
tags, err := graphiteSerializer.Serialize(testMetric)
|
||||
fmt.Printf("Tags: %v", tags)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestUriOverride(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
@ -78,7 +86,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(0.0, "test1"),
|
||||
&Gauge{
|
||||
Name: "test1",
|
||||
Name: "value1.test1.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 0.0,
|
||||
},
|
||||
|
@ -87,7 +95,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(1.0, "test2"),
|
||||
&Gauge{
|
||||
Name: "test2",
|
||||
Name: "value1.test2.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 1.0,
|
||||
},
|
||||
|
@ -96,7 +104,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(10, "test3"),
|
||||
&Gauge{
|
||||
Name: "test3",
|
||||
Name: "value1.test3.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 10.0,
|
||||
},
|
||||
|
@ -105,7 +113,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(int32(112345), "test4"),
|
||||
&Gauge{
|
||||
Name: "test4",
|
||||
Name: "value1.test4.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 112345.0,
|
||||
},
|
||||
|
@ -114,7 +122,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(int64(112345), "test5"),
|
||||
&Gauge{
|
||||
Name: "test5",
|
||||
Name: "value1.test5.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 112345.0,
|
||||
},
|
||||
|
@ -123,7 +131,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(float32(11234.5), "test6"),
|
||||
&Gauge{
|
||||
Name: "test6",
|
||||
Name: "value1.test6.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 11234.5,
|
||||
},
|
||||
|
@ -132,7 +140,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric("11234.5", "test7"),
|
||||
&Gauge{
|
||||
Name: "test7",
|
||||
Name: "value1.test7.value",
|
||||
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 11234.5,
|
||||
},
|
||||
|
@ -163,13 +171,13 @@ func TestBuildGauge(t *testing.T) {
|
|||
func TestBuildGaugeWithSource(t *testing.T) {
|
||||
pt1, _ := telegraf.NewMetric(
|
||||
"test1",
|
||||
map[string]string{"hostname": "192.168.0.1"},
|
||||
map[string]string{"hostname": "192.168.0.1", "tag1": "value1"},
|
||||
map[string]interface{}{"value": 0.0},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
pt2, _ := telegraf.NewMetric(
|
||||
"test2",
|
||||
map[string]string{"hostnam": "192.168.0.1"},
|
||||
map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
|
@ -182,7 +190,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
|
|||
{
|
||||
pt1,
|
||||
&Gauge{
|
||||
Name: "test1",
|
||||
Name: "192_168_0_1.value1.test1.value",
|
||||
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 0.0,
|
||||
Source: "192.168.0.1",
|
||||
|
@ -192,7 +200,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
|
|||
{
|
||||
pt2,
|
||||
&Gauge{
|
||||
Name: "test2",
|
||||
Name: "192_168_0_1.value1.test1.value",
|
||||
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
|
||||
Value: 1.0,
|
||||
},
|
||||
|
|
|
@ -14,39 +14,49 @@ type GraphiteSerializer struct {
|
|||
|
||||
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
|
||||
out := []string{}
|
||||
// Get name
|
||||
name := metric.Name()
|
||||
|
||||
// Convert UnixNano to Unix timestamps
|
||||
timestamp := metric.UnixNano() / 1000000000
|
||||
tag_str := buildTags(metric)
|
||||
|
||||
for field_name, value := range metric.Fields() {
|
||||
// Convert value
|
||||
value_str := fmt.Sprintf("%#v", value)
|
||||
// Write graphite metric
|
||||
var graphitePoint string
|
||||
if name == field_name {
|
||||
graphitePoint = fmt.Sprintf("%s.%s %s %d",
|
||||
tag_str,
|
||||
strings.Replace(name, ".", "_", -1),
|
||||
value_str,
|
||||
timestamp)
|
||||
} else {
|
||||
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d",
|
||||
tag_str,
|
||||
strings.Replace(name, ".", "_", -1),
|
||||
strings.Replace(field_name, ".", "_", -1),
|
||||
value_str,
|
||||
timestamp)
|
||||
}
|
||||
if s.Prefix != "" {
|
||||
graphitePoint = fmt.Sprintf("%s.%s", s.Prefix, graphitePoint)
|
||||
}
|
||||
graphitePoint = fmt.Sprintf("%s %s %d",
|
||||
s.SerializeBucketName(metric, field_name),
|
||||
value_str,
|
||||
timestamp)
|
||||
out = append(out, graphitePoint)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *GraphiteSerializer) SerializeBucketName(metric telegraf.Metric, field_name string) string {
|
||||
// Get the metric name
|
||||
name := metric.Name()
|
||||
|
||||
// Convert UnixNano to Unix timestamps
|
||||
tag_str := buildTags(metric)
|
||||
|
||||
// Write graphite metric
|
||||
var serializedBucketName string
|
||||
if name == field_name {
|
||||
serializedBucketName = fmt.Sprintf("%s.%s",
|
||||
tag_str,
|
||||
strings.Replace(name, ".", "_", -1))
|
||||
} else {
|
||||
serializedBucketName = fmt.Sprintf("%s.%s.%s",
|
||||
tag_str,
|
||||
strings.Replace(name, ".", "_", -1),
|
||||
strings.Replace(field_name, ".", "_", -1))
|
||||
}
|
||||
if s.Prefix != "" {
|
||||
serializedBucketName = fmt.Sprintf("%s.%s", s.Prefix, serializedBucketName)
|
||||
}
|
||||
return serializedBucketName
|
||||
}
|
||||
|
||||
func buildTags(metric telegraf.Metric) string {
|
||||
var keys []string
|
||||
tags := metric.Tags()
|
||||
|
|
|
@ -119,3 +119,62 @@ func TestSerializeMetricPrefix(t *testing.T) {
|
|||
sort.Strings(expS)
|
||||
assert.Equal(t, expS, mS)
|
||||
}
|
||||
|
||||
func TestSerializeBucketNameNoHost(t *testing.T) {
|
||||
now := time.Now()
|
||||
tags := map[string]string{
|
||||
"cpu": "cpu0",
|
||||
"datacenter": "us-west-2",
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"usage_idle": float64(91.5),
|
||||
}
|
||||
m, err := telegraf.NewMetric("cpu", tags, fields, now)
|
||||
assert.NoError(t, err)
|
||||
|
||||
s := GraphiteSerializer{}
|
||||
mS := s.SerializeBucketName(m, "usage_idle")
|
||||
|
||||
expS := fmt.Sprintf("cpu0.us-west-2.cpu.usage_idle")
|
||||
assert.Equal(t, expS, mS)
|
||||
}
|
||||
|
||||
func TestSerializeBucketNameHost(t *testing.T) {
|
||||
now := time.Now()
|
||||
tags := map[string]string{
|
||||
"host": "localhost",
|
||||
"cpu": "cpu0",
|
||||
"datacenter": "us-west-2",
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"usage_idle": float64(91.5),
|
||||
}
|
||||
m, err := telegraf.NewMetric("cpu", tags, fields, now)
|
||||
assert.NoError(t, err)
|
||||
|
||||
s := GraphiteSerializer{}
|
||||
mS := s.SerializeBucketName(m, "usage_idle")
|
||||
|
||||
expS := fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle")
|
||||
assert.Equal(t, expS, mS)
|
||||
}
|
||||
|
||||
func TestSerializeBucketNamePrefix(t *testing.T) {
|
||||
now := time.Now()
|
||||
tags := map[string]string{
|
||||
"host": "localhost",
|
||||
"cpu": "cpu0",
|
||||
"datacenter": "us-west-2",
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"usage_idle": float64(91.5),
|
||||
}
|
||||
m, err := telegraf.NewMetric("cpu", tags, fields, now)
|
||||
assert.NoError(t, err)
|
||||
|
||||
s := GraphiteSerializer{Prefix: "prefix"}
|
||||
mS := s.SerializeBucketName(m, "usage_idle")
|
||||
|
||||
expS := fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle")
|
||||
assert.Equal(t, expS, mS)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue