Use numerical codes instead of symbolic ones
This commit is contained in:
parent
0cb4babab5
commit
e918bfbbec
|
@ -3,8 +3,6 @@ package kafka
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
@ -22,11 +20,11 @@ type Kafka struct {
|
||||||
// Routing Key Tag
|
// Routing Key Tag
|
||||||
RoutingTag string `toml:"routing_tag"`
|
RoutingTag string `toml:"routing_tag"`
|
||||||
// Compression Codec Tag
|
// Compression Codec Tag
|
||||||
CompressionCodec string
|
CompressionCodec int
|
||||||
// RequiredAcks Tag
|
// RequiredAcks Tag
|
||||||
RequiredAcks string
|
RequiredAcks int
|
||||||
// MaxRetry Tag
|
// MaxRetry Tag
|
||||||
MaxRetry string
|
MaxRetry int
|
||||||
|
|
||||||
// Legacy SSL config options
|
// Legacy SSL config options
|
||||||
// TLS client certificate
|
// TLS client certificate
|
||||||
|
@ -61,20 +59,20 @@ var sampleConfig = `
|
||||||
## ie, if this tag exists, it's value will be used as the routing key
|
## ie, if this tag exists, it's value will be used as the routing key
|
||||||
routing_tag = "host"
|
routing_tag = "host"
|
||||||
|
|
||||||
## CompressionCodec represents the various compression codecs recognized by Kafka in messages.
|
## CompressionCodec represents the various compression codecs recognized by Kafka in messages.
|
||||||
## "none" : No compression
|
## 0 : No compression
|
||||||
## "gzip" : Gzip compression
|
## 1 : Gzip compression
|
||||||
## "snappy" : Snappy compression
|
## 2 : Snappy compression
|
||||||
# compression_codec = "none"
|
compression_codec = 0
|
||||||
|
|
||||||
## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding
|
## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding
|
||||||
## "none" : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
|
## 0 : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
|
||||||
## "leader" : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
|
## 1 : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
|
||||||
## "leader_and_replicas" : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
|
## -1 : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
|
||||||
# required_acks = "leader_and_replicas"
|
required_acks = -1
|
||||||
|
|
||||||
## The total number of times to retry sending a message
|
## The total number of times to retry sending a message
|
||||||
# max_retry = "3"
|
max_retry = 3
|
||||||
|
|
||||||
## Optional SSL Config
|
## Optional SSL Config
|
||||||
# ssl_ca = "/etc/telegraf/ca.pem"
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
@ -94,66 +92,12 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
|
||||||
k.serializer = serializer
|
k.serializer = serializer
|
||||||
}
|
}
|
||||||
|
|
||||||
func requiredAcks(value string) (sarama.RequiredAcks, error) {
|
|
||||||
switch strings.ToLower(value) {
|
|
||||||
case "none":
|
|
||||||
return sarama.NoResponse, nil
|
|
||||||
case "leader":
|
|
||||||
return sarama.WaitForLocal, nil
|
|
||||||
case "", "leader_and_replicas":
|
|
||||||
return sarama.WaitForAll, nil
|
|
||||||
default:
|
|
||||||
return 0, fmt.Errorf("Failed to recognize required_acks: %s", value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func compressionCodec(value string) (sarama.CompressionCodec, error) {
|
|
||||||
switch strings.ToLower(value) {
|
|
||||||
case "gzip":
|
|
||||||
return sarama.CompressionGZIP, nil
|
|
||||||
case "snappy":
|
|
||||||
return sarama.CompressionSnappy, nil
|
|
||||||
case "", "none":
|
|
||||||
return sarama.CompressionNone, nil
|
|
||||||
default:
|
|
||||||
return 0, fmt.Errorf("Failed to recognize compression_codec: %s", value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func maxRetry(value string) (int, error) {
|
|
||||||
if value == "" {
|
|
||||||
return 3, nil
|
|
||||||
}
|
|
||||||
maxRetry, err := strconv.Atoi(value)
|
|
||||||
if err != nil {
|
|
||||||
return -1, fmt.Errorf("Failed to parse max_retry: %s", value)
|
|
||||||
}
|
|
||||||
if maxRetry < 0 {
|
|
||||||
return -1, fmt.Errorf("max_retry is %s but it should not be negative", value)
|
|
||||||
}
|
|
||||||
return maxRetry, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *Kafka) Connect() error {
|
func (k *Kafka) Connect() error {
|
||||||
config := sarama.NewConfig()
|
config := sarama.NewConfig()
|
||||||
|
|
||||||
requiredAcks, err := requiredAcks(k.RequiredAcks)
|
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
|
||||||
if err != nil {
|
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
|
||||||
return err
|
config.Producer.Retry.Max = k.MaxRetry
|
||||||
}
|
|
||||||
config.Producer.RequiredAcks = requiredAcks
|
|
||||||
|
|
||||||
compressionCodec, err := compressionCodec(k.CompressionCodec)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
config.Producer.Compression = compressionCodec
|
|
||||||
|
|
||||||
maxRetry, err := maxRetry(k.MaxRetry)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
config.Producer.Retry.Max = maxRetry
|
|
||||||
|
|
||||||
// Legacy support ssl config
|
// Legacy support ssl config
|
||||||
if k.Certificate != "" {
|
if k.Certificate != "" {
|
||||||
|
|
Loading…
Reference in New Issue