adds functionality that will exit if the straem does not exist

This commit is contained in:
James Lamb 2015-12-11 09:04:28 +11:00
parent 2cb66ad02c
commit d2737fc5d0
1 changed files with 35 additions and 9 deletions

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"os"
"sync/atomic" "sync/atomic"
"time" "time"
@ -29,10 +30,6 @@ type KinesisOutput struct {
var sampleConfig = ` var sampleConfig = `
# Amazon REGION of kinesis endpoint. # Amazon REGION of kinesis endpoint.
# Most AWS services have a region specific endpoint this will be used by
# telegraf to output data.
# Authentication is provided by an IAMS role, SharedCredentials or Environment
# Variables.
region = "ap-southeast-2" region = "ap-southeast-2"
# Kinesis StreamName must exist prior to starting telegraf. # Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName" streamname = "StreamName"
@ -53,11 +50,21 @@ func (k *KinesisOutput) Description() string {
return "Configuration for the AWS Kinesis output." return "Configuration for the AWS Kinesis output."
} }
func checkstream(l []*string, s string) bool {
// Check if the StreamName exists in the slice returned from the ListStreams API request.
for _, stream := range l {
if *stream == s {
return true
}
}
return false
}
func (k *KinesisOutput) Connect() error { func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials. // environment variables, and then Shared Credentials.
if k.Debug { if k.Debug {
log.Printf("Establishing a connection to Kinesis in %+v", k.Region) log.Printf("kinesis_output: Establishing a connection to Kinesis in %+v", k.Region)
} }
Config := &aws.Config{ Config := &aws.Config{
Region: aws.String(k.Region), Region: aws.String(k.Region),
@ -70,8 +77,27 @@ func (k *KinesisOutput) Connect() error {
} }
svc := kinesis.New(session.New(Config)) svc := kinesis.New(session.New(Config))
KinesisParams := &kinesis.ListStreamsInput{
Limit: aws.Int64(100),
}
resp, err := svc.ListStreams(KinesisParams)
if err != nil {
log.Printf("kinesis_output: Error in ListSteams API call : %+v \n", err)
}
if checkstream(resp.StreamNames, k.StreamName) {
if k.Debug {
log.Printf("kinesis_output: Stream Exists")
}
k.svc = svc k.svc = svc
return nil return nil
} else {
log.Printf("kinesis_output : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
return err
} }
func (k *KinesisOutput) Close() error { func (k *KinesisOutput) Close() error {
@ -102,14 +128,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
if k.Debug { if k.Debug {
resp, err := k.svc.PutRecords(payload) resp, err := k.svc.PutRecords(payload)
if err != nil { if err != nil {
log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error())
} }
log.Printf("%+v \n", resp) log.Printf("%+v \n", resp)
} else { } else {
_, err := k.svc.PutRecords(payload) _, err := k.svc.PutRecords(payload)
if err != nil { if err != nil {
log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error())
} }
} }
return time.Since(start) return time.Since(start)