* Source improvement for librato output Build the source from the list of tag instead of a configuration specified single tag Graphite Serializer: * make buildTags public * make sure not to use empty tags Librato output: * Improve Error handling for librato API base on error or debug flag * Send Metric per Batch (max 300) * use Graphite BuildTag function to generate source The change is made that it should be retro compatible Metric sample: server=127.0.0.1 port=80 state=leader env=test measurement.metric_name value service_n.metric_x Metric before with source tags set as "server": source=127.0.0.1 test.80.127_0_0_1.leader.measurement.metric_name test.80.127_0_0_1.leader.service_n.metric_x Metric now: source=test.80.127.0.0.1.leader measurement.metric_name service_n.metric_x As you can see the source in the "new" version is much more precise That way when filter (only from source) you can filter by env or any other tags * Using template to specify which tagsusing for source, default concat all tags * revert change in graphite serializer * better documentation, change default for template * fmt * test passing with new host as default tags * use host tag in api integration test * Limit 80 char per line, change resolution to be a int in the sample * fmt * remove resolution, doc for template * fmt
271 lines
6.4 KiB
Go
271 lines
6.4 KiB
Go
package librato
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"regexp"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
|
)
|
|
|
|
// Librato structure for configuration and client
|
|
type Librato struct {
|
|
APIUser string
|
|
APIToken string
|
|
Debug bool
|
|
SourceTag string // Deprecated, keeping for backward-compatibility
|
|
Timeout internal.Duration
|
|
Template string
|
|
|
|
APIUrl string
|
|
client *http.Client
|
|
}
|
|
|
|
// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics
|
|
var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]")
|
|
|
|
var sampleConfig = `
|
|
## Librator API Docs
|
|
## http://dev.librato.com/v1/metrics-authentication
|
|
## Librato API user
|
|
api_user = "telegraf@influxdb.com" # required.
|
|
## Librato API token
|
|
api_token = "my-secret-token" # required.
|
|
## Debug
|
|
# debug = false
|
|
## Connection timeout.
|
|
# timeout = "5s"
|
|
## Output source Template (same as graphite buckets)
|
|
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
|
|
## This template is used in librato's source (not metric's name)
|
|
template = "host"
|
|
|
|
`
|
|
|
|
// LMetrics is the default struct for Librato's API fromat
|
|
type LMetrics struct {
|
|
Gauges []*Gauge `json:"gauges"`
|
|
}
|
|
|
|
// Gauge is the gauge format for Librato's API fromat
|
|
type Gauge struct {
|
|
Name string `json:"name"`
|
|
Value float64 `json:"value"`
|
|
Source string `json:"source"`
|
|
MeasureTime int64 `json:"measure_time"`
|
|
}
|
|
|
|
const libratoAPI = "https://metrics-api.librato.com/v1/metrics"
|
|
|
|
// NewLibrato is the main constructor for librato output plugins
|
|
func NewLibrato(apiURL string) *Librato {
|
|
return &Librato{
|
|
APIUrl: apiURL,
|
|
Template: "host",
|
|
}
|
|
}
|
|
|
|
// Connect is the default output plugin connection function who make sure it
|
|
// can connect to the endpoint
|
|
func (l *Librato) Connect() error {
|
|
if l.APIUser == "" || l.APIToken == "" {
|
|
return fmt.Errorf(
|
|
"api_user and api_token are required fields for librato output")
|
|
}
|
|
l.client = &http.Client{
|
|
Timeout: l.Timeout.Duration,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (l *Librato) Write(metrics []telegraf.Metric) error {
|
|
|
|
if len(metrics) == 0 {
|
|
return nil
|
|
}
|
|
if l.Template == "" {
|
|
l.Template = "host"
|
|
}
|
|
if l.SourceTag != "" {
|
|
l.Template = l.SourceTag
|
|
}
|
|
|
|
tempGauges := []*Gauge{}
|
|
|
|
for _, m := range metrics {
|
|
if gauges, err := l.buildGauges(m); err == nil {
|
|
for _, gauge := range gauges {
|
|
tempGauges = append(tempGauges, gauge)
|
|
if l.Debug {
|
|
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
|
|
}
|
|
}
|
|
} else {
|
|
log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
|
|
if l.Debug {
|
|
log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
metricCounter := len(tempGauges)
|
|
// make sur we send a batch of maximum 300
|
|
sizeBatch := 300
|
|
for start := 0; start < metricCounter; start += sizeBatch {
|
|
lmetrics := LMetrics{}
|
|
end := start + sizeBatch
|
|
if end > metricCounter {
|
|
end = metricCounter
|
|
sizeBatch = end - start
|
|
}
|
|
lmetrics.Gauges = make([]*Gauge, sizeBatch)
|
|
copy(lmetrics.Gauges, tempGauges[start:end])
|
|
metricsBytes, err := json.Marshal(lmetrics)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
|
|
}
|
|
|
|
if l.Debug {
|
|
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
|
|
}
|
|
|
|
req, err := http.NewRequest(
|
|
"POST",
|
|
l.APIUrl,
|
|
bytes.NewBuffer(metricsBytes))
|
|
if err != nil {
|
|
return fmt.Errorf(
|
|
"unable to create http.Request, %s\n",
|
|
err.Error())
|
|
}
|
|
req.Header.Add("Content-Type", "application/json")
|
|
req.SetBasicAuth(l.APIUser, l.APIToken)
|
|
|
|
resp, err := l.client.Do(req)
|
|
if err != nil {
|
|
if l.Debug {
|
|
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
|
|
}
|
|
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != 200 || l.Debug {
|
|
htmlData, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
|
|
}
|
|
if resp.StatusCode != 200 {
|
|
return fmt.Errorf(
|
|
"received bad status code, %d\n %s",
|
|
resp.StatusCode,
|
|
string(htmlData))
|
|
}
|
|
if l.Debug {
|
|
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SampleConfig is function who return the default configuration for this
|
|
// output
|
|
func (l *Librato) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description is function who return the Description of this output
|
|
func (l *Librato) Description() string {
|
|
return "Configuration for Librato API to send metrics to."
|
|
}
|
|
|
|
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
|
|
|
gauges := []*Gauge{}
|
|
if m.Time().Unix() == 0 {
|
|
return gauges, fmt.Errorf(
|
|
"Measure time must not be zero\n <%s> \n",
|
|
m.String())
|
|
}
|
|
metricSource := graphite.InsertField(
|
|
graphite.SerializeBucketName("", m.Tags(), l.Template, ""),
|
|
"value")
|
|
if metricSource == "" {
|
|
return gauges,
|
|
fmt.Errorf("undeterminable Source type from Field, %s\n",
|
|
l.Template)
|
|
}
|
|
for fieldName, value := range m.Fields() {
|
|
|
|
metricName := m.Name()
|
|
if fieldName != "value" {
|
|
metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName)
|
|
}
|
|
|
|
gauge := &Gauge{
|
|
Source: reUnacceptedChar.ReplaceAllString(metricSource, "-"),
|
|
Name: reUnacceptedChar.ReplaceAllString(metricName, "-"),
|
|
MeasureTime: m.Time().Unix(),
|
|
}
|
|
if !verifyValue(value) {
|
|
continue
|
|
}
|
|
if err := gauge.setValue(value); err != nil {
|
|
return gauges, fmt.Errorf(
|
|
"unable to extract value from Fields, %s\n",
|
|
err.Error())
|
|
}
|
|
gauges = append(gauges, gauge)
|
|
}
|
|
if l.Debug {
|
|
fmt.Printf("[DEBUG] Built gauges: %v\n", gauges)
|
|
}
|
|
return gauges, nil
|
|
}
|
|
|
|
func verifyValue(v interface{}) bool {
|
|
switch v.(type) {
|
|
case string:
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (g *Gauge) setValue(v interface{}) error {
|
|
switch d := v.(type) {
|
|
case int:
|
|
g.Value = float64(int(d))
|
|
case int32:
|
|
g.Value = float64(int32(d))
|
|
case int64:
|
|
g.Value = float64(int64(d))
|
|
case float32:
|
|
g.Value = float64(d)
|
|
case float64:
|
|
g.Value = float64(d)
|
|
default:
|
|
return fmt.Errorf("undeterminable type %+v", d)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
//Close is used to close the connection to librato Output
|
|
func (l *Librato) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("librato", func() telegraf.Output {
|
|
return NewLibrato(libratoAPI)
|
|
})
|
|
}
|