Improve the HTTP JSON plugin README with more examples.
This commit is contained in:
@@ -74,7 +74,11 @@ func (k *Kafka) Gather(acc plugins.Accumulator) error {
|
||||
k.Consumer.Close()
|
||||
}()
|
||||
|
||||
go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
|
||||
go readFromKafka(k.Consumer.Messages(),
|
||||
metricQueue,
|
||||
k.BatchSize,
|
||||
k.Consumer.CommitUpto,
|
||||
halt)
|
||||
}
|
||||
|
||||
return emitMetrics(k, acc, metricQueue)
|
||||
@@ -105,7 +109,13 @@ const millisecond = 1000000 * time.Nanosecond
|
||||
|
||||
type ack func(*sarama.ConsumerMessage) error
|
||||
|
||||
func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
|
||||
func readFromKafka(
|
||||
kafkaMsgs <-chan *sarama.ConsumerMessage,
|
||||
metricProducer chan<- []byte,
|
||||
maxBatchSize int,
|
||||
ackMsg ack,
|
||||
halt <-chan bool,
|
||||
) {
|
||||
batch := make([]byte, 0)
|
||||
currentBatchSize := 0
|
||||
timeout := time.After(500 * millisecond)
|
||||
|
||||
Reference in New Issue
Block a user