Add kinesis input plugin (#5341)

This commit is contained in:
Greg 2019-02-25 13:02:57 -07:00 committed by Daniel Nelson
parent 2506da80c2
commit 9d8a574ac7
6 changed files with 466 additions and 2 deletions

18
Gopkg.lock generated
View File

@ -187,7 +187,11 @@
"private/protocol/rest",
"private/protocol/xml/xmlutil",
"service/cloudwatch",
"service/dynamodb",
"service/dynamodb/dynamodbattribute",
"service/dynamodb/dynamodbiface",
"service/kinesis",
"service/kinesis/kinesisiface",
"service/sts",
]
pruneopts = ""
@ -566,6 +570,17 @@
pruneopts = ""
revision = "e80d13ce29ede4452c43dea11e79b9bc8a15b478"
[[projects]]
branch = "master"
digest = "1:c191ec4c50122cdfeedba867d25bbe2ed63ed6dd2130729220c6c0d654361ea4"
name = "github.com/harlow/kinesis-consumer"
packages = [
".",
"checkpoint/ddb",
]
pruneopts = ""
revision = "2f58b136fee036f5de256b81a8461cc724fdf9df"
[[projects]]
digest = "1:e7224669901bab4094e6d6697c136557b7177db6ceb01b7fc8b20d08f4b5aacd"
name = "github.com/hashicorp/consul"
@ -1525,6 +1540,7 @@
"github.com/aws/aws-sdk-go/aws/credentials/stscreds",
"github.com/aws/aws-sdk-go/aws/session",
"github.com/aws/aws-sdk-go/service/cloudwatch",
"github.com/aws/aws-sdk-go/service/dynamodb",
"github.com/aws/aws-sdk-go/service/kinesis",
"github.com/bsm/sarama-cluster",
"github.com/couchbase/go-couchbase",
@ -1554,6 +1570,8 @@
"github.com/golang/protobuf/ptypes/timestamp",
"github.com/google/go-cmp/cmp",
"github.com/gorilla/mux",
"github.com/harlow/kinesis-consumer",
"github.com/harlow/kinesis-consumer/checkpoint/ddb",
"github.com/hashicorp/consul/api",
"github.com/influxdata/go-syslog",
"github.com/influxdata/go-syslog/nontransparent",

View File

@ -254,6 +254,10 @@
name = "github.com/karrick/godirwalk"
version = "1.7.5"
[[override]]
name = "github.com/harlow/kinesis-consumer"
branch = "master"
[[constraint]]
branch = "master"
name = "github.com/kubernetes/apimachinery"

View File

@ -63,6 +63,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kernel"
_ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat"
_ "github.com/influxdata/telegraf/plugins/inputs/kibana"
_ "github.com/influxdata/telegraf/plugins/inputs/kinesis_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"

View File

@ -0,0 +1,90 @@
# Kinesis Consumer Input Plugin
The [Kinesis][kinesis] consumer plugin reads from a Kinesis data stream
and creates metrics using one of the supported [input data formats][].
### Configuration
```toml
[[inputs.kinesis_consumer]]
## Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
# access_key = ""
# secret_key = ""
# token = ""
# role_arn = ""
# profile = ""
# shared_credential_file = ""
## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
## ex: endpoint_url = "http://localhost:8000"
# endpoint_url = ""
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
# shard_iterator_type = "TRIM_HORIZON"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "default"
```
#### Required AWS IAM permissions
Kinesis:
- DescribeStream
- GetRecords
- GetShardIterator
DynamoDB:
- GetItem
- PutItem
#### DynamoDB Checkpoint
The DynamoDB checkpoint stores the last processed record in a DynamoDB table. To leverage
this functionality, create a table with the following string type keys:
```
Partition key: namespace
Sort key: shard_id
```
[kinesis]: https://aws.amazon.com/kinesis/
[input data formats]: /docs/DATA_FORMATS_INPUT.md

View File

@ -0,0 +1,351 @@
package kinesis_consumer
import (
"context"
"fmt"
"log"
"math/big"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis"
consumer "github.com/harlow/kinesis-consumer"
"github.com/harlow/kinesis-consumer/checkpoint/ddb"
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
type (
	// DynamoDB holds the configuration for the optional DynamoDB-backed
	// checkpoint (table written via the harlow/kinesis-consumer ddb package).
	DynamoDB struct {
		// AppName is the namespace under which checkpoints are stored.
		AppName string `toml:"app_name"`
		// TableName is the DynamoDB table holding the checkpoints.
		TableName string `toml:"table_name"`
	}

	// KinesisConsumer is a telegraf service input that reads records from an
	// AWS Kinesis data stream and parses them into metrics.
	KinesisConsumer struct {
		// AWS region and credential settings (see internal/config/aws).
		Region    string `toml:"region"`
		AccessKey string `toml:"access_key"`
		SecretKey string `toml:"secret_key"`
		RoleARN   string `toml:"role_arn"`
		Profile   string `toml:"profile"`
		Filename  string `toml:"shared_credential_file"`
		Token     string `toml:"token"`
		// EndpointURL overrides the automatically determined endpoint.
		EndpointURL string `toml:"endpoint_url"`

		// StreamName is the Kinesis stream to consume; it must already exist.
		StreamName string `toml:"streamname"`
		// ShardIteratorType selects where reading starts (TRIM_HORIZON/LATEST).
		ShardIteratorType string `toml:"shard_iterator_type"`
		// DynamoDB, when non-nil, enables the DynamoDB checkpoint.
		DynamoDB *DynamoDB `toml:"checkpoint_dynamodb"`
		// MaxUndeliveredMessages bounds in-flight, unacknowledged messages.
		MaxUndeliveredMessages int `toml:"max_undelivered_messages"`

		cons   *consumer.Consumer
		parser parsers.Parser
		cancel context.CancelFunc
		// ctx field appears unused; connect builds its own context — TODO confirm.
		ctx context.Context
		acc telegraf.TrackingAccumulator
		// sem is a counting semaphore: a send acquires a slot, a receive
		// releases one (see connect and onDelivery).
		sem chan struct{}

		// checkpoint is the backing store (noop or DynamoDB).
		checkpoint consumer.Checkpoint
		// checkpoints maps a sequence number to its stream/shard, pending delivery.
		checkpoints map[string]checkpoint
		// records maps a tracking ID to the record's sequence number.
		records map[telegraf.TrackingID]string
		// checkpointTex guards checkpoints; recordsTex guards records.
		checkpointTex sync.Mutex
		recordsTex    sync.Mutex
		wg            sync.WaitGroup

		// lastSeqNum is the most recently checkpointed sequence number
		// (reset to maxSeq by Gather; see onDelivery).
		lastSeqNum *big.Int
	}

	// checkpoint identifies the stream/shard a sequence number belongs to.
	checkpoint struct {
		streamName string
		shardID    string
	}
)
const (
	// defaultMaxUndeliveredMessages is the default bound on in-flight,
	// unacknowledged messages (see max_undelivered_messages).
	defaultMaxUndeliveredMessages = 1000
)

// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
var maxSeq = strToBint(strings.Repeat("9", 129))
// sampleConfig is the TOML configuration snippet returned by SampleConfig.
var sampleConfig = `
## Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
# access_key = ""
# secret_key = ""
# token = ""
# role_arn = ""
# profile = ""
# shared_credential_file = ""
## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
## ex: endpoint_url = "http://localhost:8000"
# endpoint_url = ""
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
# shard_iterator_type = "TRIM_HORIZON"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "default"
`
// SampleConfig returns the plugin's sample TOML configuration.
func (k *KinesisConsumer) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line description of the plugin.
func (k *KinesisConsumer) Description() string {
	return "Configuration for the AWS Kinesis input."
}
// SetParser installs the parser used to turn raw record payloads into metrics.
func (k *KinesisConsumer) SetParser(parser parsers.Parser) {
	k.parser = parser
}
// connect builds the Kinesis client, wires up the optional DynamoDB
// checkpoint, and starts the background scan and delivery goroutines.
//
// The returned error covers only setup failures; errors from the ongoing
// scan are logged and cause k.cons to be reset so Gather can reconnect.
func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
	credentialConfig := &internalaws.CredentialConfig{
		Region:      k.Region,
		AccessKey:   k.AccessKey,
		SecretKey:   k.SecretKey,
		RoleARN:     k.RoleARN,
		Profile:     k.Profile,
		Filename:    k.Filename,
		Token:       k.Token,
		EndpointURL: k.EndpointURL,
	}
	configProvider := credentialConfig.Credentials()
	client := kinesis.New(configProvider)

	// Default to a no-op checkpoint; replaced when DynamoDB is configured.
	k.checkpoint = &noopCheckpoint{}
	if k.DynamoDB != nil {
		var err error
		k.checkpoint, err = ddb.New(
			k.DynamoDB.AppName,
			k.DynamoDB.TableName,
			ddb.WithDynamoClient(dynamodb.New((&internalaws.CredentialConfig{
				Region:      k.Region,
				AccessKey:   k.AccessKey,
				SecretKey:   k.SecretKey,
				RoleARN:     k.RoleARN,
				Profile:     k.Profile,
				Filename:    k.Filename,
				Token:       k.Token,
				EndpointURL: k.EndpointURL,
			}).Credentials())),
			ddb.WithMaxInterval(time.Second*10),
		)
		if err != nil {
			return err
		}
	}

	cons, err := consumer.New(
		k.StreamName,
		consumer.WithClient(client),
		consumer.WithShardIteratorType(k.ShardIteratorType),
		// k implements Get/Set, so checkpoint writes are staged in memory
		// and only persisted once delivery is confirmed (see onDelivery).
		consumer.WithCheckpoint(k),
	)
	if err != nil {
		return err
	}
	k.cons = cons

	k.acc = ac.WithTracking(k.MaxUndeliveredMessages)
	k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages)
	k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages)
	// Counting semaphore: a send acquires a slot, a receive releases one.
	k.sem = make(chan struct{}, k.MaxUndeliveredMessages)

	ctx := context.Background()
	ctx, k.cancel = context.WithCancel(ctx)

	k.wg.Add(1)
	go func() {
		defer k.wg.Done()
		k.onDelivery(ctx)
	}()

	k.wg.Add(1)
	go func() {
		defer k.wg.Done()
		err := k.cons.Scan(ctx, func(r *consumer.Record) consumer.ScanStatus {
			select {
			case <-ctx.Done():
				return consumer.ScanStatus{Error: ctx.Err()}
			case k.sem <- struct{}{}:
				break
			}
			if err := k.onMessage(k.acc, r); err != nil {
				// Release the slot acquired above: this record will never be
				// delivered. (The original sent into k.sem here, which would
				// consume an extra slot instead of freeing one.)
				<-k.sem
				return consumer.ScanStatus{Error: err}
			}
			return consumer.ScanStatus{}
		})
		if err != nil {
			k.cancel()
			log.Printf("E! [inputs.kinesis_consumer] Scan encountered an error - %s", err.Error())
			k.cons = nil
		}
	}()

	return nil
}
// Start initializes the consumer and begins reading from the stream in
// background goroutines. It implements telegraf.ServiceInput.
//
// Idiom: the original unwrapped and re-returned the error; connect's
// result can be returned directly.
func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
	return k.connect(ac)
}
// onMessage parses a raw Kinesis record into metrics, hands them to the
// tracking accumulator, and remembers the record's sequence number so it
// can be checkpointed once delivery is confirmed.
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
	metrics, err := k.parser.Parse(r.Data)
	if err != nil {
		return err
	}

	// The map write shares a lock with the Add so the delivery goroutine
	// always finds the tracking ID in k.records.
	k.recordsTex.Lock()
	defer k.recordsTex.Unlock()
	k.records[acc.AddTrackingMetricGroup(metrics)] = *r.SequenceNumber
	return nil
}
// onDelivery consumes delivery notifications from the tracking accumulator
// and, for metric groups that were successfully written, persists the
// corresponding Kinesis checkpoint. It runs until ctx is cancelled
// (goroutine started in connect).
func (k *KinesisConsumer) onDelivery(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case info := <-k.acc.Delivered():
			k.recordsTex.Lock()
			sequenceNum, ok := k.records[info.ID()]
			if !ok {
				// Unknown tracking ID; nothing to release or checkpoint.
				k.recordsTex.Unlock()
				continue
			}
			// Release the semaphore slot acquired when the record was read.
			<-k.sem
			delete(k.records, info.ID())
			k.recordsTex.Unlock()

			if info.Delivered() {
				k.checkpointTex.Lock()
				chk, ok := k.checkpoints[sequenceNum]
				if !ok {
					k.checkpointTex.Unlock()
					continue
				}
				delete(k.checkpoints, sequenceNum)
				k.checkpointTex.Unlock()

				// at least once
				// NOTE(review): sequence numbers greater than lastSeqNum are
				// skipped; Gather resets lastSeqNum to maxSeq, so this looks
				// like it limits checkpoint advancement per gather cycle —
				// confirm the intended interaction with Gather.
				if strToBint(sequenceNum).Cmp(k.lastSeqNum) > 0 {
					continue
				}
				k.lastSeqNum = strToBint(sequenceNum)
				// Set error is intentionally ignored here (best-effort write).
				k.checkpoint.Set(chk.streamName, chk.shardID, sequenceNum)
			} else {
				log.Println("D! [inputs.kinesis_consumer] Metric group failed to process")
			}
		}
	}
}
// negOne is the sentinel returned by strToBint on parse failure
// (initialized in init).
var negOne *big.Int

// strToBint parses a base-10 sequence-number string into a big.Int,
// returning the negOne sentinel when the string is not a valid number.
func strToBint(s string) *big.Int {
	if parsed, ok := new(big.Int).SetString(s, 10); ok {
		return parsed
	}
	return negOne
}
// Stop cancels the background goroutines started by connect and waits for
// them to exit. It implements telegraf.ServiceInput.
func (k *KinesisConsumer) Stop() {
	k.cancel()
	k.wg.Wait()
}
// Gather reconnects the consumer if a prior scan error tore it down;
// otherwise it resets lastSeqNum so checkpointing can advance again
// (see onDelivery). Metrics themselves arrive via the service goroutines.
func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
	if k.cons != nil {
		k.lastSeqNum = maxSeq
		return nil
	}
	return k.connect(acc)
}
// Get wraps the checkpoint's Get function (called by consumer library)
func (k *KinesisConsumer) Get(streamName, shardID string) (string, error) {
	sequenceNumber, err := k.checkpoint.Get(streamName, shardID)
	return sequenceNumber, err
}
// Set wraps the checkpoint's Set function (called by consumer library).
// The sequence number is staged in memory and only written to the backing
// checkpoint store after delivery is confirmed (see onDelivery).
func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error {
	if sequenceNumber == "" {
		return fmt.Errorf("sequence number should not be empty")
	}

	k.checkpointTex.Lock()
	defer k.checkpointTex.Unlock()
	k.checkpoints[sequenceNumber] = checkpoint{streamName: streamName, shardID: shardID}
	return nil
}
// noopCheckpoint satisfies the checkpoint interface when no DynamoDB
// checkpoint is configured: sequence numbers are accepted and discarded.
type noopCheckpoint struct{}

// Set discards the checkpoint.
func (n noopCheckpoint) Set(string, string, string) error { return nil }

// Get always reports an empty checkpoint.
func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil }
// init registers the plugin factory and its defaults with Telegraf.
func init() {
	// Sentinel returned by strToBint on parse failure.
	negOne, _ = new(big.Int).SetString("-1", 10)

	inputs.Add("kinesis_consumer", func() telegraf.Input {
		return &KinesisConsumer{
			ShardIteratorType:      "TRIM_HORIZON",
			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
			// Start at the maximum sequence number so no delivered record
			// is skipped by the comparison in onDelivery.
			lastSeqNum: maxSeq,
		}
	})
}

View File

@ -236,7 +236,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
if sz == 500 {
// Max Messages Per PutRecordRequest is 500
elapsed := writekinesis(k, r)
log.Printf("I! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
sz = 0
r = nil
}
@ -244,7 +244,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
}
if sz > 0 {
elapsed := writekinesis(k, r)
log.Printf("I! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
}
return nil