Add stackdriver output plugin (#3876)

This commit is contained in:
James Maidment
2018-10-16 14:47:10 -04:00
committed by Daniel Nelson
parent ef848b4924
commit 0b601513a2
7 changed files with 2082 additions and 1 deletions

View File

@@ -0,0 +1,18 @@
# Stackdriver Output Plugin
This plugin writes to the [Google Cloud Stackdriver API](https://cloud.google.com/monitoring/api/v3/)
and requires [authentication](https://cloud.google.com/docs/authentication/getting-started) with Google Cloud using either a service account or user credentials. See the [Stackdriver documentation](https://cloud.google.com/stackdriver/pricing#stackdriver_monitoring_services) for details on pricing.
The `project` option is required and names the GCP project that Stackdriver metrics are delivered to; `namespace` is required as well.
Metrics are grouped by the `namespace` variable and metric key, e.g. `custom.googleapis.com/telegraf/system/load5`.
### Configuration
```
# GCP Project
project = "erudite-bloom-151019"
# The namespace for the metric descriptor
namespace = "telegraf"
```

View File

@@ -0,0 +1,303 @@
package stackdriver
import (
"context"
"fmt"
"log"
"path"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
// Imports the Stackdriver Monitoring client package.
monitoring "cloud.google.com/go/monitoring/apiv3"
googlepb "github.com/golang/protobuf/ptypes/timestamp"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
// Stackdriver is the Google Stackdriver config info.
type Stackdriver struct {
// Project is the GCP project; Connect rejects an empty value. It is used
// both as the "project_id" resource label and to build the request name.
Project string
// Namespace is joined into the custom metric type path
// ("custom.googleapis.com/<namespace>/<metric>/<field>"); Connect rejects
// an empty value.
Namespace string
// client is the Stackdriver metric client. Connect creates it when nil.
client *monitoring.MetricClient
}
const (
// QuotaLabelsPerMetricDescriptor is the limit
// to labels (tags) per metric descriptor.
QuotaLabelsPerMetricDescriptor = 10
// QuotaStringLengthForLabelKey is the limit
// to string length for label key.
QuotaStringLengthForLabelKey = 100
// QuotaStringLengthForLabelValue is the limit
// to string length for label value.
QuotaStringLengthForLabelValue = 1024
// StartTime is the interval start, in Unix seconds, used for
// cumulative metrics.
StartTime = int64(1)
// MaxInt is the largest value of the platform-sized int type. It equals
// the int64 maximum only where int is 64 bits wide; where int is 32 bits
// wide it is 1<<31 - 1.
MaxInt = int(^uint(0) >> 1)
)
// sampleConfig is the example plugin configuration, listing the project and
// namespace options.
var sampleConfig = `
# GCP Project
project = "erudite-bloom-151019"
# The namespace for the metric descriptor
namespace = "telegraf"
`
// Connect validates the configuration and, unless a client is already set,
// creates the Stackdriver metric client used for later writes.
//
// It returns an error when Project or Namespace is empty, or when the metric
// client cannot be created.
func (s *Stackdriver) Connect() error {
	switch {
	case s.Project == "":
		return fmt.Errorf("Project is a required field for stackdriver output")
	case s.Namespace == "":
		return fmt.Errorf("Namespace is a required field for stackdriver output")
	case s.client != nil:
		// A client was supplied up front; keep it.
		return nil
	}

	metricClient, err := monitoring.NewMetricClient(context.Background())
	if err != nil {
		return err
	}
	s.client = metricClient
	return nil
}
// Write sends the metrics to Google Cloud Stackdriver.
//
// Every field of a metric becomes one time series holding a single point, and
// all series of one metric are sent in a single CreateTimeSeries request.
// Metrics whose type maps to no supported metric kind, and fields whose value
// type is unsupported, are logged and skipped. The first failing request
// aborts the write and its error is returned.
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
	ctx := context.Background()

	for _, m := range metrics {
		// Kind, interval and labels depend only on the metric, not on the
		// field. Resolving them once keeps every field of a metric on the
		// same label set and avoids repeating quota warnings per field.
		metricKind, err := getStackdriverMetricKind(m.Type())
		if err != nil {
			log.Printf("E! [output.stackdriver] get metric failed: %s", err)
			continue
		}

		timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix())
		if err != nil {
			log.Printf("E! [output.stackdriver] get time interval failed: %s", err)
			continue
		}

		labels := getStackdriverLabels(m.TagList())

		fields := m.FieldList()
		timeSeries := make([]*monitoringpb.TimeSeries, 0, len(fields))
		for _, f := range fields {
			value, err := getStackdriverTypedValue(f.Value)
			if err != nil {
				log.Printf("E! [output.stackdriver] get type failed: %s", err)
				continue
			}

			// Prepare the time series with its individual data point.
			timeSeries = append(timeSeries,
				&monitoringpb.TimeSeries{
					Metric: &metricpb.Metric{
						Type:   path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
						Labels: labels,
					},
					MetricKind: metricKind,
					Resource: &monitoredrespb.MonitoredResource{
						Type: "global",
						Labels: map[string]string{
							"project_id": s.Project,
						},
					},
					Points: []*monitoringpb.Point{
						{
							Interval: timeInterval,
							Value:    value,
						},
					},
				})
		}

		// Nothing convertible in this metric; do not send an empty request.
		if len(timeSeries) == 0 {
			continue
		}

		// Prepare time series request.
		timeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{
			Name:       monitoring.MetricProjectPath(s.Project),
			TimeSeries: timeSeries,
		}

		// Create the time series in Stackdriver.
		if err := s.client.CreateTimeSeries(ctx, timeSeriesRequest); err != nil {
			log.Printf("E! [output.stackdriver] unable to write to Stackdriver: %s", err)
			return err
		}
	}

	return nil
}
// getStackdriverTimeInterval builds the time interval of a data point for the
// given metric kind.
//
// Gauge points carry only an end time. Cumulative points carry both start and
// end. Both values are whole seconds. Any other kind, including DELTA and
// METRIC_KIND_UNSPECIFIED, returns an error naming the rejected kind.
func getStackdriverTimeInterval(
	m metricpb.MetricDescriptor_MetricKind,
	start int64,
	end int64,
) (*monitoringpb.TimeInterval, error) {
	switch m {
	case metricpb.MetricDescriptor_GAUGE:
		return &monitoringpb.TimeInterval{
			EndTime: &googlepb.Timestamp{
				Seconds: end,
			},
		}, nil
	case metricpb.MetricDescriptor_CUMULATIVE:
		return &monitoringpb.TimeInterval{
			StartTime: &googlepb.Timestamp{
				Seconds: start,
			},
			EndTime: &googlepb.Timestamp{
				Seconds: end,
			},
		}, nil
	case metricpb.MetricDescriptor_DELTA, metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED:
		fallthrough
	default:
		// %v reports the kind itself; %T would only print its Go type name,
		// which is the same for every input.
		return nil, fmt.Errorf("unsupported metric kind %v", m)
	}
}
// getStackdriverMetricKind maps a telegraf value type onto a Stackdriver
// metric kind: Untyped and Gauge become GAUGE, Counter becomes CUMULATIVE.
// Histogram, Summary and any other value type return
// METRIC_KIND_UNSPECIFIED together with an error.
func getStackdriverMetricKind(vt telegraf.ValueType) (metricpb.MetricDescriptor_MetricKind, error) {
	if vt == telegraf.Untyped || vt == telegraf.Gauge {
		return metricpb.MetricDescriptor_GAUGE, nil
	}
	if vt == telegraf.Counter {
		return metricpb.MetricDescriptor_CUMULATIVE, nil
	}
	// Everything else has no mapping.
	return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type")
}
// getStackdriverTypedValue wraps a field value in the matching Stackdriver
// TypedValue.
//
// int64 and uint64 become Int64Value, float64 becomes DoubleValue, bool
// becomes BoolValue and string becomes StringValue. A uint64 above the int64
// maximum is clamped to that maximum. Any other dynamic type returns an error.
func getStackdriverTypedValue(value interface{}) (*monitoringpb.TypedValue, error) {
	// maxInt64 is the largest value Int64Value can hold. A fixed 64-bit bound
	// is used rather than the platform-sized int maximum, which would clamp
	// at 1<<31 - 1 on 32-bit builds.
	const maxInt64 = 1<<63 - 1

	switch v := value.(type) {
	case uint64:
		if v > maxInt64 {
			v = maxInt64
		}
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_Int64Value{
				Int64Value: int64(v),
			},
		}, nil
	case int64:
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_Int64Value{
				Int64Value: v,
			},
		}, nil
	case float64:
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_DoubleValue{
				DoubleValue: v,
			},
		}, nil
	case bool:
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_BoolValue{
				BoolValue: v,
			},
		}, nil
	case string:
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_StringValue{
				StringValue: v,
			},
		}, nil
	default:
		return nil, fmt.Errorf("value type \"%T\" not supported for stackdriver custom metrics", v)
	}
}
// getStackdriverLabels converts telegraf tags into Stackdriver metric labels
// while enforcing the label quotas.
//
// When a key occurs more than once in tags, the last value wins. Tags whose
// key is longer than QuotaStringLengthForLabelKey, or whose value is longer
// than QuotaStringLengthForLabelValue, are dropped with a warning. If more
// than QuotaLabelsPerMetricDescriptor labels remain, the surplus is removed
// starting from the end of tags, so identical input always yields identical
// labels.
func getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
	labels := make(map[string]string, len(tags))
	for _, t := range tags {
		labels[t.Key] = t.Value
	}

	// Deleting entries while ranging over a map is well defined in Go.
	for k, v := range labels {
		if len(k) > QuotaStringLengthForLabelKey {
			log.Printf(
				"W! [output.stackdriver] removing tag [%s] key exceeds string length for label key [%d]",
				k,
				QuotaStringLengthForLabelKey,
			)
			delete(labels, k)
			continue
		}
		if len(v) > QuotaStringLengthForLabelValue {
			log.Printf(
				"W! [output.stackdriver] removing tag [%s] value exceeds string length for label value [%d]",
				k,
				QuotaStringLengthForLabelValue,
			)
			delete(labels, k)
		}
	}

	excess := len(labels) - QuotaLabelsPerMetricDescriptor
	if excess <= 0 {
		return labels
	}

	log.Printf(
		"W! [output.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] tags",
		len(labels),
		QuotaLabelsPerMetricDescriptor,
		excess,
	)
	// Walk the input backwards instead of ranging over the map: map order is
	// random, which would drop a different set of tags on every call.
	for i := len(tags) - 1; i >= 0 && excess > 0; i-- {
		key := tags[i].Key
		if _, present := labels[key]; !present {
			// Already removed for length, or a duplicate key handled earlier.
			continue
		}
		delete(labels, key)
		excess--
	}

	return labels
}
// Close terminates the session to the backend and returns any error reported
// by the client. It is a no-op when no client was ever set, for example when
// Connect failed or was never called, so it does not dereference a nil client.
func (s *Stackdriver) Close() error {
	if s.client == nil {
		return nil
	}
	return s.client.Close()
}
// SampleConfig returns the plugin's sample configuration, i.e. the
// package-level sampleConfig text.
func (s *Stackdriver) SampleConfig() string {
return sampleConfig
}
// Description returns a fixed, one-line, human-readable summary of the plugin.
func (s *Stackdriver) Description() string {
return "Configuration for Google Cloud Stackdriver to send metrics to"
}
// newStackdriver returns a zero-valued Stackdriver plugin instance.
func newStackdriver() *Stackdriver {
	return new(Stackdriver)
}
func init() {
outputs.Add("stackdriver", func() telegraf.Output {
return newStackdriver()
})
}

View File

@@ -0,0 +1,119 @@
package stackdriver
import (
"context"
"fmt"
"log"
"net"
"os"
"strings"
"testing"
monitoring "cloud.google.com/go/monitoring/apiv3"
"github.com/golang/protobuf/proto"
emptypb "github.com/golang/protobuf/ptypes/empty"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// clientOpt is the option tests should use to connect to the test server.
// It is initialized by TestMain.
var clientOpt option.ClientOption
// mockMetric is the mock metric service that TestMain registers on the test
// gRPC server; tests set its err/resps fields before issuing calls.
var mockMetric mockMetricServer
// mockMetricServer is a test double for the metric service. Its
// CreateTimeSeries records each accepted request and answers with err, or
// with the first entry of resps when err is nil.
type mockMetricServer struct {
// Embed for forward compatibility.
// Tests will keep working if more methods are added
// in the future.
monitoringpb.MetricServiceServer
// reqs collects the requests accepted by CreateTimeSeries, in arrival order.
reqs []proto.Message
// If set, all calls return this error.
err error
// responses to return if err == nil
resps []proto.Message
}
// CreateTimeSeries is the mock implementation of the RPC. It rejects calls
// whose incoming "x-goog-api-client" metadata is missing or lacks a "gl-go/"
// entry. Otherwise it records the request, then returns the configured error
// if one is set, or else the first canned response.
func (s *mockMetricServer) CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) {
	md, _ := metadata.FromIncomingContext(ctx)
	apiClient := md["x-goog-api-client"]
	if len(apiClient) == 0 || !strings.Contains(apiClient[0], "gl-go/") {
		return nil, fmt.Errorf("x-goog-api-client = %v, expected gl-go key", apiClient)
	}

	s.reqs = append(s.reqs, req)

	if s.err == nil {
		return s.resps[0].(*emptypb.Empty), nil
	}
	return nil, s.err
}
// TestMain starts an in-process gRPC server backed by mockMetric on a free
// localhost port, dials it without transport security, stores the connection
// in clientOpt for the tests, then runs the suite and exits with its status.
func TestMain(m *testing.M) {
	grpcServer := grpc.NewServer()
	monitoringpb.RegisterMetricServiceServer(grpcServer, &mockMetric)

	listener, listenErr := net.Listen("tcp", "localhost:0")
	if listenErr != nil {
		log.Fatal(listenErr)
	}
	go grpcServer.Serve(listener)

	conn, dialErr := grpc.Dial(listener.Addr().String(), grpc.WithInsecure())
	if dialErr != nil {
		log.Fatal(dialErr)
	}
	clientOpt = option.WithGRPCConn(conn)

	exitCode := m.Run()
	os.Exit(exitCode)
}
func TestWrite(t *testing.T) {
expectedResponse := &emptypb.Empty{}
mockMetric.err = nil
mockMetric.reqs = nil
mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)
c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
if err != nil {
t.Fatal(err)
}
s := &Stackdriver{
Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
Namespace: "test",
client: c,
}
err = s.Connect()
require.NoError(t, err)
err = s.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestGetStackdriverLabels(t *testing.T) {
tags := []*telegraf.Tag{
{Key: "project", Value: "bar"},
{Key: "discuss", Value: "revolutionary"},
{Key: "marble", Value: "discount"},
{Key: "applied", Value: "falsify"},
{Key: "test", Value: "foo"},
{Key: "porter", Value: "discount"},
{Key: "play", Value: "tiger"},
{Key: "fireplace", Value: "display"},
{Key: "host", Value: "this"},
{Key: "name", Value: "bat"},
{Key: "device", Value: "local"},
{Key: "reserve", Value: "publication"},
{Key: "xpfqacltlmpguimhtjlou2qlmf9uqqwk3teajwlwqkoxtsppbnjksaxvzc1aa973pho9m96gfnl5op8ku7sv93rexyx42qe3zty12ityv", Value: "keyquota"},
{Key: "valuequota", Value: "icym5wcpejnhljcvy2vwk15svmhrtueoppwlvix61vlbaeedufn1g6u4jgwjoekwew9s2dboxtgrkiyuircnl8h1lbzntt9gzcf60qunhxurhiz0g2bynzy1v6eyn4ravndeiiugobsrsj2bfaguahg4gxn7nx4irwfknunhkk6jdlldevawj8levebjajcrcbeugewd14fa8o34ycfwx2ymalyeqxhfqrsksxnii2deqq6cghrzi6qzwmittkzdtye3imoygqmjjshiskvnzz1e4ipd9c6wfor5jsygn1kvcg6jm4clnsl1fnxotbei9xp4swrkjpgursmfmkyvxcgq9hoy435nwnolo3ipnvdlhk6pmlzpdjn6gqi3v9gv7jn5ro2p1t5ufxzfsvqq1fyrgoi7gvmttil1banh3cftkph1dcoaqfhl7y0wkvhwwvrmslmmxp1wedyn8bacd7akmjgfwdvcmrymbzvmrzfvq1gs1xnmmg8rsfxci2h6r1ralo3splf4f3bdg4c7cy0yy9qbxzxhcmdpwekwc7tdjs8uj6wmofm2aor4hum8nwyfwwlxy3yvsnbjy32oucsrmhcnu6l2i8laujkrhvsr9fcix5jflygznlydbqw5uhw1rg1g5wiihqumwmqgggemzoaivm3ut41vjaff4uqtqyuhuwblmuiphfkd7si49vgeeswzg7tpuw0oxmkesgibkcjtev2h9ouxzjs3eb71jffhdacyiuyhuxwvm5bnrjewbm4x2kmhgbirz3eoj7ijgplggdkx5vixufg65ont8zi1jabsuxx0vsqgprunwkugqkxg2r7iy6fmgs4lob4dlseinowkst6gp6x1ejreauyzjz7atzm3hbmr5rbynuqp4lxrnhhcbuoun69mavvaaki0bdz5ybmbbbz5qdv0odtpjo2aezat5uosjuhzbvic05jlyclikynjgfhencdkz3qcqzbzhnsynj1zdke0sk4zfpvfyryzsxv9pu0qm"},
}
labels := getStackdriverLabels(tags)
require.Equal(t, QuotaLabelsPerMetricDescriptor, len(labels))
}