add amazon kinesis as an output plugin

This commit is contained in:
James Lamb 2015-12-10 13:07:55 +11:00
parent d62e63c448
commit 158d82fdb9
4 changed files with 208 additions and 0 deletions

View File

@ -6,6 +6,7 @@ import (
_ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka" _ "github.com/influxdb/telegraf/outputs/kafka"
_ "github.com/influxdb/telegraf/outputs/kinesis"
_ "github.com/influxdb/telegraf/outputs/librato" _ "github.com/influxdb/telegraf/outputs/librato"
_ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/mqtt"
_ "github.com/influxdb/telegraf/outputs/nsq" _ "github.com/influxdb/telegraf/outputs/nsq"

43
outputs/kinesis/README.md Normal file
View File

@ -0,0 +1,43 @@
## Amazon Kinesis Output for Telegraf
This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points
in one Put request to Kinesis. This should save the number of API requests by a considerable level.
## About Kinesis
This is not the place to document all of the various Kinesis terms however it
maybe useful for users to review Amazons official docuementation which is available
[here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html).
## Config
For this output plugin to function correctly the following variables must be configured.
* region
* streamname
* partitionkey
### region
The region is the Amazon region that you wish to connect to. Examples include but are not limited to
* us-west-1
* us-west-2
* us-east-1
* ap-southeast-1
* ap-southeast-2
### streamname
The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to
note that the stream *MUST* be pre-configured for this plugin to function correctly.
### partitionkey
This is used to group data within a stream. Currently this plugin only supports a single partitionkey which means
that data will be entirely sent through a single Shard and Partition from a single host. If you have to scale out the
kinesis throughput using a different partition key on different hosts or host groups might be a workable solution.
## todo
Check if the stream exists so that we have a graceful exit.

View File

@ -0,0 +1,134 @@
package kinesis_output
import (
"errors"
"log"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/outputs"
)
type KinesisOutput struct {
Region string `toml:"region"`
StreamName string `toml:"streamname"`
PartitionKey string `toml:"partitionkey"`
Format string `toml:"format"`
Debug bool `toml:"debug"`
svc *kinesis.Kinesis
}
var sampleConfig = `
# Amazon REGION of kinesis endpoint.
# Most AWS services have a region specific endpoint this will be used by
# telegraf to output data.
# Authentication is provided by an IAMS role, SharedCredentials or Environment
# Variables.
region = "ap-southeast-2"
# Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
# PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
# format of the Data payload in the kinesis PutRecord, supported
# String and Custom.
format = "string"
# debug will show upstream aws messages.
debug = false
`
func (k *KinesisOutput) SampleConfig() string {
return sampleConfig
}
func (k *KinesisOutput) Description() string {
return "Configuration for the AWS Kinesis output."
}
func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials.
if k.Debug {
log.Printf("Establishing a connection to Kinesis in %+v", k.Region)
}
Config := &aws.Config{
Region: aws.String(k.Region),
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
}),
}
svc := kinesis.New(session.New(Config))
k.svc = svc
return nil
}
func (k *KinesisOutput) Close() error {
return errors.New("Error")
}
func formatmetric(k *KinesisOutput, point *client.Point) (string, error) {
if k.Format == "string" {
return point.String(), nil
} else {
m := fmt.Sprintf("%+v,%+v,%+v %+v",
point.Name(),
point.Tags(),
point.String(),
point.Time(),
)
return m, nil
}
}
func (k *KinesisOutput) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
r := []*kinesis.PutRecordsRequestEntry{}
for _, p := range points {
metric, _ := formatmetric(k, p)
d := kinesis.PutRecordsRequestEntry{
Data: []byte(metric),
PartitionKey: aws.String(k.PartitionKey),
}
r = append(r, &d)
}
payload := &kinesis.PutRecordsInput{
Records: r,
StreamName: aws.String(k.StreamName),
}
if k.Debug {
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("Unable to write to Kinesis : %+v \n", err.Error())
}
log.Printf("%+v \n", resp)
} else {
_, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("Unable to write to Kinesis : %+v \n", err.Error())
}
}
return nil
}
func init() {
outputs.Add("kinesis_output", func() outputs.Output {
return &KinesisOutput{}
})
}

View File

@ -0,0 +1,30 @@
package kinesis_output
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Verify that we can connect kinesis endpoint. This test allows for a chain of credential
// so that various authentication methods can pass depending on the system that executes.
Config := &aws.Config{
Region: aws.String(k.Region),
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
}),
}
err := kinesis.New(session.New(Config))
require.NoError(t, err)
}