diff --git a/agent/accumulator.go b/agent/accumulator.go index 9361ad82e..a19aa035b 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -14,10 +14,13 @@ import ( func NewAccumulator( inputConfig *internal_models.InputConfig, metrics chan telegraf.Metric, + interval time.Duration, ) *accumulator { acc := accumulator{} acc.metrics = metrics acc.inputConfig = inputConfig + // at what interval are these being accumulated + acc.interval = interval return &acc } @@ -33,6 +36,8 @@ type accumulator struct { inputConfig *internal_models.InputConfig prefix string + + interval time.Duration } func (ac *accumulator) Add( @@ -140,6 +145,7 @@ func (ac *accumulator) AddFields( log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) return } + m.SetInterval(ac.interval) if ac.debug { fmt.Println("> " + m.String()) } diff --git a/agent/agent.go b/agent/agent.go index bd52e7875..2e66c5e77 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -116,7 +116,7 @@ func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error { defer panicRecover(input) defer wg.Done() - acc := NewAccumulator(input.Config, metricC) + acc := NewAccumulator(input.Config, metricC, a.Config.Agent.Interval.Duration) acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) @@ -167,7 +167,7 @@ func (a *Agent) gatherSeparate( var outerr error start := time.Now() - acc := NewAccumulator(input.Config, metricC) + acc := NewAccumulator(input.Config, metricC, input.Config.Interval) acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) @@ -214,7 +214,7 @@ func (a *Agent) Test() error { }() for _, input := range a.Config.Inputs { - acc := NewAccumulator(input.Config, metricC) + acc := NewAccumulator(input.Config, metricC, input.Config.Interval) acc.SetDebug(true) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) diff --git a/metric.go b/metric.go index fcbb1b291..83e4dd5e1 100644 --- a/metric.go +++ b/metric.go @@ -20,6 +20,12 @@ type Metric interface { // UnixNano returns the unix nano time of the metric UnixNano() int64 + // Interval returns the interval at which the metric was collected + Interval() time.Duration + + // SetInterval sets the interval above. + SetInterval(interval time.Duration) + // Fields returns the fields for the metric Fields() map[string]interface{} @@ -35,7 +41,8 @@ type Metric interface { // metric is a wrapper of the influxdb client.Point struct type metric struct { - pt *client.Point + pt *client.Point + interval time.Duration } // NewMetric returns a metric with the given timestamp. If a timestamp is not @@ -79,6 +86,14 @@ func ParseMetrics(buf []byte) ([]Metric, error) { return metrics, err } +func (m *metric) Interval() time.Duration { + return m.interval +} + +func (m *metric) SetInterval(interval time.Duration) { + m.interval = interval +} + func (m *metric) Name() string { return m.pt.Name() } diff --git a/plugins/outputs/datadog/datadog.go b/plugins/outputs/datadog/datadog.go index d84322071..fec31d121 100644 --- a/plugins/outputs/datadog/datadog.go +++ b/plugins/outputs/datadog/datadog.go @@ -9,6 +9,7 @@ import ( "net/url" "sort" "strings" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -36,10 +37,13 @@ type TimeSeries struct { } type Metric struct { - Metric string `json:"metric"` - Points [1]Point `json:"points"` - Host string `json:"host"` - Tags []string `json:"tags,omitempty"` + Metric string `json:"metric"` + Points [1]Point `json:"points"` + Host string `json:"host"` + Tags []string `json:"tags,omitempty"` + Interval float64 `json:"interval,omitempty"` + Type string `json:"type"` + DeviceName string `json:"device_name"` } type Point [2]float64 @@ -67,37 +71,15 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error { return nil } ts := TimeSeries{} - tempSeries := []*Metric{} - metricCounter := 0 for _, m := range metrics { - mname := strings.Replace(m.Name(), "_", ".", -1) if dogMs, err := buildMetrics(m); err == nil { - for fieldName, dogM := range dogMs { - // name of the datadog measurement - var dname string - if fieldName == "value" { - // adding .value seems redundant here - dname = mname - } else { - dname = mname + "." + strings.Replace(fieldName, "_", ".", -1) - } - metric := &Metric{ - Metric: dname, - Tags: buildTags(m.Tags()), - Host: m.Tags()["host"], - } - metric.Points[0] = dogM - tempSeries = append(tempSeries, metric) - metricCounter++ - } + ts.Series = append(ts.Series, dogMs...) } else { log.Printf("unable to build Metric for %s, skipping\n", m.Name()) } } - ts.Series = make([]*Metric, metricCounter) - copy(ts.Series, tempSeries[0:]) tsBytes, err := json.Marshal(ts) if err != nil { return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) @@ -106,7 +88,11 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error { if err != nil { return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) } - req.Header.Add("Content-Type", "application/json") + req.Header.Set("Content-Type", "application/json") + // If we don't pretend to be dogstatsd, it ignores our carefully + // calculated type. + req.Header.Set("DD-Dogstatsd-Version", "5.6.3") + req.Header.Set("User-Agent", "python-requests/2.6.0 CPython/2.7.10") resp, err := d.client.Do(req) if err != nil { @@ -136,17 +122,44 @@ func (d *Datadog) authenticatedUrl() string { return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) } -func buildMetrics(m telegraf.Metric) (map[string]Point, error) { - ms := make(map[string]Point) - for k, v := range m.Fields() { - var p Point - if err := p.setValue(v); err != nil { - return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) - } - p[0] = float64(m.Time().Unix()) - ms[k] = p +// Convert a telegraf metric to datadog metrics; +// we need a separate metric for each field. +// (also has magic for statsd field names) +func buildMetrics(m telegraf.Metric) ([]*Metric, error) { + var datadogMetrics []*Metric + tags := m.Tags() + metricType := tags["metric_type"] + baseDatadogMetric := Metric{ + Metric: m.Name(), + Tags: buildTags(tags), + Host: tags["host"], + Interval: float64(m.Interval()) / float64(time.Second), } - return ms, nil + + for field, value := range m.Fields() { + metric := baseDatadogMetric + if field != "value" { + metric.Metric += "_" + field + } + metric.Metric = strings.Replace(metric.Metric, "_", ".", -1) + p := &metric.Points[0] + p[0] = float64(m.Time().Unix()) + if err := p.setValue(value); err != nil { + return nil, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + + if metricType == "counter" || + ((metricType == "histogram" || metricType == "timing") && field == "count") { + metric.Type = "rate" + p[1] /= baseDatadogMetric.Interval + } else { + metric.Type = "gauge" + } + + datadogMetrics = append(datadogMetrics, &metric) + } + + return datadogMetrics, nil } func buildTags(mTags map[string]string) []string {