Use DescribeStreamSummary in place of ListStreams in kinesis output (#4864)
This commit is contained in:
parent
7cb75ca979
commit
136a5724bd
|
@ -145,7 +145,7 @@
|
|||
revision = "f2867c24984aa53edec54a138c03db934221bdea"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:65a05bde9b02f645c73afa61c9f6af92d94d726c81a268f45cc70218bd58de65"
|
||||
digest = "1:996727880e06dcf037f712c4d046e241d1b1b01844636fefb0fbaa480cfd230e"
|
||||
name = "github.com/aws/aws-sdk-go"
|
||||
packages = [
|
||||
"aws",
|
||||
|
@ -181,8 +181,8 @@
|
|||
"service/sts",
|
||||
]
|
||||
pruneopts = ""
|
||||
revision = "8cf662a972fa7fba8f2c1ec57648cf840e2bb401"
|
||||
version = "v1.14.30"
|
||||
revision = "bf8067ceb6e7f51e150c218972dccfeeed892b85"
|
||||
version = "v1.15.54"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
[[constraint]]
|
||||
name = "github.com/aws/aws-sdk-go"
|
||||
version = "1.14.8"
|
||||
version = "1.15.54"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/bsm/sarama-cluster"
|
||||
|
|
|
@ -2,7 +2,6 @@ package kinesis
|
|||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
|
@ -115,17 +114,11 @@ func (k *KinesisOutput) Description() string {
|
|||
return "Configuration for the AWS Kinesis output."
|
||||
}
|
||||
|
||||
func checkstream(l []*string, s string) bool {
|
||||
// Check if the StreamName exists in the slice returned from the ListStreams API request.
|
||||
for _, stream := range l {
|
||||
if *stream == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (k *KinesisOutput) Connect() error {
|
||||
if k.Partition == nil {
|
||||
log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition")
|
||||
}
|
||||
|
||||
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
|
||||
// environment variables, and then Shared Credentials.
|
||||
if k.Debug {
|
||||
|
@ -145,29 +138,10 @@ func (k *KinesisOutput) Connect() error {
|
|||
configProvider := credentialConfig.Credentials()
|
||||
svc := kinesis.New(configProvider)
|
||||
|
||||
KinesisParams := &kinesis.ListStreamsInput{
|
||||
Limit: aws.Int64(100),
|
||||
}
|
||||
|
||||
resp, err := svc.ListStreams(KinesisParams)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("E! kinesis: Error in ListSteams API call : %+v \n", err)
|
||||
}
|
||||
|
||||
if checkstream(resp.StreamNames, k.StreamName) {
|
||||
if k.Debug {
|
||||
log.Printf("E! kinesis: Stream Exists")
|
||||
}
|
||||
k.svc = svc
|
||||
return nil
|
||||
} else {
|
||||
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
|
||||
os.Exit(1)
|
||||
}
|
||||
if k.Partition == nil {
|
||||
log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition")
|
||||
}
|
||||
_, err := svc.DescribeStreamSummary(&kinesis.DescribeStreamSummaryInput{
|
||||
StreamName: aws.String(k.StreamName),
|
||||
})
|
||||
k.svc = svc
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue