From 5823fefb7ab4754577df13da7ba62cf01677c78d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Robert=20Edstr=C3=B6m?= <108799+Legogris@users.noreply.github.com>
Date: Wed, 20 Feb 2019 22:16:02 +0100
Subject: [PATCH] Group stackdriver requests to send one point per timeseries
 (#5407)

---
 plugins/outputs/stackdriver/stackdriver.go    | 87 ++++++++++++++-----
 .../outputs/stackdriver/stackdriver_test.go   | 83 ++++++++++++++++++
 2 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go
index 10823c8ed..c7d9e45bc 100644
--- a/plugins/outputs/stackdriver/stackdriver.go
+++ b/plugins/outputs/stackdriver/stackdriver.go
@@ -3,6 +3,7 @@ package stackdriver
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
 	"log"
 	"path"
 	"sort"
@@ -111,15 +112,34 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {
 	return batch
 }
 
+type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries
+
+func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
+	h := fnv.New64a()
+	h.Write([]byte(m.Name()))
+	h.Write([]byte{'\n'})
+	h.Write([]byte(f.Key))
+	h.Write([]byte{'\n'})
+	for key, value := range m.Tags() {
+		h.Write([]byte(key))
+		h.Write([]byte{'\n'})
+		h.Write([]byte(value))
+		h.Write([]byte{'\n'})
+	}
+	k := h.Sum64()
+
+	s := tsb[k]
+	s = append(s, ts)
+	tsb[k] = s
+}
+
 // Write the metrics to Google Cloud Stackdriver.
 func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
 	ctx := context.Background()
 
 	batch := sorted(metrics)
-
+	buckets := make(timeSeriesBuckets)
 	for _, m := range batch {
-		timeSeries := []*monitoringpb.TimeSeries{}
-
 		for _, f := range m.FieldList() {
 			value, err := getStackdriverTypedValue(f.Value)
 			if err != nil {
@@ -150,25 +170,50 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
 			}
 
 			// Prepare time series.
-			timeSeries = append(timeSeries,
-				&monitoringpb.TimeSeries{
-					Metric: &metricpb.Metric{
-						Type:   path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
-						Labels: getStackdriverLabels(m.TagList()),
-					},
-					MetricKind: metricKind,
-					Resource: &monitoredrespb.MonitoredResource{
-						Type:   s.ResourceType,
-						Labels: s.ResourceLabels,
-					},
-					Points: []*monitoringpb.Point{
-						dataPoint,
-					},
-				})
-		}
+			timeSeries := &monitoringpb.TimeSeries{
+				Metric: &metricpb.Metric{
+					Type:   path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
+					Labels: getStackdriverLabels(m.TagList()),
+				},
+				MetricKind: metricKind,
+				Resource: &monitoredrespb.MonitoredResource{
+					Type:   s.ResourceType,
+					Labels: s.ResourceLabels,
+				},
+				Points: []*monitoringpb.Point{
+					dataPoint,
+				},
+			}
 
-		if len(timeSeries) < 1 {
-			continue
+			buckets.Add(m, f, timeSeries)
+		}
+	}
+
+	// process the buckets in order
+	keys := make([]uint64, 0, len(buckets))
+	for k := range buckets {
+		keys = append(keys, k)
+	}
+	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+
+	for len(buckets) != 0 {
+		// can send up to 200 time series to stackdriver
+		timeSeries := make([]*monitoringpb.TimeSeries, 0, 200)
+		for i, k := range keys {
+			s := buckets[k]
+			timeSeries = append(timeSeries, s[0])
+			if len(s) == 1 {
+				delete(buckets, k)
+				keys = append(keys[:i], keys[i+1:]...)
+				continue
+			}
+
+			s = s[1:]
+			buckets[k] = s
+
+			if len(timeSeries) == cap(timeSeries) {
+				break
+			}
 		}
 
 		// Prepare time series request.
diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go
index c60d72d36..151c84020 100644
--- a/plugins/outputs/stackdriver/stackdriver_test.go
+++ b/plugins/outputs/stackdriver/stackdriver_test.go
@@ -207,6 +207,89 @@ func TestWriteAscendingTime(t *testing.T) {
 	})
 }
 
+func TestWriteBatchable(t *testing.T) {
+	expectedResponse := &emptypb.Empty{}
+	mockMetric.err = nil
+	mockMetric.reqs = nil
+	mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)
+
+	c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s := &Stackdriver{
+		Project:   fmt.Sprintf("projects/%s", "[PROJECT]"),
+		Namespace: "test",
+		client:    c,
+	}
+
+	// Metrics in descending order of timestamp
+	metrics := []telegraf.Metric{
+		testutil.MustMetric("cpu",
+			map[string]string{
+				"foo": "bar",
+			},
+			map[string]interface{}{
+				"value": 42,
+			},
+			time.Unix(2, 0),
+		),
+		testutil.MustMetric("cpu",
+			map[string]string{
+				"foo": "foo",
+			},
+			map[string]interface{}{
+				"value": 43,
+			},
+			time.Unix(3, 0),
+		),
+		testutil.MustMetric("cpu",
+			map[string]string{
+				"foo": "bar",
+			},
+			map[string]interface{}{
+				"value": 43,
+			},
+			time.Unix(1, 0),
+		),
+	}
+
+	err = s.Connect()
+	require.NoError(t, err)
+	err = s.Write(metrics)
+	require.NoError(t, err)
+
+	require.Len(t, mockMetric.reqs, 2)
+	request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
+	require.Len(t, request.TimeSeries, 2)
+	ts := request.TimeSeries[0]
+	require.Len(t, ts.Points, 1)
+	require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
+		EndTime: &googlepb.Timestamp{
+			Seconds: 3,
+		},
+	})
+	require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
+		Value: &monitoringpb.TypedValue_Int64Value{
+			Int64Value: int64(43),
+		},
+	})
+
+	ts = request.TimeSeries[1]
+	require.Len(t, ts.Points, 1)
+	require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
+		EndTime: &googlepb.Timestamp{
+			Seconds: 1,
+		},
+	})
+	require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
+		Value: &monitoringpb.TypedValue_Int64Value{
+			Int64Value: int64(43),
+		},
+	})
+}
+
 func TestWriteIgnoredErrors(t *testing.T) {
 	tests := []struct {
 		name string
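
Reviewer note: the program below is a minimal, standalone sketch of the grouping strategy this patch introduces, not the patch code itself. It uses plain strings as stand-ins for *monitoringpb.TimeSeries so it runs without the Stackdriver client libraries, sorts tag keys before hashing so the bucket key is deterministic regardless of map iteration order, and replaces the range-with-delete loop with an index loop; the helper names (bucketKey, drain, add) are invented for the example. Each bucket collects all points that belong to one time series (same metric name, field key, and tag set), and each batch takes at most one point per bucket, capped at 200 entries, so a single CreateTimeSeries request never carries two points for the same series.

// Standalone sketch of the bucketing and batching strategy in this patch.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// bucketKey hashes metric name, field key, and tags, so that all points
// belonging to the same time series land in the same bucket.
func bucketKey(name, field string, tags map[string]string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	h.Write([]byte{'\n'})
	h.Write([]byte(field))
	h.Write([]byte{'\n'})
	// Sort tag keys so the hash does not depend on map iteration order.
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{'\n'})
		h.Write([]byte(tags[k]))
		h.Write([]byte{'\n'})
	}
	return h.Sum64()
}

// drain builds batches of at most batchSize entries, taking at most one
// entry per bucket per batch, so a batch never holds two points for the
// same series.
func drain(buckets map[uint64][]string, batchSize int) [][]string {
	keys := make([]uint64, 0, len(buckets))
	for k := range buckets {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

	var batches [][]string
	for len(buckets) != 0 {
		batch := make([]string, 0, batchSize)
		for i := 0; i < len(keys); {
			k := keys[i]
			s := buckets[k]
			batch = append(batch, s[0])
			if len(s) == 1 {
				// Bucket exhausted: drop it and keep i pointing at the
				// element that shifted into this position.
				delete(buckets, k)
				keys = append(keys[:i], keys[i+1:]...)
			} else {
				buckets[k] = s[1:]
				i++
			}
			if len(batch) == batchSize {
				break
			}
		}
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	buckets := map[uint64][]string{}
	add := func(name, field string, tags map[string]string, point string) {
		k := bucketKey(name, field, tags)
		buckets[k] = append(buckets[k], point)
	}

	// Two points for cpu{foo=bar} and one for cpu{foo=foo}; the duplicate
	// series must be split across two requests, as in TestWriteBatchable.
	add("cpu", "value", map[string]string{"foo": "bar"}, "t=2 v=42")
	add("cpu", "value", map[string]string{"foo": "foo"}, "t=3 v=43")
	add("cpu", "value", map[string]string{"foo": "bar"}, "t=1 v=43")

	for i, b := range drain(buckets, 200) {
		fmt.Printf("request %d: %v\n", i+1, b)
	}
}

Run with the three points from TestWriteBatchable, this prints two batches: the first with one point from each of the two series, the second with the leftover cpu{foo=bar} point, matching the two CreateTimeSeriesRequest calls the test asserts.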