From d2737fc5d00cab636b1c994cda6c1a9cade3fa07 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 11 Dec 2015 09:04:28 +1100 Subject: [PATCH] adds functionality that will exit if the straem does not exist --- outputs/kinesis/kinesis_output.go | 44 ++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index 6d0544290..739062f23 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "log" + "os" "sync/atomic" "time" @@ -29,10 +30,6 @@ type KinesisOutput struct { var sampleConfig = ` # Amazon REGION of kinesis endpoint. - # Most AWS services have a region specific endpoint this will be used by - # telegraf to output data. - # Authentication is provided by an IAMS role, SharedCredentials or Environment - # Variables. region = "ap-southeast-2" # Kinesis StreamName must exist prior to starting telegraf. streamname = "StreamName" @@ -53,11 +50,21 @@ func (k *KinesisOutput) Description() string { return "Configuration for the AWS Kinesis output." } +func checkstream(l []*string, s string) bool { + // Check if the StreamName exists in the slice returned from the ListStreams API request. + for _, stream := range l { + if *stream == s { + return true + } + } + return false +} + func (k *KinesisOutput) Connect() error { // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { - log.Printf("Establishing a connection to Kinesis in %+v", k.Region) + log.Printf("kinesis_output: Establishing a connection to Kinesis in %+v", k.Region) } Config := &aws.Config{ Region: aws.String(k.Region), @@ -70,8 +77,27 @@ func (k *KinesisOutput) Connect() error { } svc := kinesis.New(session.New(Config)) - k.svc = svc - return nil + KinesisParams := &kinesis.ListStreamsInput{ + Limit: aws.Int64(100), + } + + resp, err := svc.ListStreams(KinesisParams) + + if err != nil { + log.Printf("kinesis_output: Error in ListSteams API call : %+v \n", err) + } + + if checkstream(resp.StreamNames, k.StreamName) { + if k.Debug { + log.Printf("kinesis_output: Stream Exists") + } + k.svc = svc + return nil + } else { + log.Printf("kinesis_output : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) + os.Exit(1) + } + return err } func (k *KinesisOutput) Close() error { @@ -102,14 +128,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du if k.Debug { resp, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) } log.Printf("%+v \n", resp) } else { _, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) } } return time.Since(start)