PoC for datadog metric types

Sadly, you have to pretend to be the datadog agent...
This commit is contained in:
James Haggerty 2016-01-29 07:22:07 +00:00
parent f088dd7e00
commit 2df4b21dbf
4 changed files with 76 additions and 42 deletions

View File

@ -14,10 +14,13 @@ import (
func NewAccumulator( func NewAccumulator(
inputConfig *internal_models.InputConfig, inputConfig *internal_models.InputConfig,
metrics chan telegraf.Metric, metrics chan telegraf.Metric,
interval time.Duration,
) *accumulator { ) *accumulator {
acc := accumulator{} acc := accumulator{}
acc.metrics = metrics acc.metrics = metrics
acc.inputConfig = inputConfig acc.inputConfig = inputConfig
// at what interval are these being accumulated
acc.interval = interval
return &acc return &acc
} }
@ -33,6 +36,8 @@ type accumulator struct {
inputConfig *internal_models.InputConfig inputConfig *internal_models.InputConfig
prefix string prefix string
interval time.Duration
} }
func (ac *accumulator) Add( func (ac *accumulator) Add(
@ -140,6 +145,7 @@ func (ac *accumulator) AddFields(
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return return
} }
m.SetInterval(ac.interval)
if ac.debug { if ac.debug {
fmt.Println("> " + m.String()) fmt.Println("> " + m.String())
} }

View File

@ -116,7 +116,7 @@ func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error {
defer panicRecover(input) defer panicRecover(input)
defer wg.Done() defer wg.Done()
acc := NewAccumulator(input.Config, metricC) acc := NewAccumulator(input.Config, metricC, a.Config.Agent.Interval.Duration)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags) acc.setDefaultTags(a.Config.Tags)
@ -167,7 +167,7 @@ func (a *Agent) gatherSeparate(
var outerr error var outerr error
start := time.Now() start := time.Now()
acc := NewAccumulator(input.Config, metricC) acc := NewAccumulator(input.Config, metricC, input.Config.Interval)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags) acc.setDefaultTags(a.Config.Tags)
@ -214,7 +214,7 @@ func (a *Agent) Test() error {
}() }()
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
acc := NewAccumulator(input.Config, metricC) acc := NewAccumulator(input.Config, metricC, input.Config.Interval)
acc.SetDebug(true) acc.SetDebug(true)
fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)

View File

@ -20,6 +20,12 @@ type Metric interface {
// UnixNano returns the unix nano time of the metric // UnixNano returns the unix nano time of the metric
UnixNano() int64 UnixNano() int64
// Interval returns the interval at which the metric was collected
Interval() time.Duration
// SetInterval sets the interval above.
SetInterval(interval time.Duration)
// Fields returns the fields for the metric // Fields returns the fields for the metric
Fields() map[string]interface{} Fields() map[string]interface{}
@ -35,7 +41,8 @@ type Metric interface {
// metric is a wrapper of the influxdb client.Point struct // metric is a wrapper of the influxdb client.Point struct
type metric struct { type metric struct {
pt *client.Point pt *client.Point
interval time.Duration
} }
// NewMetric returns a metric with the given timestamp. If a timestamp is not // NewMetric returns a metric with the given timestamp. If a timestamp is not
@ -79,6 +86,14 @@ func ParseMetrics(buf []byte) ([]Metric, error) {
return metrics, err return metrics, err
} }
func (m *metric) Interval() time.Duration {
return m.interval
}
func (m *metric) SetInterval(interval time.Duration) {
m.interval = interval
}
func (m *metric) Name() string { func (m *metric) Name() string {
return m.pt.Name() return m.pt.Name()
} }

View File

@ -9,6 +9,7 @@ import (
"net/url" "net/url"
"sort" "sort"
"strings" "strings"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -36,10 +37,13 @@ type TimeSeries struct {
} }
type Metric struct { type Metric struct {
Metric string `json:"metric"` Metric string `json:"metric"`
Points [1]Point `json:"points"` Points [1]Point `json:"points"`
Host string `json:"host"` Host string `json:"host"`
Tags []string `json:"tags,omitempty"` Tags []string `json:"tags,omitempty"`
Interval float64 `json:"interval,omitempty"`
Type string `json:"type"`
DeviceName string `json:"device_name"`
} }
type Point [2]float64 type Point [2]float64
@ -67,37 +71,15 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
ts := TimeSeries{} ts := TimeSeries{}
tempSeries := []*Metric{}
metricCounter := 0
for _, m := range metrics { for _, m := range metrics {
mname := strings.Replace(m.Name(), "_", ".", -1)
if dogMs, err := buildMetrics(m); err == nil { if dogMs, err := buildMetrics(m); err == nil {
for fieldName, dogM := range dogMs { ts.Series = append(ts.Series, dogMs...)
// name of the datadog measurement
var dname string
if fieldName == "value" {
// adding .value seems redundant here
dname = mname
} else {
dname = mname + "." + strings.Replace(fieldName, "_", ".", -1)
}
metric := &Metric{
Metric: dname,
Tags: buildTags(m.Tags()),
Host: m.Tags()["host"],
}
metric.Points[0] = dogM
tempSeries = append(tempSeries, metric)
metricCounter++
}
} else { } else {
log.Printf("unable to build Metric for %s, skipping\n", m.Name()) log.Printf("unable to build Metric for %s, skipping\n", m.Name())
} }
} }
ts.Series = make([]*Metric, metricCounter)
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts) tsBytes, err := json.Marshal(ts)
if err != nil { if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
@ -106,7 +88,11 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
if err != nil { if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
} }
req.Header.Add("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
// If we don't pretend to be dogstatsd, it ignores our carefully
// calculated type.
req.Header.Set("DD-Dogstatsd-Version", "5.6.3")
req.Header.Set("User-Agent", "python-requests/2.6.0 CPython/2.7.10")
resp, err := d.client.Do(req) resp, err := d.client.Do(req)
if err != nil { if err != nil {
@ -136,17 +122,44 @@ func (d *Datadog) authenticatedUrl() string {
return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode())
} }
func buildMetrics(m telegraf.Metric) (map[string]Point, error) { // Convert a telegraf metric to datadog metrics;
ms := make(map[string]Point) // we need a separate metric for each field.
for k, v := range m.Fields() { // (also has magic for statsd field names)
var p Point func buildMetrics(m telegraf.Metric) ([]*Metric, error) {
if err := p.setValue(v); err != nil { var datadogMetrics []*Metric
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) tags := m.Tags()
} metricType := tags["metric_type"]
p[0] = float64(m.Time().Unix()) baseDatadogMetric := Metric{
ms[k] = p Metric: m.Name(),
Tags: buildTags(tags),
Host: tags["host"],
Interval: float64(m.Interval()) / float64(time.Second),
} }
return ms, nil
for field, value := range m.Fields() {
metric := baseDatadogMetric
if field != "value" {
metric.Metric += "_" + field
}
metric.Metric = strings.Replace(metric.Metric, "_", ".", -1)
p := &metric.Points[0]
p[0] = float64(m.Time().Unix())
if err := p.setValue(value); err != nil {
return nil, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
if metricType == "counter" ||
((metricType == "histogram" || metricType == "timing") && field == "count") {
metric.Type = "rate"
p[1] /= baseDatadogMetric.Interval
} else {
metric.Type = "gauge"
}
datadogMetrics = append(datadogMetrics, &metric)
}
return datadogMetrics, nil
} }
func buildTags(mTags map[string]string) []string { func buildTags(mTags map[string]string) []string {