rename kinesis_output to kinesis

This commit is contained in:
James Lamb 2015-12-11 14:01:26 +11:00
parent d5d785654b
commit c23a261dbf
4 changed files with 62 additions and 87 deletions

View File

@ -46,3 +46,16 @@ plugin will result in telegraf exiting with an exit code of 1.
This is used to group data within a stream. Currently this plugin only supports a single partitionkey.
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable
solution to scale out.
### format
The format configuration value has been designated to allow people to change the format of the Point as written to
Kinesis. Right now there are two supported formats string and custom.
#### string
String is defined using the default Point.String() value and translated to []byte for the Kinesis stream.
#### custom
Custom is a string defined by a number of values in the FormatMetric() function.

View File

@ -1,4 +1,4 @@
package kinesis_output
package kinesis
import (
"errors"
@ -64,7 +64,7 @@ func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials.
if k.Debug {
log.Printf("kinesis_output: Establishing a connection to Kinesis in %+v", k.Region)
log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region)
}
Config := &aws.Config{
Region: aws.String(k.Region),
@ -84,17 +84,17 @@ func (k *KinesisOutput) Connect() error {
resp, err := svc.ListStreams(KinesisParams)
if err != nil {
log.Printf("kinesis_output: Error in ListSteams API call : %+v \n", err)
log.Printf("kinesis: Error in ListSteams API call : %+v \n", err)
}
if checkstream(resp.StreamNames, k.StreamName) {
if k.Debug {
log.Printf("kinesis_output: Stream Exists")
log.Printf("kinesis: Stream Exists")
}
k.svc = svc
return nil
} else {
log.Printf("kinesis_output : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
return err
@ -126,14 +126,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
if k.Debug {
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error())
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
log.Printf("%+v \n", resp)
} else {
_, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error())
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
}
return time.Since(start)
@ -173,7 +173,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error {
}
func init() {
outputs.Add("kinesis_output", func() outputs.Output {
outputs.Add("kinesis", func() outputs.Output {
return &KinesisOutput{}
})
}

View File

@ -1,78 +0,0 @@
package kinesis_output
import (
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
k := &KinesisOutput{
Region: "us-west-2",
}
// Verify that we can connect kinesis endpoint. This test allows for a chain of credential
// so that various authentication methods can pass depending on the system that executes.
Config := &aws.Config{
Region: aws.String(k.Region),
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
}),
}
svc := kinesis.New(session.New(Config))
KinesisParams := &kinesis.ListStreamsInput{
Limit: aws.Int64(1)}
_, err := svc.ListStreams(KinesisParams)
if err != nil {
t.Error("Unable to connect to Kinesis")
}
require.NoError(t, err)
}
func TestFormatMetric(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
k := &KinesisOutput{
Format: "string",
}
p := testutil.MockBatchPoints().Points()[0]
valid_string := "test1,tag1=value1 value=1 1257894000000000000"
func_string, err := FormatMetric(k, p)
if func_string != valid_string {
t.Error("Expected ", valid_string)
}
require.NoError(t, err)
k = &KinesisOutput{
Format: "custom",
}
valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000"
func_custom, err := FormatMetric(k, p)
if func_custom != valid_custom {
t.Error("Expected ", valid_custom)
}
require.NoError(t, err)
}

View File

@ -0,0 +1,40 @@
package kinesis
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestFormatMetric(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
k := &KinesisOutput{
Format: "string",
}
p := testutil.MockBatchPoints().Points()[0]
valid_string := "test1,tag1=value1 value=1 1257894000000000000"
func_string, err := FormatMetric(k, p)
if func_string != valid_string {
t.Error("Expected ", valid_string)
}
require.NoError(t, err)
k = &KinesisOutput{
Format: "custom",
}
valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000"
func_custom, err := FormatMetric(k, p)
if func_custom != valid_custom {
t.Error("Expected ", valid_custom)
}
require.NoError(t, err)
}