From 843d842d02db291ceafa209704a7cf2c674ef921 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 20 Feb 2019 13:23:59 -0800 Subject: [PATCH] Add stackdriver input plugin (#5406) --- README.md | 1 + metric/series_grouper.go | 86 ++ plugins/inputs/all/all.go | 1 + plugins/inputs/stackdriver/README.md | 161 +++ plugins/inputs/stackdriver/stackdriver.go | 709 +++++++++++ .../inputs/stackdriver/stackdriver_test.go | 1125 +++++++++++++++++ testutil/metric.go | 8 + 7 files changed, 2091 insertions(+) create mode 100644 metric/series_grouper.go create mode 100644 plugins/inputs/stackdriver/README.md create mode 100644 plugins/inputs/stackdriver/stackdriver.go create mode 100644 plugins/inputs/stackdriver/stackdriver_test.go diff --git a/README.md b/README.md index 9ffc7d66b..96f797c73 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,7 @@ For documentation on the latest development code see the [documentation index][d * [socket_listener](./plugins/inputs/socket_listener) * [solr](./plugins/inputs/solr) * [sql server](./plugins/inputs/sqlserver) (microsoft) +* [stackdriver](./plugins/inputs/stackdriver) * [statsd](./plugins/inputs/statsd) * [swap](./plugins/inputs/swap) * [syslog](./plugins/inputs/syslog) diff --git a/metric/series_grouper.go b/metric/series_grouper.go new file mode 100644 index 000000000..5dc66e11b --- /dev/null +++ b/metric/series_grouper.go @@ -0,0 +1,86 @@ +package metric + +import ( + "hash/fnv" + "io" + "sort" + "strconv" + "time" + + "github.com/influxdata/telegraf" +) + +// NewSeriesGrouper returns a type that can be used to group fields by series +// and time, so that fields which share these values will be combined into a +// single telegraf.Metric. +// +// This is useful to build telegraf.Metric's when all fields for a series are +// not available at once. 
+// +// ex: +// - cpu,host=localhost usage_time=42 +// - cpu,host=localhost idle_time=42 +// + cpu,host=localhost idle_time=42,usage_time=42 +func NewSeriesGrouper() *SeriesGrouper { + return &SeriesGrouper{ + metrics: make(map[uint64]telegraf.Metric), + ordered: []telegraf.Metric{}, + } +} + +type SeriesGrouper struct { + metrics map[uint64]telegraf.Metric + ordered []telegraf.Metric +} + +// Add adds a field key and value to the series. +func (g *SeriesGrouper) Add( + measurement string, + tags map[string]string, + tm time.Time, + field string, + fieldValue interface{}, +) error { + var err error + id := groupID(measurement, tags, tm) + metric := g.metrics[id] + if metric == nil { + metric, err = New(measurement, tags, map[string]interface{}{field: fieldValue}, tm) + if err != nil { + return err + } + g.metrics[id] = metric + g.ordered = append(g.ordered, metric) + } else { + metric.AddField(field, fieldValue) + } + return nil +} + +// Metrics returns the metrics grouped by series and time. 
// groupID hashes a measurement name, a tag set and a timestamp into a single
// 64-bit FNV-1a value.
//
// Tags are hashed in ascending key order so the result does not depend on map
// iteration order. Every component is followed by a newline separator, and
// the timestamp is hashed as its decimal UnixNano representation.
func groupID(measurement string, tags map[string]string, tm time.Time) uint64 {
	// Map keys are unique, so ordering the keys alone fixes the tag order.
	keys := make([]string, 0, len(tags))
	for key := range tags {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	hasher := fnv.New64a()
	io.WriteString(hasher, measurement)
	io.WriteString(hasher, "\n")

	for _, key := range keys {
		io.WriteString(hasher, key)
		io.WriteString(hasher, "\n")
		io.WriteString(hasher, tags[key])
		io.WriteString(hasher, "\n")
	}
	io.WriteString(hasher, "\n")

	io.WriteString(hasher, strconv.FormatInt(tm.UnixNano(), 10))
	return hasher.Sum64()
}
+ metric_type_prefix_include = [ + "compute.googleapis.com/", + ] + + ## Exclude timeseries that start with the given metric type. + # metric_type_prefix_exclude = [] + + ## Most metrics are updated no more than once per minute; it is recommended + ## to override the agent level interval with a value of 1m or greater. + interval = "1m" + + ## Maximum number of API calls to make per second. The quota for accounts + ## varies, it can be viewed on the API dashboard: + ## https://cloud.google.com/monitoring/quotas#quotas_and_limits + # rate_limit = 14 + + ## The delay and window options control the number of points selected on + ## each gather. When set, metrics are gathered between: + ## start: now() - delay - window + ## end: now() - delay + # + ## Collection delay; if set too low metrics may not yet be available. + # delay = "5m" + # + ## If unset, the window will start at 1m and be updated dynamically to span + ## the time between calls (approximately the length of the plugin interval). + # window = "1m" + + ## TTL for cached list of metric types. This is the maximum amount of time + ## it may take to discover new metrics. + # cache_ttl = "1h" + + ## If true, raw bucket counts are collected for distribution value types. + ## For a more lightweight collection, you may wish to disable and use + ## distribution_aggregation_aligners instead. + # gather_raw_distribution_buckets = true + + ## Aggregate functions to be used for metrics whose value type is + ## distribution. These aggregate values are recorded in addition to raw + ## bucket counts, if they are enabled. + ## + ## For a list of aligner strings see: + ## https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#aligner + # distribution_aggregation_aligners = [ + # "ALIGN_PERCENTILE_99", + # "ALIGN_PERCENTILE_95", + # "ALIGN_PERCENTILE_50", + # ] + + ## Filters can be added to reduce the number of time series matched. 
All + ## functions are supported: starts_with, ends_with, has_substring, and + ## one_of. Only the '=' operator is supported. + ## + ## The logical operators when combining filters are defined statically using + ## the following values: + ## filter ::= <resource_labels> {AND <metric_labels>} + ## resource_labels ::= <resource_labels> {OR <resource_label>} + ## metric_labels ::= <metric_labels> {OR <metric_label>} + ## + ## For more details, see https://cloud.google.com/monitoring/api/v3/filters + # + ## Resource labels refine the time series selection with the following expression: + ## resource.labels.<key> = <value> + # [[inputs.stackdriver.filter.resource_labels]] + # key = "instance_name" + # value = 'starts_with("localhost")' + # + ## Metric labels refine the time series selection with the following expression: + ## metric.labels.<key> = <value> + # [[inputs.stackdriver.filter.metric_labels]] + # key = "device_name" + # value = 'one_of("sda", "sdb")' +``` + +#### Authentication + +It is recommended to use a service account to authenticate with the +Stackdriver Monitoring API. [Getting Started with Authentication][auth]. + +### Metrics + +Metrics are created using one of three patterns depending on whether the value type +is a scalar value, raw distribution buckets, or aligned bucket values. + +In all cases, the Stackdriver metric type is split on the last component into +the measurement and field: +``` +compute.googleapis.com/instance/disk/read_bytes_count +└────────── measurement ─────────┘ └── field ───┘ +``` + +**Scalar Values:** + +- measurement + - tags: + - resource_labels + - metric_labels + - fields: + - field + + +**Distributions:** + +Distributions are represented by a set of fields along with the bucket values +tagged with the bucket boundary. Buckets are cumulative: each bucket +represents the total number of items less than the `lt` tag. 
+ +- measurement + - tags: + - resource_labels + - metric_labels + - fields: + - field_count + - field_mean + - field_sum_of_squared_deviation + - field_range_min + - field_range_max + ++ measurement + - tags: + - resource_labels + - metric_labels + - lt (less than) + - fields: + - field_bucket + +**Aligned Aggregations:** + +- measurement + - tags: + - resource_labels + - metric_labels + - fields: + - field_alignment_function + +### Troubleshooting + +When Telegraf is ran with `--debug`, detailed information about the performed +queries will be logged. + +### Example Output +``` +``` +[stackdriver]: https://cloud.google.com/monitoring/api/v3/ +[auth]: https://cloud.google.com/docs/authentication/getting-started +[pricing]: https://cloud.google.com/stackdriver/pricing#stackdriver_monitoring_services diff --git a/plugins/inputs/stackdriver/stackdriver.go b/plugins/inputs/stackdriver/stackdriver.go new file mode 100644 index 000000000..4f4e35695 --- /dev/null +++ b/plugins/inputs/stackdriver/stackdriver.go @@ -0,0 +1,709 @@ +package stackdriver + +import ( + "context" + "fmt" + "log" + "math" + "strconv" + "strings" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3" + googlepbduration "github.com/golang/protobuf/ptypes/duration" + googlepbts "github.com/golang/protobuf/ptypes/timestamp" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/limiter" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs" // Imports the Stackdriver Monitoring client package. 
+ "github.com/influxdata/telegraf/selfstat" + "google.golang.org/api/iterator" + distributionpb "google.golang.org/genproto/googleapis/api/distribution" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +const ( + defaultRateLimit = 14 + description = "Gather timeseries from Google Cloud Platform v3 monitoring API" + sampleConfig = ` + ## GCP Project + project = "erudite-bloom-151019" + + ## Include timeseries that start with the given metric type. + metric_type_prefix_include = [ + "compute.googleapis.com/", + ] + + ## Exclude timeseries that start with the given metric type. + # metric_type_prefix_exclude = [] + + ## Many metrics are updated once per minute; it is recommended to override + ## the agent level interval with a value of 1m or greater. + interval = "1m" + + ## Maximum number of API calls to make per second. The quota for accounts + ## varies, it can be viewed on the API dashboard: + ## https://cloud.google.com/monitoring/quotas#quotas_and_limits + # rate_limit = 14 + + ## The delay and window options control the number of points selected on + ## each gather. When set, metrics are gathered between: + ## start: now() - delay - window + ## end: now() - delay + # + ## Collection delay; if set too low metrics may not yet be available. + # delay = "5m" + # + ## If unset, the window will start at 1m and be updated dynamically to span + ## the time between calls (approximately the length of the plugin interval). + # window = "1m" + + ## TTL for cached list of metric types. This is the maximum amount of time + ## it may take to discover new metrics. + # cache_ttl = "1h" + + ## If true, raw bucket counts are collected for distribution value types. + ## For a more lightweight collection, you may wish to disable and use + ## distribution_aggregation_aligners instead. 
+ # gather_raw_distribution_buckets = true + + ## Aggregate functions to be used for metrics whose value type is + ## distribution. These aggregate values are recorded in in addition to raw + ## bucket counts; if they are enabled. + ## + ## For a list of aligner strings see: + ## https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#aligner + # distribution_aggregation_aligners = [ + # "ALIGN_PERCENTILE_99", + # "ALIGN_PERCENTILE_95", + # "ALIGN_PERCENTILE_50", + # ] + + ## Filters can be added to reduce the number of time series matched. All + ## functions are supported: starts_with, ends_with, has_substring, and + ## one_of. Only the '=' operator is supported. + ## + ## The logical operators when combining filters are defined statically using + ## the following values: + ## filter ::= {AND } + ## resource_labels ::= {OR } + ## metric_labels ::= {OR } + ## + ## For more details, see https://cloud.google.com/monitoring/api/v3/filters + # + ## Resource labels refine the time series selection with the following expression: + ## resource.labels. = + # [[inputs.stackdriver.filter.resource_labels]] + # key = "instance_name" + # value = 'starts_with("localhost")' + # + ## Metric labels refine the time series selection with the following expression: + ## metric.labels. = + # [[inputs.stackdriver.filter.metric_labels]] + # key = "device_name" + # value = 'one_of("sda", "sdb")' +` +) + +var ( + defaultCacheTTL = internal.Duration{Duration: 1 * time.Hour} + defaultWindow = internal.Duration{Duration: 1 * time.Minute} + defaultDelay = internal.Duration{Duration: 5 * time.Minute} +) + +type ( + // Stackdriver is the Google Stackdriver config info. 
+ Stackdriver struct { + Project string `toml:"project"` + RateLimit int `toml:"rate_limit"` + Window internal.Duration `toml:"window"` + Delay internal.Duration `toml:"delay"` + CacheTTL internal.Duration `toml:"cache_ttl"` + MetricTypePrefixInclude []string `toml:"metric_type_prefix_include"` + MetricTypePrefixExclude []string `toml:"metric_type_prefix_exclude"` + GatherRawDistributionBuckets bool `toml:"gather_raw_distribution_buckets"` + DistributionAggregationAligners []string `toml:"distribution_aggregation_aligners"` + Filter *ListTimeSeriesFilter `toml:"filter"` + + client metricClient + timeSeriesConfCache *timeSeriesConfCache + prevEnd time.Time + } + + // ListTimeSeriesFilter contains resource labels and metric labels + ListTimeSeriesFilter struct { + ResourceLabels []*Label `json:"resource_labels"` + MetricLabels []*Label `json:"metric_labels"` + } + + // Label contains key and value + Label struct { + Key string `toml:"key"` + Value string `toml:"value"` + } + + // TimeSeriesConfCache caches generated timeseries configurations + timeSeriesConfCache struct { + TTL time.Duration + Generated time.Time + TimeSeriesConfs []*timeSeriesConf + } + + // Internal structure which holds our configuration for a particular GCP time + // series. + timeSeriesConf struct { + // The influx measurement name this time series maps to + measurement string + // The prefix to use before any influx field names that we'll write for + // this time series. (Or, if we only decide to write one field name, this + // field just holds the value of the field name.) + fieldKey string + // The GCP API request that we'll use to fetch data for this time series. 
+ listTimeSeriesRequest *monitoringpb.ListTimeSeriesRequest + } + + // stackdriverMetricClient is a metric client for stackdriver + stackdriverMetricClient struct { + conn *monitoring.MetricClient + + listMetricDescriptorsCalls selfstat.Stat + listTimeSeriesCalls selfstat.Stat + } + + // metricClient is convenient for testing + metricClient interface { + ListMetricDescriptors(ctx context.Context, req *monitoringpb.ListMetricDescriptorsRequest) (<-chan *metricpb.MetricDescriptor, error) + ListTimeSeries(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest) (<-chan *monitoringpb.TimeSeries, error) + Close() error + } + + lockedSeriesGrouper struct { + sync.Mutex + *metric.SeriesGrouper + } +) + +func (g *lockedSeriesGrouper) Add( + measurement string, + tags map[string]string, + tm time.Time, + field string, + fieldValue interface{}, +) error { + g.Lock() + defer g.Unlock() + return g.SeriesGrouper.Add(measurement, tags, tm, field, fieldValue) +} + +// ListMetricDescriptors implements metricClient interface +func (c *stackdriverMetricClient) ListMetricDescriptors( + ctx context.Context, + req *monitoringpb.ListMetricDescriptorsRequest, +) (<-chan *metricpb.MetricDescriptor, error) { + mdChan := make(chan *metricpb.MetricDescriptor, 1000) + + go func() { + log.Printf("D! [inputs.stackdriver] ListMetricDescriptors: %s", req.Filter) + defer close(mdChan) + + // Iterate over metric descriptors and send them to buffered channel + mdResp := c.conn.ListMetricDescriptors(ctx, req) + c.listMetricDescriptorsCalls.Incr(1) + for { + mdDesc, mdErr := mdResp.Next() + if mdErr != nil { + if mdErr != iterator.Done { + log.Printf("E! 
[inputs.stackdriver] Received error response: %s: %v", req, mdErr) + } + break + } + mdChan <- mdDesc + } + }() + + return mdChan, nil +} + +// ListTimeSeries implements metricClient interface +func (c *stackdriverMetricClient) ListTimeSeries( + ctx context.Context, + req *monitoringpb.ListTimeSeriesRequest, +) (<-chan *monitoringpb.TimeSeries, error) { + tsChan := make(chan *monitoringpb.TimeSeries, 1000) + + go func() { + log.Printf("D! [inputs.stackdriver] ListTimeSeries: %s", req.Filter) + defer close(tsChan) + + // Iterate over timeseries and send them to buffered channel + tsResp := c.conn.ListTimeSeries(ctx, req) + c.listTimeSeriesCalls.Incr(1) + for { + tsDesc, tsErr := tsResp.Next() + if tsErr != nil { + if tsErr != iterator.Done { + log.Printf("E! [inputs.stackdriver] Received error response: %s: %v", req, tsErr) + } + break + } + tsChan <- tsDesc + } + }() + + return tsChan, nil +} + +// Close implements metricClient interface +func (s *stackdriverMetricClient) Close() error { + return s.conn.Close() +} + +// Description implements telegraf.Input interface +func (s *Stackdriver) Description() string { + return description +} + +// SampleConfig implements telegraf.Input interface +func (s *Stackdriver) SampleConfig() string { + return sampleConfig +} + +// Gather implements telegraf.Input interface +func (s *Stackdriver) Gather(acc telegraf.Accumulator) error { + ctx := context.Background() + + if s.RateLimit == 0 { + s.RateLimit = defaultRateLimit + } + + err := s.initializeStackdriverClient(ctx) + if err != nil { + return err + } + + start, end := s.updateWindow(s.prevEnd) + s.prevEnd = end + + tsConfs, err := s.generatetimeSeriesConfs(ctx, start, end) + if err != nil { + return err + } + + lmtr := limiter.NewRateLimiter(s.RateLimit, time.Second) + defer lmtr.Stop() + + grouper := &lockedSeriesGrouper{ + SeriesGrouper: metric.NewSeriesGrouper(), + } + + var wg sync.WaitGroup + wg.Add(len(tsConfs)) + for _, tsConf := range tsConfs { + <-lmtr.C + go 
func(tsConf *timeSeriesConf) { + defer wg.Done() + acc.AddError(s.gatherTimeSeries(ctx, grouper, tsConf)) + }(tsConf) + } + wg.Wait() + + for _, metric := range grouper.Metrics() { + acc.AddMetric(metric) + } + + return nil +} + +// Returns the start and end time for the next collection. +func (s *Stackdriver) updateWindow(prevEnd time.Time) (time.Time, time.Time) { + var start time.Time + if s.Window.Duration != 0 { + start = time.Now().Add(-s.Delay.Duration).Add(-s.Window.Duration) + } else if prevEnd.IsZero() { + start = time.Now().Add(-s.Delay.Duration).Add(-defaultWindow.Duration) + } else { + start = prevEnd + } + end := time.Now().Add(-s.Delay.Duration) + return start, end +} + +// Generate filter string for ListTimeSeriesRequest +func (s *Stackdriver) newListTimeSeriesFilter(metricType string) string { + functions := []string{ + "starts_with", + "ends_with", + "has_substring", + "one_of", + } + filterString := fmt.Sprintf(`metric.type = "%s"`, metricType) + if s.Filter == nil { + return filterString + } + + var valueFmt string + if len(s.Filter.ResourceLabels) > 0 { + resourceLabelsFilter := make([]string, len(s.Filter.ResourceLabels)) + for i, resourceLabel := range s.Filter.ResourceLabels { + // check if resource label value contains function + if includeExcludeHelper(resourceLabel.Value, functions, nil) { + valueFmt = `resource.labels.%s = %s` + } else { + valueFmt = `resource.labels.%s = "%s"` + } + resourceLabelsFilter[i] = fmt.Sprintf(valueFmt, resourceLabel.Key, resourceLabel.Value) + } + if len(resourceLabelsFilter) == 1 { + filterString += fmt.Sprintf(" AND %s", resourceLabelsFilter[0]) + } else { + filterString += fmt.Sprintf(" AND (%s)", strings.Join(resourceLabelsFilter, " OR ")) + } + } + + if len(s.Filter.MetricLabels) > 0 { + metricLabelsFilter := make([]string, len(s.Filter.MetricLabels)) + for i, metricLabel := range s.Filter.MetricLabels { + // check if metric label value contains function + if includeExcludeHelper(metricLabel.Value, 
functions, nil) { + valueFmt = `metric.labels.%s = %s` + } else { + valueFmt = `metric.labels.%s = "%s"` + } + metricLabelsFilter[i] = fmt.Sprintf(valueFmt, metricLabel.Key, metricLabel.Value) + } + if len(metricLabelsFilter) == 1 { + filterString += fmt.Sprintf(" AND %s", metricLabelsFilter[0]) + } else { + filterString += fmt.Sprintf(" AND (%s)", strings.Join(metricLabelsFilter, " OR ")) + } + } + + return filterString +} + +// Create and initialize a timeSeriesConf for a given GCP metric type with +// defaults taken from the gcp_stackdriver plugin configuration. +func (s *Stackdriver) newTimeSeriesConf( + metricType string, startTime, endTime time.Time, +) *timeSeriesConf { + filter := s.newListTimeSeriesFilter(metricType) + interval := &monitoringpb.TimeInterval{ + EndTime: &googlepbts.Timestamp{Seconds: endTime.Unix()}, + StartTime: &googlepbts.Timestamp{Seconds: startTime.Unix()}, + } + tsReq := &monitoringpb.ListTimeSeriesRequest{ + Name: monitoring.MetricProjectPath(s.Project), + Filter: filter, + Interval: interval, + } + cfg := &timeSeriesConf{ + measurement: metricType, + fieldKey: "value", + listTimeSeriesRequest: tsReq, + } + + // GCP metric types have at least one slash, but we'll be defensive anyway. + slashIdx := strings.LastIndex(metricType, "/") + if slashIdx > 0 { + cfg.measurement = metricType[:slashIdx] + cfg.fieldKey = metricType[slashIdx+1:] + } + + return cfg +} + +// Change this configuration to query an aggregate by specifying an "aligner". +// In GCP monitoring, "aligning" is aggregation performed *within* a time +// series, to distill a pile of data points down to a single data point for +// some given time period (here, we specify 60s as our time period). This is +// especially useful for scraping GCP "distribution" metric types, whose raw +// data amounts to a ~60 bucket histogram, which is fairly hard to query and +// visualize in the TICK stack. 
// includeExcludeHelper reports whether key passes a prefix-based include /
// exclude filter.
//
// When includes is non-empty, the result is true only if key starts with one
// of the include prefixes; excludes is not consulted in that case. Otherwise,
// when excludes is non-empty, the result is false if key starts with one of
// the exclude prefixes. With both lists empty every key passes.
func includeExcludeHelper(key string, includes []string, excludes []string) bool {
	hasAnyPrefix := func(prefixes []string) bool {
		for _, prefix := range prefixes {
			if strings.HasPrefix(key, prefix) {
				return true
			}
		}
		return false
	}

	switch {
	case len(includes) > 0:
		return hasAnyPrefix(includes)
	case len(excludes) > 0:
		return !hasAnyPrefix(excludes)
	default:
		return true
	}
}
and "excludeMetricTypePrefixes" +func (s *Stackdriver) includeMetricType(metricType string) bool { + k := metricType + inc := s.MetricTypePrefixInclude + exc := s.MetricTypePrefixExclude + + return includeExcludeHelper(k, inc, exc) +} + +// Generates filter for list metric descriptors request +func (s *Stackdriver) newListMetricDescriptorsFilters() []string { + if len(s.MetricTypePrefixInclude) == 0 { + return nil + } + + metricTypeFilters := make([]string, len(s.MetricTypePrefixInclude)) + for i, metricTypePrefix := range s.MetricTypePrefixInclude { + metricTypeFilters[i] = fmt.Sprintf(`metric.type = starts_with(%q)`, metricTypePrefix) + } + return metricTypeFilters +} + +// Generate a list of timeSeriesConfig structs by making a ListMetricDescriptors +// API request and filtering the result against our configuration. +func (s *Stackdriver) generatetimeSeriesConfs( + ctx context.Context, startTime, endTime time.Time, +) ([]*timeSeriesConf, error) { + if s.timeSeriesConfCache != nil && s.timeSeriesConfCache.IsValid() { + // Update interval for timeseries requests in timeseries cache + interval := &monitoringpb.TimeInterval{ + EndTime: &googlepbts.Timestamp{Seconds: endTime.Unix()}, + StartTime: &googlepbts.Timestamp{Seconds: startTime.Unix()}, + } + for _, timeSeriesConf := range s.timeSeriesConfCache.TimeSeriesConfs { + timeSeriesConf.listTimeSeriesRequest.Interval = interval + } + return s.timeSeriesConfCache.TimeSeriesConfs, nil + } + + ret := []*timeSeriesConf{} + req := &monitoringpb.ListMetricDescriptorsRequest{ + Name: monitoring.MetricProjectPath(s.Project), + } + + filters := s.newListMetricDescriptorsFilters() + if len(filters) == 0 { + filters = []string{""} + } + + for _, filter := range filters { + // Add filter for list metric descriptors if + // includeMetricTypePrefixes is specified, + // this is more effecient than iterating over + // all metric descriptors + req.Filter = filter + mdRespChan, err := s.client.ListMetricDescriptors(ctx, req) + if err 
!= nil { + return nil, err + } + + for metricDescriptor := range mdRespChan { + metricType := metricDescriptor.Type + valueType := metricDescriptor.ValueType + + if filter == "" && !s.includeMetricType(metricType) { + continue + } + + if valueType == metricpb.MetricDescriptor_DISTRIBUTION { + if s.GatherRawDistributionBuckets { + tsConf := s.newTimeSeriesConf(metricType, startTime, endTime) + ret = append(ret, tsConf) + } + for _, alignerStr := range s.DistributionAggregationAligners { + tsConf := s.newTimeSeriesConf(metricType, startTime, endTime) + tsConf.initForAggregate(alignerStr) + ret = append(ret, tsConf) + } + } else { + ret = append(ret, s.newTimeSeriesConf(metricType, startTime, endTime)) + } + } + } + + s.timeSeriesConfCache = &timeSeriesConfCache{ + TimeSeriesConfs: ret, + Generated: time.Now(), + TTL: s.CacheTTL.Duration, + } + + return ret, nil +} + +// Do the work to gather an individual time series. Runs inside a +// timeseries-specific goroutine. +func (s *Stackdriver) gatherTimeSeries( + ctx context.Context, grouper *lockedSeriesGrouper, tsConf *timeSeriesConf, +) error { + tsReq := tsConf.listTimeSeriesRequest + + tsRespChan, err := s.client.ListTimeSeries(ctx, tsReq) + if err != nil { + return err + } + + for tsDesc := range tsRespChan { + tags := map[string]string{ + "resource_type": tsDesc.Resource.Type, + } + for k, v := range tsDesc.Resource.Labels { + tags[k] = v + } + for k, v := range tsDesc.Metric.Labels { + tags[k] = v + } + + for _, p := range tsDesc.Points { + ts := time.Unix(p.Interval.EndTime.Seconds, 0) + + if tsDesc.ValueType == metricpb.MetricDescriptor_DISTRIBUTION { + dist := p.Value.GetDistributionValue() + s.addDistribution(dist, tags, ts, grouper, tsConf) + } else { + var value interface{} + + // Types that are valid to be assigned to Value + // See: https://godoc.org/google.golang.org/genproto/googleapis/monitoring/v3#TypedValue + switch tsDesc.ValueType { + case metricpb.MetricDescriptor_BOOL: + value = 
p.Value.GetBoolValue() + case metricpb.MetricDescriptor_INT64: + value = p.Value.GetInt64Value() + case metricpb.MetricDescriptor_DOUBLE: + value = p.Value.GetDoubleValue() + case metricpb.MetricDescriptor_STRING: + value = p.Value.GetStringValue() + } + + grouper.Add(tsConf.measurement, tags, ts, tsConf.fieldKey, value) + } + } + } + + return nil +} + +// AddDistribution adds metrics from a distribution value type. +func (s *Stackdriver) addDistribution( + metric *distributionpb.Distribution, + tags map[string]string, ts time.Time, grouper *lockedSeriesGrouper, tsConf *timeSeriesConf, +) { + field := tsConf.fieldKey + name := tsConf.measurement + + grouper.Add(name, tags, ts, field+"_count", metric.Count) + grouper.Add(name, tags, ts, field+"_mean", metric.Mean) + grouper.Add(name, tags, ts, field+"_sum_of_squared_deviation", metric.SumOfSquaredDeviation) + + if metric.Range != nil { + grouper.Add(name, tags, ts, field+"_range_min", metric.Range.Min) + grouper.Add(name, tags, ts, field+"_range_max", metric.Range.Max) + } + + linearBuckets := metric.BucketOptions.GetLinearBuckets() + exponentialBuckets := metric.BucketOptions.GetExponentialBuckets() + explicitBuckets := metric.BucketOptions.GetExplicitBuckets() + + var numBuckets int32 + if linearBuckets != nil { + numBuckets = linearBuckets.NumFiniteBuckets + 2 + } else if exponentialBuckets != nil { + numBuckets = exponentialBuckets.NumFiniteBuckets + 2 + } else { + numBuckets = int32(len(explicitBuckets.Bounds)) + 1 + } + + var i int32 + var count int64 + for i = 0; i < numBuckets; i++ { + // The last bucket is the overflow bucket, and includes all values + // greater than the previous bound. 
+ if i == numBuckets-1 { + tags["lt"] = "+Inf" + } else { + var upperBound float64 + if linearBuckets != nil { + upperBound = linearBuckets.Offset + (linearBuckets.Width * float64(i)) + } else if exponentialBuckets != nil { + width := math.Pow(exponentialBuckets.GrowthFactor, float64(i)) + upperBound = exponentialBuckets.Scale * width + } else if explicitBuckets != nil { + upperBound = explicitBuckets.Bounds[i] + } + tags["lt"] = strconv.FormatFloat(upperBound, 'f', -1, 64) + } + + // Add to the cumulative count; trailing buckets with value 0 are + // omitted from the response. + if i < int32(len(metric.BucketCounts)) { + count += metric.BucketCounts[i] + } + grouper.Add(name, tags, ts, field+"_bucket", count) + } +} + +func init() { + f := func() telegraf.Input { + return &Stackdriver{ + CacheTTL: defaultCacheTTL, + RateLimit: defaultRateLimit, + Delay: defaultDelay, + GatherRawDistributionBuckets: true, + DistributionAggregationAligners: []string{}, + } + } + + inputs.Add("stackdriver", f) +} diff --git a/plugins/inputs/stackdriver/stackdriver_test.go b/plugins/inputs/stackdriver/stackdriver_test.go new file mode 100644 index 000000000..99e5deabd --- /dev/null +++ b/plugins/inputs/stackdriver/stackdriver_test.go @@ -0,0 +1,1125 @@ +package stackdriver + +import ( + "context" + "testing" + "time" + + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "google.golang.org/genproto/googleapis/api/distribution" + metricpb "google.golang.org/genproto/googleapis/api/metric" + "google.golang.org/genproto/googleapis/api/monitoredres" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +type Call struct { + name string + args []interface{} +} + +type MockStackdriverClient struct { + ListMetricDescriptorsF func(ctx context.Context, req *monitoringpb.ListMetricDescriptorsRequest) (<-chan 
*metricpb.MetricDescriptor, error) + ListTimeSeriesF func(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest) (<-chan *monitoringpb.TimeSeries, error) + CloseF func() error + + calls []*Call +} + +func (m *MockStackdriverClient) ListMetricDescriptors( + ctx context.Context, + req *monitoringpb.ListMetricDescriptorsRequest, +) (<-chan *metricpb.MetricDescriptor, error) { + call := &Call{name: "ListMetricDescriptors", args: []interface{}{ctx, req}} + m.calls = append(m.calls, call) + return m.ListMetricDescriptorsF(ctx, req) +} + +func (m *MockStackdriverClient) ListTimeSeries( + ctx context.Context, + req *monitoringpb.ListTimeSeriesRequest, +) (<-chan *monitoringpb.TimeSeries, error) { + call := &Call{name: "ListTimeSeries", args: []interface{}{ctx, req}} + m.calls = append(m.calls, call) + return m.ListTimeSeriesF(ctx, req) +} + +func (m *MockStackdriverClient) Close() error { + call := &Call{name: "Close", args: []interface{}{}} + m.calls = append(m.calls, call) + return m.CloseF() +} + +func TestInitAndRegister(t *testing.T) { + expected := &Stackdriver{ + CacheTTL: defaultCacheTTL, + RateLimit: defaultRateLimit, + Delay: defaultDelay, + GatherRawDistributionBuckets: true, + DistributionAggregationAligners: []string{}, + } + require.Equal(t, expected, inputs.Inputs["stackdriver"]()) +} + +func createTimeSeries( + point *monitoringpb.Point, valueType metricpb.MetricDescriptor_ValueType, +) *monitoringpb.TimeSeries { + return &monitoringpb.TimeSeries{ + Metric: &metricpb.Metric{Labels: make(map[string]string)}, + Resource: &monitoredres.MonitoredResource{ + Type: "global", + Labels: map[string]string{ + "project_id": "test", + }, + }, + Points: []*monitoringpb.Point{point}, + ValueType: valueType, + } +} + +func TestGather(t *testing.T) { + now := time.Now().Round(time.Second) + tests := []struct { + name string + descriptor *metricpb.MetricDescriptor + timeseries *monitoringpb.TimeSeries + expected []telegraf.Metric + }{ + { + name: "double", + 
descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: 42.0, + }, + }, + }, + metricpb.MetricDescriptor_DOUBLE, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage": 42.0, + }, + now), + }, + }, + { + name: "int64", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_INT64, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: 42, + }, + }, + }, + metricpb.MetricDescriptor_INT64, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage": 42, + }, + now), + }, + }, + { + name: "bool", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_BOOL, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_BoolValue{ + BoolValue: true, + }, + }, + }, + metricpb.MetricDescriptor_BOOL, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage": true, + }, + now), + }, + }, + { + name: 
"string", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_STRING, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_StringValue{ + StringValue: "foo", + }, + }, + }, + metricpb.MetricDescriptor_STRING, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage": "foo", + }, + now), + }, + }, + { + name: "metric labels", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + timeseries: &monitoringpb.TimeSeries{ + Metric: &metricpb.Metric{ + Labels: map[string]string{ + "resource_type": "instance", + }, + }, + Resource: &monitoredres.MonitoredResource{ + Type: "global", + Labels: map[string]string{ + "project_id": "test", + }, + }, + Points: []*monitoringpb.Point{ + { + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: 42.0, + }, + }, + }, + }, + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "instance", + "project_id": "test", + }, + map[string]interface{}{ + "usage": 42.0, + }, + now), + }, + }, + { + name: "linear buckets", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: 
&monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distribution.Distribution{ + Count: 2, + Mean: 2.0, + SumOfSquaredDeviation: 1.0, + Range: &distribution.Distribution_Range{ + Min: 0.0, + Max: 3.0, + }, + BucketCounts: []int64{0, 1, 3, 0}, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_LinearBuckets{ + LinearBuckets: &distribution.Distribution_BucketOptions_Linear{ + NumFiniteBuckets: 2, + Width: 1, + Offset: 1, + }, + }, + }, + }, + }, + }, + }, + metricpb.MetricDescriptor_DISTRIBUTION, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage_count": int64(2), + "usage_range_min": 0.0, + "usage_range_max": 3.0, + "usage_mean": 2.0, + "usage_sum_of_squared_deviation": 1.0, + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "1", + }, + map[string]interface{}{ + "usage_bucket": int64(0), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "2", + }, + map[string]interface{}{ + "usage_bucket": int64(1), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "3", + }, + map[string]interface{}{ + "usage_bucket": int64(4), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "+Inf", + }, + map[string]interface{}{ + "usage_bucket": int64(4), + }, + now), + }, + }, + { + name: "exponential buckets", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + 
Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distribution.Distribution{ + Count: 2, + Mean: 2.0, + SumOfSquaredDeviation: 1.0, + Range: &distribution.Distribution_Range{ + Min: 0.0, + Max: 3.0, + }, + BucketCounts: []int64{0, 1, 3, 0}, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_ExponentialBuckets{ + ExponentialBuckets: &distribution.Distribution_BucketOptions_Exponential{ + NumFiniteBuckets: 2, + GrowthFactor: 2, + Scale: 1, + }, + }, + }, + }, + }, + }, + }, + metricpb.MetricDescriptor_DISTRIBUTION, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage_count": int64(2), + "usage_range_min": 0.0, + "usage_range_max": 3.0, + "usage_mean": 2.0, + "usage_sum_of_squared_deviation": 1.0, + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "1", + }, + map[string]interface{}{ + "usage_bucket": int64(0), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "2", + }, + map[string]interface{}{ + "usage_bucket": int64(1), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "4", + }, + map[string]interface{}{ + "usage_bucket": int64(4), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "+Inf", + }, + map[string]interface{}{ + "usage_bucket": int64(4), + }, + now), + }, + }, + { + name: "explicit buckets", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + }, + timeseries: createTimeSeries( + 
&monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distribution.Distribution{ + Count: 4, + Mean: 2.0, + SumOfSquaredDeviation: 1.0, + Range: &distribution.Distribution_Range{ + Min: 0.0, + Max: 3.0, + }, + BucketCounts: []int64{0, 1, 3}, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ + Bounds: []float64{1.0, 2.0}, + }, + }, + }, + }, + }, + }, + }, + metricpb.MetricDescriptor_DISTRIBUTION, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage_count": int64(4), + "usage_range_min": 0.0, + "usage_range_max": 3.0, + "usage_mean": 2.0, + "usage_sum_of_squared_deviation": 1.0, + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "1", + }, + map[string]interface{}{ + "usage_bucket": int64(0), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "2", + }, + map[string]interface{}{ + "usage_bucket": int64(1), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "+Inf", + }, + map[string]interface{}{ + "usage_bucket": int64(4), + }, + now), + }, + }, + { + name: "implicit buckets are zero", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: 
&monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distribution.Distribution{ + Count: 2, + Mean: 2.0, + SumOfSquaredDeviation: 1.0, + Range: &distribution.Distribution_Range{ + Min: 0.0, + Max: 3.0, + }, + BucketCounts: []int64{0, 1}, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_LinearBuckets{ + LinearBuckets: &distribution.Distribution_BucketOptions_Linear{ + NumFiniteBuckets: 2, + Width: 1, + Offset: 1, + }, + }, + }, + }, + }, + }, + }, + metricpb.MetricDescriptor_DISTRIBUTION, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage_count": int64(2), + "usage_range_min": 0.0, + "usage_range_max": 3.0, + "usage_mean": 2.0, + "usage_sum_of_squared_deviation": 1.0, + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "1", + }, + map[string]interface{}{ + "usage_bucket": int64(0), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "2", + }, + map[string]interface{}{ + "usage_bucket": int64(1), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "3", + }, + map[string]interface{}{ + "usage_bucket": int64(1), + }, + now), + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + "lt": "+Inf", + }, + map[string]interface{}{ + "usage_bucket": int64(1), + }, + now), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + s := &Stackdriver{ + Project: "test", + RateLimit: 10, + GatherRawDistributionBuckets: true, + client: &MockStackdriverClient{ + ListMetricDescriptorsF: 
func(ctx context.Context, req *monitoringpb.ListMetricDescriptorsRequest) (<-chan *metricpb.MetricDescriptor, error) { + ch := make(chan *metricpb.MetricDescriptor, 1) + ch <- tt.descriptor + close(ch) + return ch, nil + }, + ListTimeSeriesF: func(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest) (<-chan *monitoringpb.TimeSeries, error) { + ch := make(chan *monitoringpb.TimeSeries, 1) + ch <- tt.timeseries + close(ch) + return ch, nil + }, + CloseF: func() error { + return nil + }, + }, + } + + err := s.Gather(&acc) + require.NoError(t, err) + + actual := []telegraf.Metric{} + for _, m := range acc.Metrics { + actual = append(actual, testutil.FromTestMetric(m)) + } + + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} + +func TestGatherAlign(t *testing.T) { + now := time.Now().Round(time.Second) + tests := []struct { + name string + descriptor *metricpb.MetricDescriptor + timeseries []*monitoringpb.TimeSeries + expected []telegraf.Metric + }{ + { + name: "align", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + }, + timeseries: []*monitoringpb.TimeSeries{ + createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: 42.0, + }, + }, + }, + metricpb.MetricDescriptor_DOUBLE, + ), + createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: 42.0, + }, + }, + }, + metricpb.MetricDescriptor_DOUBLE, + ), + createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: 
&monitoringpb.TypedValue_DoubleValue{ + DoubleValue: 42.0, + }, + }, + }, + metricpb.MetricDescriptor_DOUBLE, + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage_align_percentile_99": 42.0, + "usage_align_percentile_95": 42.0, + "usage_align_percentile_50": 42.0, + }, + now), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listCall := 0 + var acc testutil.Accumulator + client := &MockStackdriverClient{ + ListMetricDescriptorsF: func(ctx context.Context, req *monitoringpb.ListMetricDescriptorsRequest) (<-chan *metricpb.MetricDescriptor, error) { + ch := make(chan *metricpb.MetricDescriptor, 1) + ch <- tt.descriptor + close(ch) + return ch, nil + }, + ListTimeSeriesF: func(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest) (<-chan *monitoringpb.TimeSeries, error) { + ch := make(chan *monitoringpb.TimeSeries, 1) + ch <- tt.timeseries[listCall] + listCall++ + close(ch) + return ch, nil + }, + CloseF: func() error { + return nil + }, + } + + s := &Stackdriver{ + Project: "test", + RateLimit: 10, + GatherRawDistributionBuckets: false, + DistributionAggregationAligners: []string{ + "ALIGN_PERCENTILE_99", + "ALIGN_PERCENTILE_95", + "ALIGN_PERCENTILE_50", + }, + client: client, + } + + err := s.Gather(&acc) + require.NoError(t, err) + + actual := []telegraf.Metric{} + for _, m := range acc.Metrics { + actual = append(actual, testutil.FromTestMetric(m)) + } + + testutil.RequireMetricsEqual(t, tt.expected, actual) + + }) + } +} + +func TestListMetricDescriptorFilter(t *testing.T) { + type call struct { + name string + filter string + } + now := time.Now().Round(time.Second) + tests := []struct { + name string + stackdriver *Stackdriver + descriptor *metricpb.MetricDescriptor + calls []call + }{ + { + name: "simple", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: 
[]string{"telegraf/cpu/usage"}, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage"`, + }, + }, + }, + { + name: "single resource labels string", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + ResourceLabels: []*Label{ + { + Key: "instance_name", + Value: `localhost`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND resource.labels.instance_name = "localhost"`, + }, + }, + }, + { + name: "single resource labels function", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + ResourceLabels: []*Label{ + { + Key: "instance_name", + Value: `starts_with("localhost")`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND resource.labels.instance_name = starts_with("localhost")`, + }, + }, + }, + { + name: "multiple resource labels", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + ResourceLabels: []*Label{ + { + 
Key: "instance_name", + Value: `localhost`, + }, + { + Key: "zone", + Value: `starts_with("us-")`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND (resource.labels.instance_name = "localhost" OR resource.labels.zone = starts_with("us-"))`, + }, + }, + }, + { + name: "single metric label string", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + MetricLabels: []*Label{ + { + Key: "resource_type", + Value: `instance`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND metric.labels.resource_type = "instance"`, + }, + }, + }, + { + name: "single metric label function", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + MetricLabels: []*Label{ + { + Key: "resource_id", + Value: `starts_with("abc-")`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND metric.labels.resource_id = starts_with("abc-")`, + }, + }, + }, + { + name: "multiple metric labels", + stackdriver: &Stackdriver{ + 
Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + MetricLabels: []*Label{ + { + Key: "resource_type", + Value: "instance", + }, + { + Key: "resource_id", + Value: `starts_with("abc-")`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND (metric.labels.resource_type = "instance" OR metric.labels.resource_id = starts_with("abc-"))`, + }, + }, + }, + { + name: "all labels filters", + stackdriver: &Stackdriver{ + Project: "test", + MetricTypePrefixInclude: []string{"telegraf/cpu/usage"}, + Filter: &ListTimeSeriesFilter{ + ResourceLabels: []*Label{ + { + Key: "instance_name", + Value: `localhost`, + }, + { + Key: "zone", + Value: `starts_with("us-")`, + }, + }, + MetricLabels: []*Label{ + { + Key: "resource_type", + Value: "instance", + }, + { + Key: "resource_id", + Value: `starts_with("abc-")`, + }, + }, + }, + RateLimit: 1, + }, + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_DOUBLE, + }, + calls: []call{ + { + name: "ListMetricDescriptors", + filter: `metric.type = starts_with("telegraf/cpu/usage")`, + }, { + name: "ListTimeSeries", + filter: `metric.type = "telegraf/cpu/usage" AND (resource.labels.instance_name = "localhost" OR resource.labels.zone = starts_with("us-")) AND (metric.labels.resource_type = "instance" OR metric.labels.resource_id = starts_with("abc-"))`, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + client := &MockStackdriverClient{ + ListMetricDescriptorsF: func(ctx context.Context, req *monitoringpb.ListMetricDescriptorsRequest) (<-chan 
*metricpb.MetricDescriptor, error) { + ch := make(chan *metricpb.MetricDescriptor, 1) + ch <- tt.descriptor + close(ch) + return ch, nil + }, + ListTimeSeriesF: func(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest) (<-chan *monitoringpb.TimeSeries, error) { + ch := make(chan *monitoringpb.TimeSeries, 1) + ch <- createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: 42.0, + }, + }, + }, + metricpb.MetricDescriptor_DOUBLE, + ) + close(ch) + return ch, nil + }, + CloseF: func() error { + return nil + }, + } + + s := tt.stackdriver + s.client = client + + err := s.Gather(&acc) + require.NoError(t, err) + + require.Equal(t, len(client.calls), len(tt.calls)) + for i, expected := range tt.calls { + actual := client.calls[i] + require.Equal(t, expected.name, actual.name) + + switch req := actual.args[1].(type) { + case *monitoringpb.ListMetricDescriptorsRequest: + require.Equal(t, expected.filter, req.Filter) + case *monitoringpb.ListTimeSeriesRequest: + require.Equal(t, expected.filter, req.Filter) + default: + panic("unknown request type") + } + } + }) + } +} + +func TestNewListTimeSeriesFilter(t *testing.T) { +} + +func TestTimeSeriesConfCacheIsValid(t *testing.T) { +} diff --git a/testutil/metric.go b/testutil/metric.go index 5ce0a99a6..afb3de7fe 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -103,3 +103,11 @@ func MustMetric( } return m } + +func FromTestMetric(met *Metric) telegraf.Metric { + m, err := metric.New(met.Measurement, met.Tags, met.Fields, met.Time) + if err != nil { + panic("MustMetric") + } + return m +}