Group stackdriver requests to send one point per timeseries (#5407)
This commit is contained in:
parent
431c58d84f
commit
5823fefb7a
|
@ -3,6 +3,7 @@ package stackdriver
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
"log"
|
"log"
|
||||||
"path"
|
"path"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -111,15 +112,34 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {
|
||||||
return batch
|
return batch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries
|
||||||
|
|
||||||
|
func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
|
||||||
|
h := fnv.New64a()
|
||||||
|
h.Write([]byte(m.Name()))
|
||||||
|
h.Write([]byte{'\n'})
|
||||||
|
h.Write([]byte(f.Key))
|
||||||
|
h.Write([]byte{'\n'})
|
||||||
|
for key, value := range m.Tags() {
|
||||||
|
h.Write([]byte(key))
|
||||||
|
h.Write([]byte{'\n'})
|
||||||
|
h.Write([]byte(value))
|
||||||
|
h.Write([]byte{'\n'})
|
||||||
|
}
|
||||||
|
k := h.Sum64()
|
||||||
|
|
||||||
|
s := tsb[k]
|
||||||
|
s = append(s, ts)
|
||||||
|
tsb[k] = s
|
||||||
|
}
|
||||||
|
|
||||||
// Write the metrics to Google Cloud Stackdriver.
|
// Write the metrics to Google Cloud Stackdriver.
|
||||||
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
|
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
batch := sorted(metrics)
|
batch := sorted(metrics)
|
||||||
|
buckets := make(timeSeriesBuckets)
|
||||||
for _, m := range batch {
|
for _, m := range batch {
|
||||||
timeSeries := []*monitoringpb.TimeSeries{}
|
|
||||||
|
|
||||||
for _, f := range m.FieldList() {
|
for _, f := range m.FieldList() {
|
||||||
value, err := getStackdriverTypedValue(f.Value)
|
value, err := getStackdriverTypedValue(f.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -150,25 +170,50 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare time series.
|
// Prepare time series.
|
||||||
timeSeries = append(timeSeries,
|
timeSeries := &monitoringpb.TimeSeries{
|
||||||
&monitoringpb.TimeSeries{
|
Metric: &metricpb.Metric{
|
||||||
Metric: &metricpb.Metric{
|
Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
|
||||||
Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
|
Labels: getStackdriverLabels(m.TagList()),
|
||||||
Labels: getStackdriverLabels(m.TagList()),
|
},
|
||||||
},
|
MetricKind: metricKind,
|
||||||
MetricKind: metricKind,
|
Resource: &monitoredrespb.MonitoredResource{
|
||||||
Resource: &monitoredrespb.MonitoredResource{
|
Type: s.ResourceType,
|
||||||
Type: s.ResourceType,
|
Labels: s.ResourceLabels,
|
||||||
Labels: s.ResourceLabels,
|
},
|
||||||
},
|
Points: []*monitoringpb.Point{
|
||||||
Points: []*monitoringpb.Point{
|
dataPoint,
|
||||||
dataPoint,
|
},
|
||||||
},
|
}
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(timeSeries) < 1 {
|
buckets.Add(m, f, timeSeries)
|
||||||
continue
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// process the buckets in order
|
||||||
|
keys := make([]uint64, 0, len(buckets))
|
||||||
|
for k := range buckets {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
|
||||||
|
|
||||||
|
for len(buckets) != 0 {
|
||||||
|
// can send up to 200 time series to stackdriver
|
||||||
|
timeSeries := make([]*monitoringpb.TimeSeries, 0, 200)
|
||||||
|
for i, k := range keys {
|
||||||
|
s := buckets[k]
|
||||||
|
timeSeries = append(timeSeries, s[0])
|
||||||
|
if len(s) == 1 {
|
||||||
|
delete(buckets, k)
|
||||||
|
keys = append(keys[:i], keys[i+1:]...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s = s[1:]
|
||||||
|
buckets[k] = s
|
||||||
|
|
||||||
|
if len(timeSeries) == cap(timeSeries) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare time series request.
|
// Prepare time series request.
|
||||||
|
|
|
@ -207,6 +207,89 @@ func TestWriteAscendingTime(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWriteBatchable(t *testing.T) {
|
||||||
|
expectedResponse := &emptypb.Empty{}
|
||||||
|
mockMetric.err = nil
|
||||||
|
mockMetric.reqs = nil
|
||||||
|
mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)
|
||||||
|
|
||||||
|
c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Stackdriver{
|
||||||
|
Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
|
||||||
|
Namespace: "test",
|
||||||
|
client: c,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics in descending order of timestamp
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(2, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"foo": "foo",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 43,
|
||||||
|
},
|
||||||
|
time.Unix(3, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 43,
|
||||||
|
},
|
||||||
|
time.Unix(1, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = s.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Len(t, mockMetric.reqs, 2)
|
||||||
|
request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
|
||||||
|
require.Len(t, request.TimeSeries, 2)
|
||||||
|
ts := request.TimeSeries[0]
|
||||||
|
require.Len(t, ts.Points, 1)
|
||||||
|
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
|
||||||
|
EndTime: &googlepb.Timestamp{
|
||||||
|
Seconds: 3,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
|
||||||
|
Value: &monitoringpb.TypedValue_Int64Value{
|
||||||
|
Int64Value: int64(43),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
ts = request.TimeSeries[1]
|
||||||
|
require.Len(t, ts.Points, 1)
|
||||||
|
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
|
||||||
|
EndTime: &googlepb.Timestamp{
|
||||||
|
Seconds: 1,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
|
||||||
|
Value: &monitoringpb.TypedValue_Int64Value{
|
||||||
|
Int64Value: int64(43),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteIgnoredErrors(t *testing.T) {
|
func TestWriteIgnoredErrors(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
Loading…
Reference in New Issue