// Package librato implements a Telegraf output that converts metrics into
// gauges and POSTs them as JSON to the Librato metrics API.
package librato

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers/graphite"
)

// Librato is the output plugin state: user-facing configuration plus the
// endpoint URL and HTTP client used to deliver gauges.
type Librato struct {
	// ApiUser and ApiToken are the basic-auth credentials; Connect rejects
	// the configuration when either is empty.
	ApiUser  string
	ApiToken string
	// Debug enables verbose logging of gauges, requests and responses.
	Debug bool
	// NameFromTags is not read anywhere in this file.
	NameFromTags bool
	// SourceTag names the metric tag whose value becomes Gauge.Source.
	// When empty, gauges are sent without a source.
	SourceTag string
	// Timeout bounds each HTTP request; see Connect for the default.
	Timeout internal.Duration
	// Template is passed to graphite.SerializeBucketName to build the
	// gauge name.
	Template string

	apiUrl string
	client *http.Client
}

// sampleConfig is returned verbatim by SampleConfig. Each setting sits on
// its own line: a "##" comment runs to the end of the line, so keys placed
// on the same line as a comment would be swallowed by it.
var sampleConfig = `
  ## Librator API Docs
  ## http://dev.librato.com/v1/metrics-authentication
  ## Librato API user
  api_user = "telegraf@influxdb.com" # required.
  ## Librato API token
  api_token = "my-secret-token" # required.
  ## Debug
  # debug = false
  ## Tag Field to populate source attribute (optional)
  ## This is typically the _hostname_ from which the metric was obtained.
  source_tag = "host"
  ## Connection timeout.
  # timeout = "5s"
  ## Output Name Template (same as graphite buckets)
  ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
  template = "host.tags.measurement.field"
`

// LMetrics is the JSON request body: a list of gauges under the "gauges" key.
type LMetrics struct {
	Gauges []*Gauge `json:"gauges"`
}

// Gauge is a single measurement in the request body.
type Gauge struct {
	Name   string  `json:"name"`
	Value  float64 `json:"value"`
	Source string  `json:"source"`
	// MeasureTime is the metric timestamp in Unix seconds.
	MeasureTime int64 `json:"measure_time"`
}

// librato_api is the endpoint used by the plugin registered in init.
const librato_api = "https://metrics-api.librato.com/v1/metrics"

// defaultTimeout is applied by Connect when no positive timeout is
// configured, matching the value shown in sampleConfig.
const defaultTimeout = 5 * time.Second

// NewLibrato returns a Librato output that posts to apiUrl. All other
// settings are left at their zero values.
func NewLibrato(apiUrl string) *Librato {
	return &Librato{
		apiUrl: apiUrl,
	}
}

// Connect validates the credentials and creates the HTTP client. It performs
// no network I/O. It returns an error when ApiUser or ApiToken is empty.
func (l *Librato) Connect() error {
	if l.ApiUser == "" || l.ApiToken == "" {
		return fmt.Errorf("api_user and api_token are required fields for librato output")
	}

	// A zero http.Client.Timeout means "no timeout", which would let a
	// stalled connection block the output indefinitely.
	timeout := l.Timeout.Duration
	if timeout <= 0 {
		timeout = defaultTimeout
	}
	l.client = &http.Client{Timeout: timeout}
	return nil
}

// Write converts metrics to gauges and sends them in a single POST request.
// Metrics that cannot be converted are logged and skipped. When nothing is
// left to send, no request is made and nil is returned. An error is returned
// when marshalling, building or sending the request fails, or when the
// response status is not 200.
func (l *Librato) Write(metrics []telegraf.Metric) error {
	if len(metrics) == 0 {
		return nil
	}

	gauges := make([]*Gauge, 0, len(metrics))
	for _, m := range metrics {
		built, err := l.buildGauges(m)
		if err != nil {
			log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
			if l.Debug {
				log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
			}
			continue
		}
		if l.Debug {
			for _, gauge := range built {
				log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
			}
		}
		gauges = append(gauges, built...)
	}

	// Every metric was skipped (or carried only string fields): there is
	// nothing to report, so avoid posting an empty gauge list.
	if len(gauges) == 0 {
		return nil
	}

	metricsBytes, err := json.Marshal(LMetrics{Gauges: gauges})
	if err != nil {
		return fmt.Errorf("unable to marshal Metrics, %s", err.Error())
	}
	if l.Debug {
		log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
	}

	req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
	if err != nil {
		return fmt.Errorf("unable to create http.Request, %s", err.Error())
	}
	req.Header.Add("Content-Type", "application/json")
	req.SetBasicAuth(l.ApiUser, l.ApiToken)

	resp, err := l.client.Do(req)
	if err != nil {
		if l.Debug {
			log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
		}
		return fmt.Errorf("error POSTing metrics, %s", err.Error())
	}
	defer resp.Body.Close()

	// Always read the body to the end so the transport can reuse the
	// connection; the content itself is only of interest when debugging.
	respBody, readErr := ioutil.ReadAll(resp.Body)
	if l.Debug {
		if readErr != nil {
			log.Printf("[DEBUG] Couldn't get response! (%v)\n", readErr)
		} else {
			log.Printf("[DEBUG] Librato response: %v\n", string(respBody))
		}
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("received bad status code, %d", resp.StatusCode)
	}
	return nil
}

// SampleConfig returns the example configuration for this output.
func (l *Librato) SampleConfig() string {
	return sampleConfig
}

// Description returns a one-line summary of this output.
func (l *Librato) Description() string {
	return "Configuration for Librato API to send metrics to."
}

// buildGauges creates one gauge per non-string field of m. The gauge name is
// the graphite bucket name for m (built from Template) with the field name
// inserted. It returns an error, and no gauges, when a field value has an
// unsupported type or when SourceTag is set but m has no such tag.
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
	bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "")
	measureTime := m.Time().Unix()

	// The source is the same for every field of the metric, so look it up
	// once. A missing tag is only reported once a field actually needs it.
	var source string
	hasSource := true
	if l.SourceTag != "" {
		source, hasSource = m.Tags()[l.SourceTag]
	}

	fields := m.Fields()
	gauges := make([]*Gauge, 0, len(fields))
	for fieldName, value := range fields {
		gauge := &Gauge{
			Name:        graphite.InsertField(bucket, fieldName),
			MeasureTime: measureTime,
		}
		if !gauge.verifyValue(value) {
			continue
		}
		if err := gauge.setValue(value); err != nil {
			return nil, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
		}
		if !hasSource {
			return nil, fmt.Errorf("undeterminable Source type from Field, %s", l.SourceTag)
		}
		gauge.Source = source
		gauges = append(gauges, gauge)
	}

	if l.Debug {
		log.Printf("[DEBUG] Built gauges: %v\n", gauges)
	}
	return gauges, nil
}

// verifyValue reports whether v should be turned into a gauge. String values
// are rejected; every other type is passed on to setValue.
func (g *Gauge) verifyValue(v interface{}) bool {
	switch v.(type) {
	case string:
		return false
	}
	return true
}

// setValue stores v in g.Value as a float64. Integer and floating-point
// values are converted directly; a bool becomes 1 (true) or 0 (false). Any
// other type yields an error and leaves g.Value untouched.
func (g *Gauge) setValue(v interface{}) error {
	switch d := v.(type) {
	case int:
		g.Value = float64(d)
	case int8:
		g.Value = float64(d)
	case int16:
		g.Value = float64(d)
	case int32:
		g.Value = float64(d)
	case int64:
		g.Value = float64(d)
	case uint:
		g.Value = float64(d)
	case uint8:
		g.Value = float64(d)
	case uint16:
		g.Value = float64(d)
	case uint32:
		g.Value = float64(d)
	case uint64:
		g.Value = float64(d)
	case float32:
		g.Value = float64(d)
	case float64:
		g.Value = d
	case bool:
		if d {
			g.Value = 1
		} else {
			g.Value = 0
		}
	default:
		return fmt.Errorf("undeterminable type %T", d)
	}
	return nil
}

// Close is a no-op; it always returns nil.
func (l *Librato) Close() error {
	return nil
}

// init registers the output under the name "librato", pointing it at the
// production endpoint librato_api.
func init() {
	outputs.Add("librato", func() telegraf.Output {
		return NewLibrato(librato_api)
	})
}