From 9d8a574ac77e42fbfafb3375d0c15c7a8eba44e4 Mon Sep 17 00:00:00 2001
From: Greg <2653109+glinton@users.noreply.github.com>
Date: Mon, 25 Feb 2019 13:02:57 -0700
Subject: [PATCH] Add kinesis input plugin (#5341)

---
 Gopkg.lock                                    |  18 +
 Gopkg.toml                                    |   4 +
 plugins/inputs/all/all.go                     |   1 +
 plugins/inputs/kinesis_consumer/README.md     |  90 +++++
 .../kinesis_consumer/kinesis_consumer.go      | 351 ++++++++++++++++++
 plugins/outputs/kinesis/kinesis.go            |   4 +-
 6 files changed, 466 insertions(+), 2 deletions(-)
 create mode 100644 plugins/inputs/kinesis_consumer/README.md
 create mode 100644 plugins/inputs/kinesis_consumer/kinesis_consumer.go

diff --git a/Gopkg.lock b/Gopkg.lock
index 233fd9f3f..97c69b1b7 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -187,7 +187,11 @@
     "private/protocol/rest",
     "private/protocol/xml/xmlutil",
     "service/cloudwatch",
+    "service/dynamodb",
+    "service/dynamodb/dynamodbattribute",
+    "service/dynamodb/dynamodbiface",
     "service/kinesis",
+    "service/kinesis/kinesisiface",
     "service/sts",
   ]
   pruneopts = ""
@@ -566,6 +570,17 @@
   pruneopts = ""
   revision = "e80d13ce29ede4452c43dea11e79b9bc8a15b478"
 
+[[projects]]
+  branch = "master"
+  digest = "1:c191ec4c50122cdfeedba867d25bbe2ed63ed6dd2130729220c6c0d654361ea4"
+  name = "github.com/harlow/kinesis-consumer"
+  packages = [
+    ".",
+    "checkpoint/ddb",
+  ]
+  pruneopts = ""
+  revision = "2f58b136fee036f5de256b81a8461cc724fdf9df"
+
 [[projects]]
   digest = "1:e7224669901bab4094e6d6697c136557b7177db6ceb01b7fc8b20d08f4b5aacd"
   name = "github.com/hashicorp/consul"
@@ -1525,6 +1540,7 @@
     "github.com/aws/aws-sdk-go/aws/credentials/stscreds",
     "github.com/aws/aws-sdk-go/aws/session",
     "github.com/aws/aws-sdk-go/service/cloudwatch",
+    "github.com/aws/aws-sdk-go/service/dynamodb",
     "github.com/aws/aws-sdk-go/service/kinesis",
     "github.com/bsm/sarama-cluster",
     "github.com/couchbase/go-couchbase",
@@ -1554,6 +1570,8 @@
     "github.com/golang/protobuf/ptypes/timestamp",
     "github.com/google/go-cmp/cmp",
     "github.com/gorilla/mux",
+    "github.com/harlow/kinesis-consumer",
+    "github.com/harlow/kinesis-consumer/checkpoint/ddb",
     "github.com/hashicorp/consul/api",
     "github.com/influxdata/go-syslog",
     "github.com/influxdata/go-syslog/nontransparent",
diff --git a/Gopkg.toml b/Gopkg.toml
index e14f5e763..cd7825ccb 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -254,6 +254,10 @@
   name = "github.com/karrick/godirwalk"
   version = "1.7.5"
 
+[[override]]
+  name = "github.com/harlow/kinesis-consumer"
+  branch = "master"
+
 [[constraint]]
   branch = "master"
   name = "github.com/kubernetes/apimachinery"
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index fe440bbba..e03648036 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -63,6 +63,7 @@ import (
     _ "github.com/influxdata/telegraf/plugins/inputs/kernel"
     _ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat"
     _ "github.com/influxdata/telegraf/plugins/inputs/kibana"
+    _ "github.com/influxdata/telegraf/plugins/inputs/kinesis_consumer"
     _ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory"
     _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
     _ "github.com/influxdata/telegraf/plugins/inputs/leofs"
diff --git a/plugins/inputs/kinesis_consumer/README.md b/plugins/inputs/kinesis_consumer/README.md
new file mode 100644
index 000000000..d6f3a707b
--- /dev/null
+++ b/plugins/inputs/kinesis_consumer/README.md
@@ -0,0 +1,90 @@
+# Kinesis Consumer Input Plugin
+
+The [Kinesis][kinesis] consumer plugin reads from a Kinesis data stream
+and creates metrics using one of the supported [input data formats][].
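+
+For example, with `data_format = "influx"`, a record whose payload is the
+single line of line protocol below would be parsed into one `weather`
+metric (the payload shown here is purely illustrative):
+
+```
+weather,location=us-midwest temperature=82 1465839830100400200
+```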
+
+
+### Configuration
+
+```toml
+[[inputs.kinesis_consumer]]
+  ## Amazon region of the Kinesis endpoint.
+  region = "ap-southeast-2"
+
+  ## Amazon Credentials
+  ## Credentials are loaded in the following order
+  ## 1) Assumed credentials via STS if role_arn is specified
+  ## 2) explicit credentials from 'access_key' and 'secret_key'
+  ## 3) shared profile from 'profile'
+  ## 4) environment variables
+  ## 5) shared credentials file
+  ## 6) EC2 Instance Profile
+  # access_key = ""
+  # secret_key = ""
+  # token = ""
+  # role_arn = ""
+  # profile = ""
+  # shared_credential_file = ""
+
+  ## Endpoint to make requests against; the correct endpoint is determined
+  ## automatically and this option should only be set if you wish to
+  ## override the default.
+  ##   ex: endpoint_url = "http://localhost:8000"
+  # endpoint_url = ""
+
+  ## The Kinesis stream to read from; it must exist prior to starting telegraf.
+  streamname = "StreamName"
+
+  ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
+  # shard_iterator_type = "TRIM_HORIZON"
+
+  ## Maximum messages to read from the broker that have not been written by an
+  ## output. For best throughput set based on the number of metrics within
+  ## each message and the size of the output's metric_batch_size.
+  ##
+  ## For example, if each message from the queue contains 10 metrics and the
+  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
+  ## full batch is collected and the write is triggered immediately without
+  ## waiting until the next flush_interval.
+  # max_undelivered_messages = 1000
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+
+  ## Optional
+  ## Configuration for a DynamoDB checkpoint
+  [inputs.kinesis_consumer.checkpoint_dynamodb]
+    ## unique name for this consumer
+    app_name = "default"
+    table_name = "default"
+```
+
+
+#### Required AWS IAM permissions
+
+Kinesis:
+  - DescribeStream
+  - GetRecords
+  - GetShardIterator
+
+DynamoDB:
+  - GetItem
+  - PutItem
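+
+As a sketch, an IAM policy granting these permissions might look like the
+following. The resource ARNs are placeholders; scope them to your own
+stream and checkpoint table:
+
+```
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "kinesis:DescribeStream",
+        "kinesis:GetRecords",
+        "kinesis:GetShardIterator"
+      ],
+      "Resource": "arn:aws:kinesis:*:*:stream/StreamName"
+    },
+    {
+      "Effect": "Allow",
+      "Action": ["dynamodb:GetItem", "dynamodb:PutItem"],
+      "Resource": "arn:aws:dynamodb:*:*:table/default"
+    }
+  ]
+}
+```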
+
+
+#### DynamoDB Checkpoint
+
+The DynamoDB checkpoint stores the last processed record in a DynamoDB
+table. To leverage this functionality, create a table with the following
+string-type keys:
+
+```
+Partition key: namespace
+Sort key: shard_id
+```
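+
+If you use the AWS CLI, a compatible table could be created along these
+lines; the table name and throughput values are examples only and should
+match your `table_name` setting:
+
+```
+aws dynamodb create-table \
+  --table-name default \
+  --attribute-definitions \
+      AttributeName=namespace,AttributeType=S \
+      AttributeName=shard_id,AttributeType=S \
+  --key-schema \
+      AttributeName=namespace,KeyType=HASH \
+      AttributeName=shard_id,KeyType=RANGE \
+  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
+```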
+
+
+[kinesis]: https://aws.amazon.com/kinesis/
+[input data formats]: /docs/DATA_FORMATS_INPUT.md
diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go
new file mode 100644
index 000000000..b9b98243b
--- /dev/null
+++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go
@@ -0,0 +1,351 @@
+package kinesis_consumer
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "math/big"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/aws/aws-sdk-go/service/dynamodb"
+    "github.com/aws/aws-sdk-go/service/kinesis"
+    consumer "github.com/harlow/kinesis-consumer"
+    "github.com/harlow/kinesis-consumer/checkpoint/ddb"
+
+    "github.com/influxdata/telegraf"
+    internalaws "github.com/influxdata/telegraf/internal/config/aws"
+    "github.com/influxdata/telegraf/plugins/inputs"
+    "github.com/influxdata/telegraf/plugins/parsers"
+)
+
+type (
+    DynamoDB struct {
+        AppName   string `toml:"app_name"`
+        TableName string `toml:"table_name"`
+    }
+
+    KinesisConsumer struct {
+        Region                 string    `toml:"region"`
+        AccessKey              string    `toml:"access_key"`
+        SecretKey              string    `toml:"secret_key"`
+        RoleARN                string    `toml:"role_arn"`
+        Profile                string    `toml:"profile"`
+        Filename               string    `toml:"shared_credential_file"`
+        Token                  string    `toml:"token"`
+        EndpointURL            string    `toml:"endpoint_url"`
+        StreamName             string    `toml:"streamname"`
+        ShardIteratorType      string    `toml:"shard_iterator_type"`
+        DynamoDB               *DynamoDB `toml:"checkpoint_dynamodb"`
+        MaxUndeliveredMessages int       `toml:"max_undelivered_messages"`
+
+        cons   *consumer.Consumer
+        parser parsers.Parser
+        cancel context.CancelFunc
+        ctx    context.Context
+        acc    telegraf.TrackingAccumulator
+        sem    chan struct{}
+
+        checkpoint    consumer.Checkpoint
+        checkpoints   map[string]checkpoint
+        records       map[telegraf.TrackingID]string
+        checkpointTex sync.Mutex
+        recordsTex    sync.Mutex
+        wg            sync.WaitGroup
+
+        lastSeqNum *big.Int
+    }
+
+    checkpoint struct {
+        streamName string
+        shardID    string
+    }
+)
+
+const (
+    defaultMaxUndeliveredMessages = 1000
+)
+
+// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
+var maxSeq = strToBint(strings.Repeat("9", 129))
+
+var sampleConfig = `
+  ## Amazon region of the Kinesis endpoint.
+  region = "ap-southeast-2"
+
+  ## Amazon Credentials
+  ## Credentials are loaded in the following order
+  ## 1) Assumed credentials via STS if role_arn is specified
+  ## 2) explicit credentials from 'access_key' and 'secret_key'
+  ## 3) shared profile from 'profile'
+  ## 4) environment variables
+  ## 5) shared credentials file
+  ## 6) EC2 Instance Profile
+  # access_key = ""
+  # secret_key = ""
+  # token = ""
+  # role_arn = ""
+  # profile = ""
+  # shared_credential_file = ""
+
+  ## Endpoint to make requests against; the correct endpoint is determined
+  ## automatically and this option should only be set if you wish to
+  ## override the default.
+  ##   ex: endpoint_url = "http://localhost:8000"
+  # endpoint_url = ""
+
+  ## The Kinesis stream to read from; it must exist prior to starting telegraf.
+  streamname = "StreamName"
+
+  ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
+  # shard_iterator_type = "TRIM_HORIZON"
+
+  ## Maximum messages to read from the broker that have not been written by an
+  ## output. For best throughput set based on the number of metrics within
+  ## each message and the size of the output's metric_batch_size.
+  ##
+  ## For example, if each message from the queue contains 10 metrics and the
+  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
+  ## full batch is collected and the write is triggered immediately without
+  ## waiting until the next flush_interval.
+  # max_undelivered_messages = 1000
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+
+  ## Optional
+  ## Configuration for a DynamoDB checkpoint
+  [inputs.kinesis_consumer.checkpoint_dynamodb]
+    ## unique name for this consumer
+    app_name = "default"
+    table_name = "default"
+`
+
+func (k *KinesisConsumer) SampleConfig() string {
+    return sampleConfig
+}
+
+func (k *KinesisConsumer) Description() string {
+    return "Configuration for the AWS Kinesis input."
+}
+
+func (k *KinesisConsumer) SetParser(parser parsers.Parser) {
+    k.parser = parser
+}
+
+func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
+    credentialConfig := &internalaws.CredentialConfig{
+        Region:      k.Region,
+        AccessKey:   k.AccessKey,
+        SecretKey:   k.SecretKey,
+        RoleARN:     k.RoleARN,
+        Profile:     k.Profile,
+        Filename:    k.Filename,
+        Token:       k.Token,
+        EndpointURL: k.EndpointURL,
+    }
+    configProvider := credentialConfig.Credentials()
+    client := kinesis.New(configProvider)
+
+    k.checkpoint = &noopCheckpoint{}
+    if k.DynamoDB != nil {
+        var err error
+        k.checkpoint, err = ddb.New(
+            k.DynamoDB.AppName,
+            k.DynamoDB.TableName,
+            ddb.WithDynamoClient(dynamodb.New((&internalaws.CredentialConfig{
+                Region:      k.Region,
+                AccessKey:   k.AccessKey,
+                SecretKey:   k.SecretKey,
+                RoleARN:     k.RoleARN,
+                Profile:     k.Profile,
+                Filename:    k.Filename,
+                Token:       k.Token,
+                EndpointURL: k.EndpointURL,
+            }).Credentials())),
+            ddb.WithMaxInterval(time.Second*10),
+        )
+        if err != nil {
+            return err
+        }
+    }
+
+    cons, err := consumer.New(
+        k.StreamName,
+        consumer.WithClient(client),
+        consumer.WithShardIteratorType(k.ShardIteratorType),
+        consumer.WithCheckpoint(k),
+    )
+    if err != nil {
+        return err
+    }
+
+    k.cons = cons
+
+    k.acc = ac.WithTracking(k.MaxUndeliveredMessages)
+    k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages)
+    k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages)
+    k.sem = make(chan struct{}, k.MaxUndeliveredMessages)
+
+    ctx := context.Background()
+    ctx, k.cancel = context.WithCancel(ctx)
+
+    k.wg.Add(1)
+    go func() {
+        defer k.wg.Done()
+        k.onDelivery(ctx)
+    }()
+
+    k.wg.Add(1)
+    go func() {
+        defer k.wg.Done()
+        err := k.cons.Scan(ctx, func(r *consumer.Record) consumer.ScanStatus {
+            select {
+            case <-ctx.Done():
+                return consumer.ScanStatus{Error: ctx.Err()}
+            case k.sem <- struct{}{}:
+                break
+            }
+            err := k.onMessage(k.acc, r)
+            if err != nil {
+                // release the semaphore slot acquired above before bailing out
+                <-k.sem
+                return consumer.ScanStatus{Error: err}
+            }
+
+            return consumer.ScanStatus{}
+        })
+        if err != nil {
+            k.cancel()
+            log.Printf("E! [inputs.kinesis_consumer] Scan encountered an error - %s", err.Error())
+            k.cons = nil
+        }
+    }()
+
+    return nil
+}
+
+func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
+    err := k.connect(ac)
+    if err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
+    metrics, err := k.parser.Parse(r.Data)
+    if err != nil {
+        return err
+    }
+
+    k.recordsTex.Lock()
+    id := acc.AddTrackingMetricGroup(metrics)
+    k.records[id] = *r.SequenceNumber
+    k.recordsTex.Unlock()
+
+    return nil
+}
+
+func (k *KinesisConsumer) onDelivery(ctx context.Context) {
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case info := <-k.acc.Delivered():
+            k.recordsTex.Lock()
+            sequenceNum, ok := k.records[info.ID()]
+            if !ok {
+                k.recordsTex.Unlock()
+                continue
+            }
+            <-k.sem
+            delete(k.records, info.ID())
+            k.recordsTex.Unlock()
+
+            if info.Delivered() {
+                k.checkpointTex.Lock()
+                chk, ok := k.checkpoints[sequenceNum]
+                if !ok {
+                    k.checkpointTex.Unlock()
+                    continue
+                }
+                delete(k.checkpoints, sequenceNum)
+                k.checkpointTex.Unlock()
+
+                // at least once
+                if strToBint(sequenceNum).Cmp(k.lastSeqNum) > 0 {
+                    continue
+                }
+
+                k.lastSeqNum = strToBint(sequenceNum)
+                k.checkpoint.Set(chk.streamName, chk.shardID, sequenceNum)
+            } else {
+                log.Println("D! [inputs.kinesis_consumer] Metric group failed to process")
+            }
+        }
+    }
+}
+
+var negOne *big.Int
+
+func strToBint(s string) *big.Int {
+    n, ok := new(big.Int).SetString(s, 10)
+    if !ok {
+        return negOne
+    }
+    return n
+}
+
+func (k *KinesisConsumer) Stop() {
+    k.cancel()
+    k.wg.Wait()
+}
+
+func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
+    if k.cons == nil {
+        return k.connect(acc)
+    }
+    k.lastSeqNum = maxSeq
+
+    return nil
+}
+
+// Get wraps the checkpoint's Get function (called by consumer library)
+func (k *KinesisConsumer) Get(streamName, shardID string) (string, error) {
+    return k.checkpoint.Get(streamName, shardID)
+}
+
+// Set wraps the checkpoint's Set function (called by consumer library)
+func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error {
+    if sequenceNumber == "" {
+        return fmt.Errorf("sequence number should not be empty")
+    }
+
+    k.checkpointTex.Lock()
+    k.checkpoints[sequenceNumber] = checkpoint{streamName: streamName, shardID: shardID}
+    k.checkpointTex.Unlock()
+
+    return nil
+}
+
+type noopCheckpoint struct{}
+
+func (n noopCheckpoint) Set(string, string, string) error   { return nil }
+func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil }
+
+func init() {
+    negOne, _ = new(big.Int).SetString("-1", 10)
+
+    inputs.Add("kinesis_consumer", func() telegraf.Input {
+        return &KinesisConsumer{
+            ShardIteratorType:      "TRIM_HORIZON",
+            MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
+            lastSeqNum:             maxSeq,
+        }
+    })
+}
diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go
index d2f52abcd..497676486 100644
--- a/plugins/outputs/kinesis/kinesis.go
+++ b/plugins/outputs/kinesis/kinesis.go
@@ -236,7 +236,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
         if sz == 500 {
             // Max Messages Per PutRecordRequest is 500
             elapsed := writekinesis(k, r)
-            log.Printf("I! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
+            log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
             sz = 0
             r = nil
         }
@@ -244,7 +244,7 @@
     }
     if sz > 0 {
         elapsed := writekinesis(k, r)
-        log.Printf("I! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
+        log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
     }
 
     return nil