diff --git a/plugins/outputs/stackdriver/README.md b/plugins/outputs/stackdriver/README.md index ce3eb626e..a2d13e6e1 100644 --- a/plugins/outputs/stackdriver/README.md +++ b/plugins/outputs/stackdriver/README.md @@ -22,3 +22,15 @@ Metrics are grouped by the `namespace` variable and metric key - eg: `custom.goo Stackdriver does not support string values in custom metrics, any string fields will not be written. + +The Stackdriver API does not allow writing points which are out of order, +older than 24 hours, or with resolution greater than one point per +minute. Since Telegraf writes the newest points first and moves backwards +through the metric buffer, it may not be possible to write historical data +after an interruption. + +Points collected with greater than 1 minute precision may need to be +aggregated before they can be written. Consider using the [basicstats][] +aggregator to do this. + +[basicstats]: /plugins/aggregators/basicstats/README.md diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go index a1cafdd98..a1c9e20da 100644 --- a/plugins/outputs/stackdriver/stackdriver.go +++ b/plugins/outputs/stackdriver/stackdriver.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "path" + "sort" + "strings" monitoring "cloud.google.com/go/monitoring/apiv3" // Imports the Stackdriver Monitoring client package. googlepb "github.com/golang/protobuf/ptypes/timestamp" @@ -38,6 +40,10 @@ const ( StartTime = int64(1) // MaxInt is the max int64 value. 
MaxInt = int(^uint(0) >> 1) + + errStringPointsOutOfOrder = "One or more of the points specified had an older end time than the most recent point" + errStringPointsTooOld = "Data points cannot be written more than 24h in the past" + errStringPointsTooFrequent = "One or more points were written more frequently than the maximum sampling period configured for the metric" ) var sampleConfig = ` @@ -70,17 +76,33 @@ func (s *Stackdriver) Connect() error { return nil } +// Sorted returns a copy of the metrics in time ascending order. A copy is +// made to avoid modifying the input metric slice since doing so is not +// allowed. +func sorted(metrics []telegraf.Metric) []telegraf.Metric { + batch := make([]telegraf.Metric, 0, len(metrics)) + for i := len(metrics) - 1; i >= 0; i-- { + batch = append(batch, metrics[i]) + } + sort.Slice(batch, func(i, j int) bool { + return batch[i].Time().Before(batch[j].Time()) + }) + return batch +} + // Write the metrics to Google Cloud Stackdriver. func (s *Stackdriver) Write(metrics []telegraf.Metric) error { ctx := context.Background() - for _, m := range metrics { + batch := sorted(metrics) + + for _, m := range batch { timeSeries := []*monitoringpb.TimeSeries{} for _, f := range m.FieldList() { value, err := getStackdriverTypedValue(f.Value) if err != nil { - log.Printf("E! [output.stackdriver] get type failed: %s", err) + log.Printf("E! [outputs.stackdriver] get type failed: %s", err) continue } @@ -90,13 +112,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { metricKind, err := getStackdriverMetricKind(m.Type()) if err != nil { - log.Printf("E! [output.stackdriver] get metric failed: %s", err) + log.Printf("E! [outputs.stackdriver] get metric failed: %s", err) continue } timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix()) if err != nil { - log.Printf("E! [output.stackdriver] get time interval failed: %s", err) + log.Printf("E! 
[outputs.stackdriver] get time interval failed: %s", err) continue } @@ -139,7 +161,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { // Create the time series in Stackdriver. err := s.client.CreateTimeSeries(ctx, timeSeriesRequest) if err != nil { - log.Printf("E! [output.stackdriver] unable to write to Stackdriver: %s", err) + if strings.Contains(err.Error(), errStringPointsOutOfOrder) || + strings.Contains(err.Error(), errStringPointsTooOld) || + strings.Contains(err.Error(), errStringPointsTooFrequent) { + log.Printf("D! [outputs.stackdriver] unable to write to Stackdriver: %s", err) + return nil + } + log.Printf("E! [outputs.stackdriver] unable to write to Stackdriver: %s", err) return err } } @@ -239,7 +267,7 @@ func getStackdriverLabels(tags []*telegraf.Tag) map[string]string { for k, v := range labels { if len(k) > QuotaStringLengthForLabelKey { log.Printf( - "W! [output.stackdriver] removing tag [%s] key exceeds string length for label key [%d]", + "W! [outputs.stackdriver] removing tag [%s] key exceeds string length for label key [%d]", k, QuotaStringLengthForLabelKey, ) @@ -248,7 +276,7 @@ func getStackdriverLabels(tags []*telegraf.Tag) map[string]string { } if len(v) > QuotaStringLengthForLabelValue { log.Printf( - "W! [output.stackdriver] removing tag [%s] value exceeds string length for label value [%d]", + "W! [outputs.stackdriver] removing tag [%s] value exceeds string length for label value [%d]", k, QuotaStringLengthForLabelValue, ) @@ -259,7 +287,7 @@ func getStackdriverLabels(tags []*telegraf.Tag) map[string]string { if len(labels) > QuotaLabelsPerMetricDescriptor { excess := len(labels) - QuotaLabelsPerMetricDescriptor log.Printf( - "W! [output.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags", + "W! 
[outputs.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags", len(labels), QuotaLabelsPerMetricDescriptor, excess, diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go index 94a3e6ce4..d9aab38fd 100644 --- a/plugins/outputs/stackdriver/stackdriver_test.go +++ b/plugins/outputs/stackdriver/stackdriver_test.go @@ -2,16 +2,19 @@ package stackdriver import ( "context" + "errors" "fmt" "log" "net" "os" "strings" "testing" + "time" monitoring "cloud.google.com/go/monitoring/apiv3" "github.com/golang/protobuf/proto" emptypb "github.com/golang/protobuf/ptypes/empty" + googlepb "github.com/golang/protobuf/ptypes/timestamp" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -96,6 +99,130 @@ func TestWrite(t *testing.T) { require.NoError(t, err) } +func TestWriteAscendingTime(t *testing.T) { + expectedResponse := &emptypb.Empty{} + mockMetric.err = nil + mockMetric.reqs = nil + mockMetric.resps = append(mockMetric.resps[:0], expectedResponse) + + c, err := monitoring.NewMetricClient(context.Background(), clientOpt) + if err != nil { + t.Fatal(err) + } + + s := &Stackdriver{ + Project: fmt.Sprintf("projects/%s", "[PROJECT]"), + Namespace: "test", + client: c, + } + + // Metrics in descending order of timestamp + metrics := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(2, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "value": 43, + }, + time.Unix(1, 0), + ), + } + + err = s.Connect() + require.NoError(t, err) + err = s.Write(metrics) + require.NoError(t, err) + + require.Len(t, mockMetric.reqs, 2) + request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 1) + ts := request.TimeSeries[0] + require.Len(t, ts.Points, 1) + require.Equal(t, 
ts.Points[0].Interval, &monitoringpb.TimeInterval{ + EndTime: &googlepb.Timestamp{ + Seconds: 1, + }, + }) + require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: int64(43), + }, + }) + + request = mockMetric.reqs[1].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 1) + ts = request.TimeSeries[0] + require.Len(t, ts.Points, 1) + require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ + EndTime: &googlepb.Timestamp{ + Seconds: 2, + }, + }) + require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: int64(42), + }, + }) +} + +func TestWriteIgnoredErrors(t *testing.T) { + tests := []struct { + name string + err error + expectedErr bool + }{ + { + name: "points too old", + err: errors.New(errStringPointsTooOld), + }, + { + name: "points out of order", + err: errors.New(errStringPointsOutOfOrder), + }, + { + name: "points too frequent", + err: errors.New(errStringPointsTooFrequent), + }, + { + name: "other errors reported", + err: errors.New("test"), + expectedErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockMetric.err = tt.err + mockMetric.reqs = nil + + c, err := monitoring.NewMetricClient(context.Background(), clientOpt) + if err != nil { + t.Fatal(err) + } + + s := &Stackdriver{ + Project: fmt.Sprintf("projects/%s", "[PROJECT]"), + Namespace: "test", + client: c, + } + + err = s.Connect() + require.NoError(t, err) + err = s.Write(testutil.MockMetrics()) + if tt.expectedErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + func TestGetStackdriverLabels(t *testing.T) { tags := []*telegraf.Tag{ {Key: "project", Value: "bar"},