Send metrics in ascending time order in stackdriver output (#5385)

This commit is contained in:
Daniel Nelson 2019-02-06 14:17:51 -08:00 committed by GitHub
parent e65ab593b5
commit 7f54ae18b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 8 deletions

View File

@ -22,3 +22,15 @@ Metrics are grouped by the `namespace` variable and metric key - eg: `custom.goo
Stackdriver does not support string values in custom metrics, any string Stackdriver does not support string values in custom metrics, any string
fields will not be written. fields will not be written.
The Stackdriver API does not allow writing points which are out of order,
older than 24 hours, or with a resolution greater than one point per
minute. Since Telegraf writes the newest points first and moves backwards
through the metric buffer, it may not be possible to write historical data
after an interruption.
Points collected with greater than 1 minute precision may need to be
aggregated before they can be written. Consider using the [basicstats][]
aggregator to do this.
[basicstats]: /plugins/aggregators/basicstats/README.md

View File

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"log" "log"
"path" "path"
"sort"
"strings"
monitoring "cloud.google.com/go/monitoring/apiv3" // Imports the Stackdriver Monitoring client package. monitoring "cloud.google.com/go/monitoring/apiv3" // Imports the Stackdriver Monitoring client package.
googlepb "github.com/golang/protobuf/ptypes/timestamp" googlepb "github.com/golang/protobuf/ptypes/timestamp"
@ -38,6 +40,10 @@ const (
StartTime = int64(1) StartTime = int64(1)
// MaxInt is the max int64 value. // MaxInt is the max int64 value.
MaxInt = int(^uint(0) >> 1) MaxInt = int(^uint(0) >> 1)
errStringPointsOutOfOrder = "One or more of the points specified had an older end time than the most recent point"
errStringPointsTooOld = "Data points cannot be written more than 24h in the past"
errStringPointsTooFrequent = "One or more points were written more frequently than the maximum sampling period configured for the metric"
) )
var sampleConfig = ` var sampleConfig = `
@ -70,17 +76,33 @@ func (s *Stackdriver) Connect() error {
return nil return nil
} }
// Sorted returns a copy of the metrics in time ascending order. A copy is
// made to avoid modifying the input metric slice since doing so is not
// allowed.
func sorted(metrics []telegraf.Metric) []telegraf.Metric {
	// Build the copy newest-last by walking the input backwards; Telegraf
	// hands us the newest points first, and Stackdriver requires ascending
	// end times within a write.
	batch := make([]telegraf.Metric, 0, len(metrics))
	for i := len(metrics) - 1; i >= 0; i-- {
		batch = append(batch, metrics[i])
	}
	// Use a stable sort so metrics sharing the same timestamp keep a
	// deterministic relative order; an unstable sort would leave ties in
	// an unspecified order from run to run.
	sort.SliceStable(batch, func(i, j int) bool {
		return batch[i].Time().Before(batch[j].Time())
	})
	return batch
}
// Write the metrics to Google Cloud Stackdriver. // Write the metrics to Google Cloud Stackdriver.
func (s *Stackdriver) Write(metrics []telegraf.Metric) error { func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
ctx := context.Background() ctx := context.Background()
for _, m := range metrics { batch := sorted(metrics)
for _, m := range batch {
timeSeries := []*monitoringpb.TimeSeries{} timeSeries := []*monitoringpb.TimeSeries{}
for _, f := range m.FieldList() { for _, f := range m.FieldList() {
value, err := getStackdriverTypedValue(f.Value) value, err := getStackdriverTypedValue(f.Value)
if err != nil { if err != nil {
log.Printf("E! [output.stackdriver] get type failed: %s", err) log.Printf("E! [outputs.stackdriver] get type failed: %s", err)
continue continue
} }
@ -90,13 +112,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
metricKind, err := getStackdriverMetricKind(m.Type()) metricKind, err := getStackdriverMetricKind(m.Type())
if err != nil { if err != nil {
log.Printf("E! [output.stackdriver] get metric failed: %s", err) log.Printf("E! [outputs.stackdriver] get metric failed: %s", err)
continue continue
} }
timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix()) timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix())
if err != nil { if err != nil {
log.Printf("E! [output.stackdriver] get time interval failed: %s", err) log.Printf("E! [outputs.stackdriver] get time interval failed: %s", err)
continue continue
} }
@ -139,7 +161,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
// Create the time series in Stackdriver. // Create the time series in Stackdriver.
err := s.client.CreateTimeSeries(ctx, timeSeriesRequest) err := s.client.CreateTimeSeries(ctx, timeSeriesRequest)
if err != nil { if err != nil {
log.Printf("E! [output.stackdriver] unable to write to Stackdriver: %s", err) if strings.Contains(err.Error(), errStringPointsOutOfOrder) ||
strings.Contains(err.Error(), errStringPointsTooOld) ||
strings.Contains(err.Error(), errStringPointsTooFrequent) {
log.Printf("D! [outputs.stackdriver] unable to write to Stackdriver: %s", err)
return nil
}
log.Printf("E! [outputs.stackdriver] unable to write to Stackdriver: %s", err)
return err return err
} }
} }
@ -239,7 +267,7 @@ func getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
for k, v := range labels { for k, v := range labels {
if len(k) > QuotaStringLengthForLabelKey { if len(k) > QuotaStringLengthForLabelKey {
log.Printf( log.Printf(
"W! [output.stackdriver] removing tag [%s] key exceeds string length for label key [%d]", "W! [outputs.stackdriver] removing tag [%s] key exceeds string length for label key [%d]",
k, k,
QuotaStringLengthForLabelKey, QuotaStringLengthForLabelKey,
) )
@ -248,7 +276,7 @@ func getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
} }
if len(v) > QuotaStringLengthForLabelValue { if len(v) > QuotaStringLengthForLabelValue {
log.Printf( log.Printf(
"W! [output.stackdriver] removing tag [%s] value exceeds string length for label value [%d]", "W! [outputs.stackdriver] removing tag [%s] value exceeds string length for label value [%d]",
k, k,
QuotaStringLengthForLabelValue, QuotaStringLengthForLabelValue,
) )
@ -259,7 +287,7 @@ func getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
if len(labels) > QuotaLabelsPerMetricDescriptor { if len(labels) > QuotaLabelsPerMetricDescriptor {
excess := len(labels) - QuotaLabelsPerMetricDescriptor excess := len(labels) - QuotaLabelsPerMetricDescriptor
log.Printf( log.Printf(
"W! [output.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags", "W! [outputs.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags",
len(labels), len(labels),
QuotaLabelsPerMetricDescriptor, QuotaLabelsPerMetricDescriptor,
excess, excess,

View File

@ -2,16 +2,19 @@ package stackdriver
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"log" "log"
"net" "net"
"os" "os"
"strings" "strings"
"testing" "testing"
"time"
monitoring "cloud.google.com/go/monitoring/apiv3" monitoring "cloud.google.com/go/monitoring/apiv3"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
emptypb "github.com/golang/protobuf/ptypes/empty" emptypb "github.com/golang/protobuf/ptypes/empty"
googlepb "github.com/golang/protobuf/ptypes/timestamp"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -96,6 +99,130 @@ func TestWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
// TestWriteAscendingTime verifies that Write reorders metrics so that
// points are sent to Stackdriver in ascending time order, even when the
// input batch arrives newest-first.
func TestWriteAscendingTime(t *testing.T) {
	expectedResponse := &emptypb.Empty{}
	mockMetric.err = nil
	mockMetric.reqs = nil
	mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)

	c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
	if err != nil {
		t.Fatal(err)
	}

	s := &Stackdriver{
		Project:   fmt.Sprintf("projects/%s", "[PROJECT]"),
		Namespace: "test",
		client:    c,
	}

	// Metrics in descending order of timestamp
	metrics := []telegraf.Metric{
		testutil.MustMetric("cpu",
			map[string]string{},
			map[string]interface{}{
				"value": 42,
			},
			time.Unix(2, 0),
		),
		testutil.MustMetric("cpu",
			map[string]string{},
			map[string]interface{}{
				"value": 43,
			},
			time.Unix(1, 0),
		),
	}

	err = s.Connect()
	require.NoError(t, err)
	err = s.Write(metrics)
	require.NoError(t, err)

	require.Len(t, mockMetric.reqs, 2)

	// The point at time 1 must be written first even though it was last in
	// the input batch. Note: require.Equal takes (t, expected, actual).
	request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
	require.Len(t, request.TimeSeries, 1)
	ts := request.TimeSeries[0]
	require.Len(t, ts.Points, 1)
	require.Equal(t, &monitoringpb.TimeInterval{
		EndTime: &googlepb.Timestamp{
			Seconds: 1,
		},
	}, ts.Points[0].Interval)
	require.Equal(t, &monitoringpb.TypedValue{
		Value: &monitoringpb.TypedValue_Int64Value{
			Int64Value: int64(43),
		},
	}, ts.Points[0].Value)

	// The point at time 2 follows in the second request.
	request = mockMetric.reqs[1].(*monitoringpb.CreateTimeSeriesRequest)
	require.Len(t, request.TimeSeries, 1)
	ts = request.TimeSeries[0]
	require.Len(t, ts.Points, 1)
	require.Equal(t, &monitoringpb.TimeInterval{
		EndTime: &googlepb.Timestamp{
			Seconds: 2,
		},
	}, ts.Points[0].Interval)
	require.Equal(t, &monitoringpb.TypedValue{
		Value: &monitoringpb.TypedValue_Int64Value{
			Int64Value: int64(42),
		},
	}, ts.Points[0].Value)
}
// TestWriteIgnoredErrors verifies that Stackdriver errors for points that
// are out of order, too old, or written too frequently are swallowed by
// Write, while any other CreateTimeSeries error is returned to the caller.
func TestWriteIgnoredErrors(t *testing.T) {
	tests := []struct {
		name        string
		err         error
		expectedErr bool
	}{
		{name: "points too old", err: errors.New(errStringPointsTooOld)},
		{name: "points out of order", err: errors.New(errStringPointsOutOfOrder)},
		{name: "points too frequent", err: errors.New(errStringPointsTooFrequent)},
		{name: "other errors reported", err: errors.New("test"), expectedErr: true},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Arrange the mock server to fail every write with this error.
			mockMetric.err = tc.err
			mockMetric.reqs = nil

			c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
			if err != nil {
				t.Fatal(err)
			}

			s := &Stackdriver{
				Project:   fmt.Sprintf("projects/%s", "[PROJECT]"),
				Namespace: "test",
				client:    c,
			}

			require.NoError(t, s.Connect())

			err = s.Write(testutil.MockMetrics())
			if tc.expectedErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
		})
	}
}
func TestGetStackdriverLabels(t *testing.T) { func TestGetStackdriverLabels(t *testing.T) {
tags := []*telegraf.Tag{ tags := []*telegraf.Tag{
{Key: "project", Value: "bar"}, {Key: "project", Value: "bar"},