Add burrow input plugin (#3489)
This commit is contained in:
parent
a86c2d54ad
commit
fd22b1ef1f
|
@ -132,6 +132,7 @@ configuration options.
|
||||||
* [bcache](./plugins/inputs/bcache)
|
* [bcache](./plugins/inputs/bcache)
|
||||||
* [bond](./plugins/inputs/bond)
|
* [bond](./plugins/inputs/bond)
|
||||||
* [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2))
|
* [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2))
|
||||||
|
* [burrow](./plugins/inputs/burrow)
|
||||||
* [ceph](./plugins/inputs/ceph)
|
* [ceph](./plugins/inputs/ceph)
|
||||||
* [cgroup](./plugins/inputs/cgroup)
|
* [cgroup](./plugins/inputs/cgroup)
|
||||||
* [chrony](./plugins/inputs/chrony)
|
* [chrony](./plugins/inputs/chrony)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
|
_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
# Telegraf Plugin: Burrow
|
||||||
|
|
||||||
|
Collect Kafka topic, consumer and partition status
|
||||||
|
via [Burrow](https://github.com/linkedin/Burrow) HTTP [API](https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint).
|
||||||
|
|
||||||
|
Supported Burrow version: `1.x`
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```
|
||||||
|
## Burrow API endpoints in format "schema://host:port".
|
||||||
|
## Default is "http://localhost:8000".
|
||||||
|
servers = ["http://localhost:8000"]
|
||||||
|
|
||||||
|
## Override Burrow API prefix.
|
||||||
|
## Useful when Burrow is behind reverse-proxy.
|
||||||
|
# api_prefix = "/v3/kafka"
|
||||||
|
|
||||||
|
## Maximum time to receive response.
|
||||||
|
# response_timeout = "5s"
|
||||||
|
|
||||||
|
## Limit per-server concurrent connections.
|
||||||
|
## Useful in case of large number of topics or consumer groups.
|
||||||
|
# concurrent_connections = 20
|
||||||
|
|
||||||
|
## Filter clusters, default is no filtering.
|
||||||
|
## Values can be specified as glob patterns.
|
||||||
|
# clusters_include = []
|
||||||
|
# clusters_exclude = []
|
||||||
|
|
||||||
|
## Filter consumer groups, default is no filtering.
|
||||||
|
## Values can be specified as glob patterns.
|
||||||
|
# groups_include = []
|
||||||
|
# groups_exclude = []
|
||||||
|
|
||||||
|
## Filter topics, default is no filtering.
|
||||||
|
## Values can be specified as glob patterns.
|
||||||
|
# topics_include = []
|
||||||
|
# topics_exclude = []
|
||||||
|
|
||||||
|
## Credentials for basic HTTP authentication.
|
||||||
|
# username = ""
|
||||||
|
# password = ""
|
||||||
|
|
||||||
|
## Optional SSL config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
```
|
||||||
|
|
||||||
|
### Partition Status mappings
|
||||||
|
|
||||||
|
* `OK` = 1
|
||||||
|
* `NOT_FOUND` = 2
|
||||||
|
* `WARN` = 3
|
||||||
|
* `ERR` = 4
|
||||||
|
* `STOP` = 5
|
||||||
|
* `STALL` = 6
|
||||||
|
|
||||||
|
> unknown value will be mapped to 0
|
||||||
|
|
||||||
|
### Fields
|
||||||
|
|
||||||
|
* `burrow_group` (one event per each consumer group)
|
||||||
|
- status (string, see Partition Status mappings)
|
||||||
|
- status_code (int, `1..6`, see Partition status mappings)
|
||||||
|
- parition_count (int, `number of partitions`)
|
||||||
|
- total_lag (int64, `totallag`)
|
||||||
|
- lag (int64, `maxlag.current_lag || 0`)
|
||||||
|
|
||||||
|
* `burrow_partition` (one event per each topic partition)
|
||||||
|
- status (string, see Partition Status mappings)
|
||||||
|
- status_code (int, `1..6`, see Partition status mappings)
|
||||||
|
- lag (int64, `current_lag || 0`)
|
||||||
|
- offset (int64, `end.timestamp`)
|
||||||
|
- timestamp (int64, `end.timestamp`)
|
||||||
|
|
||||||
|
* `burrow_topic` (one event per topic offset)
|
||||||
|
- offset (int64)
|
||||||
|
|
||||||
|
|
||||||
|
### Tags
|
||||||
|
|
||||||
|
* `burrow_group`
|
||||||
|
- cluster (string)
|
||||||
|
- group (string)
|
||||||
|
|
||||||
|
* `burrow_partition`
|
||||||
|
- cluster (string)
|
||||||
|
- group (string)
|
||||||
|
- topic (string)
|
||||||
|
- partition (int)
|
||||||
|
|
||||||
|
* `burrow_topic`
|
||||||
|
- cluster (string)
|
||||||
|
- topic (string)
|
||||||
|
- partition (int)
|
|
@ -0,0 +1,485 @@
|
||||||
|
package burrow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/filter"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/internal/tls"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultBurrowPrefix = "/v3/kafka"
|
||||||
|
defaultConcurrentConnections = 20
|
||||||
|
defaultResponseTimeout = time.Second * 5
|
||||||
|
defaultServer = "http://localhost:8000"
|
||||||
|
)
|
||||||
|
|
||||||
|
const configSample = `
|
||||||
|
## Burrow API endpoints in format "schema://host:port".
|
||||||
|
## Default is "http://localhost:8000".
|
||||||
|
servers = ["http://localhost:8000"]
|
||||||
|
|
||||||
|
## Override Burrow API prefix.
|
||||||
|
## Useful when Burrow is behind reverse-proxy.
|
||||||
|
# api_prefix = "/v3/kafka"
|
||||||
|
|
||||||
|
## Maximum time to receive response.
|
||||||
|
# response_timeout = "5s"
|
||||||
|
|
||||||
|
## Limit per-server concurrent connections.
|
||||||
|
## Useful in case of large number of topics or consumer groups.
|
||||||
|
# concurrent_connections = 20
|
||||||
|
|
||||||
|
## Filter clusters, default is no filtering.
|
||||||
|
## Values can be specified as glob patterns.
|
||||||
|
# clusters_include = []
|
||||||
|
# clusters_exclude = []
|
||||||
|
|
||||||
|
## Filter consumer groups, default is no filtering.
|
||||||
|
## Values can be specified as glob patterns.
|
||||||
|
# groups_include = []
|
||||||
|
# groups_exclude = []
|
||||||
|
|
||||||
|
## Filter topics, default is no filtering.
|
||||||
|
## Values can be specified as glob patterns.
|
||||||
|
# topics_include = []
|
||||||
|
# topics_exclude = []
|
||||||
|
|
||||||
|
## Credentials for basic HTTP authentication.
|
||||||
|
# username = ""
|
||||||
|
# password = ""
|
||||||
|
|
||||||
|
## Optional SSL config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
`
|
||||||
|
|
||||||
|
type (
|
||||||
|
burrow struct {
|
||||||
|
tls.ClientConfig
|
||||||
|
|
||||||
|
Servers []string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
ResponseTimeout internal.Duration
|
||||||
|
ConcurrentConnections int
|
||||||
|
|
||||||
|
APIPrefix string `toml:"api_prefix"`
|
||||||
|
ClustersExclude []string
|
||||||
|
ClustersInclude []string
|
||||||
|
GroupsExclude []string
|
||||||
|
GroupsInclude []string
|
||||||
|
TopicsExclude []string
|
||||||
|
TopicsInclude []string
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
|
filterClusters filter.Filter
|
||||||
|
filterGroups filter.Filter
|
||||||
|
filterTopics filter.Filter
|
||||||
|
}
|
||||||
|
|
||||||
|
// response
|
||||||
|
apiResponse struct {
|
||||||
|
Clusters []string `json:"clusters"`
|
||||||
|
Groups []string `json:"consumers"`
|
||||||
|
Topics []string `json:"topics"`
|
||||||
|
Offsets []int64 `json:"offsets"`
|
||||||
|
Status apiStatusResponse `json:"status"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// response: status field
|
||||||
|
apiStatusResponse struct {
|
||||||
|
Partitions []apiStatusResponseLag `json:"partitions"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
PartitionCount int `json:"partition_count"`
|
||||||
|
Maxlag *apiStatusResponseLag `json:"maxlag"`
|
||||||
|
TotalLag int64 `json:"totallag"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// response: lag field
|
||||||
|
apiStatusResponseLag struct {
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
Partition int32 `json:"partition"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Start apiStatusResponseLagItem `json:"start"`
|
||||||
|
End apiStatusResponseLagItem `json:"end"`
|
||||||
|
CurrentLag int64 `json:"current_lag"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// response: lag field item
|
||||||
|
apiStatusResponseLagItem struct {
|
||||||
|
Offset int64 `json:"offset"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Lag int64 `json:"lag"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("burrow", func() telegraf.Input {
|
||||||
|
return &burrow{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) SampleConfig() string {
|
||||||
|
return configSample
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) Description() string {
|
||||||
|
return "Collect Kafka topics and consumers status from Burrow HTTP API."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) Gather(acc telegraf.Accumulator) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
if len(b.Servers) == 0 {
|
||||||
|
b.Servers = []string{defaultServer}
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.client == nil {
|
||||||
|
b.setDefaults()
|
||||||
|
if err := b.compileGlobs(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c, err := b.createClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.client = c
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, addr := range b.Servers {
|
||||||
|
u, err := url.Parse(addr)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("unable to parse address '%s': %s", addr, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if u.Path == "" {
|
||||||
|
u.Path = b.APIPrefix
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(u *url.URL) {
|
||||||
|
defer wg.Done()
|
||||||
|
acc.AddError(b.gatherServer(u, acc))
|
||||||
|
}(u)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) setDefaults() {
|
||||||
|
if b.APIPrefix == "" {
|
||||||
|
b.APIPrefix = defaultBurrowPrefix
|
||||||
|
}
|
||||||
|
if b.ConcurrentConnections < 1 {
|
||||||
|
b.ConcurrentConnections = defaultConcurrentConnections
|
||||||
|
}
|
||||||
|
if b.ResponseTimeout.Duration < time.Second {
|
||||||
|
b.ResponseTimeout = internal.Duration{
|
||||||
|
Duration: defaultResponseTimeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) compileGlobs() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// compile glob patterns
|
||||||
|
b.filterClusters, err = filter.NewIncludeExcludeFilter(b.ClustersInclude, b.ClustersExclude)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.filterGroups, err = filter.NewIncludeExcludeFilter(b.GroupsInclude, b.GroupsExclude)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.filterTopics, err = filter.NewIncludeExcludeFilter(b.TopicsInclude, b.TopicsExclude)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) createClient() (*http.Client, error) {
|
||||||
|
tlsCfg, err := b.ClientConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
},
|
||||||
|
Timeout: b.ResponseTimeout.Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) {
|
||||||
|
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if b.Username != "" {
|
||||||
|
req.SetBasicAuth(b.Username, b.Password)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := b.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("wrong response: %d", res.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
ares := &apiResponse{}
|
||||||
|
dec := json.NewDecoder(res.Body)
|
||||||
|
|
||||||
|
return ares, dec.Decode(ares)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
r, err := b.getResponse(src)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
guard := make(chan struct{}, b.ConcurrentConnections)
|
||||||
|
for _, cluster := range r.Clusters {
|
||||||
|
if !b.filterClusters.Match(cluster) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(cluster string) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
// fetch topic list
|
||||||
|
// endpoint: <api_prefix>/(cluster)/topic
|
||||||
|
ut := appendPathToURL(src, cluster, "topic")
|
||||||
|
b.gatherTopics(guard, ut, cluster, acc)
|
||||||
|
}(cluster)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(cluster string) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
// fetch consumer group list
|
||||||
|
// endpoint: <api_prefix>/(cluster)/consumer
|
||||||
|
uc := appendPathToURL(src, cluster, "consumer")
|
||||||
|
b.gatherGroups(guard, uc, cluster, acc)
|
||||||
|
}(cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
r, err := b.getResponse(src)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, topic := range r.Topics {
|
||||||
|
if !b.filterTopics.Match(topic) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
guard <- struct{}{}
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func(topic string) {
|
||||||
|
defer func() {
|
||||||
|
<-guard
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// fetch topic offsets
|
||||||
|
// endpoint: <api_prefix>/<cluster>/topic/<topic>
|
||||||
|
tu := appendPathToURL(src, topic)
|
||||||
|
tr, err := b.getResponse(tu)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b.genTopicMetrics(tr, cluster, topic, acc)
|
||||||
|
}(topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) {
|
||||||
|
for i, offset := range r.Offsets {
|
||||||
|
tags := map[string]string{
|
||||||
|
"cluster": cluster,
|
||||||
|
"topic": topic,
|
||||||
|
"partition": strconv.Itoa(i),
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields(
|
||||||
|
"burrow_topic",
|
||||||
|
map[string]interface{}{
|
||||||
|
"offset": offset,
|
||||||
|
},
|
||||||
|
tags,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
r, err := b.getResponse(src)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, group := range r.Groups {
|
||||||
|
if !b.filterGroups.Match(group) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
guard <- struct{}{}
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func(group string) {
|
||||||
|
defer func() {
|
||||||
|
<-guard
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// fetch consumer group status
|
||||||
|
// endpoint: <api_prefix>/<cluster>/consumer/<group>/lag
|
||||||
|
gl := appendPathToURL(src, group, "lag")
|
||||||
|
gr, err := b.getResponse(gl)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b.genGroupStatusMetrics(gr, cluster, group, acc)
|
||||||
|
b.genGroupLagMetrics(gr, cluster, group, acc)
|
||||||
|
}(group)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
|
||||||
|
partitionCount := r.Status.PartitionCount
|
||||||
|
if partitionCount == 0 {
|
||||||
|
partitionCount = len(r.Status.Partitions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get max timestamp and offset from partitions list
|
||||||
|
offset := int64(0)
|
||||||
|
timestamp := int64(0)
|
||||||
|
for _, partition := range r.Status.Partitions {
|
||||||
|
if partition.End.Offset > offset {
|
||||||
|
offset = partition.End.Offset
|
||||||
|
}
|
||||||
|
if partition.End.Timestamp > timestamp {
|
||||||
|
timestamp = partition.End.Timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lag := int64(0)
|
||||||
|
if r.Status.Maxlag != nil {
|
||||||
|
lag = r.Status.Maxlag.CurrentLag
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields(
|
||||||
|
"burrow_group",
|
||||||
|
map[string]interface{}{
|
||||||
|
"status": r.Status.Status,
|
||||||
|
"status_code": mapStatusToCode(r.Status.Status),
|
||||||
|
"partition_count": partitionCount,
|
||||||
|
"total_lag": r.Status.TotalLag,
|
||||||
|
"lag": lag,
|
||||||
|
"offset": offset,
|
||||||
|
"timestamp": timestamp,
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"cluster": cluster,
|
||||||
|
"group": group,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
|
||||||
|
for _, partition := range r.Status.Partitions {
|
||||||
|
acc.AddFields(
|
||||||
|
"burrow_partition",
|
||||||
|
map[string]interface{}{
|
||||||
|
"status": partition.Status,
|
||||||
|
"status_code": mapStatusToCode(partition.Status),
|
||||||
|
"lag": partition.CurrentLag,
|
||||||
|
"offset": partition.End.Offset,
|
||||||
|
"timestamp": partition.End.Timestamp,
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"cluster": cluster,
|
||||||
|
"group": group,
|
||||||
|
"topic": partition.Topic,
|
||||||
|
"partition": strconv.FormatInt(int64(partition.Partition), 10),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendPathToURL(src *url.URL, parts ...string) *url.URL {
|
||||||
|
dst := new(url.URL)
|
||||||
|
*dst = *src
|
||||||
|
|
||||||
|
for i, part := range parts {
|
||||||
|
parts[i] = url.PathEscape(part)
|
||||||
|
}
|
||||||
|
|
||||||
|
ext := strings.Join(parts, "/")
|
||||||
|
dst.Path = fmt.Sprintf("%s/%s", src.Path, ext)
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapStatusToCode(src string) int {
|
||||||
|
switch src {
|
||||||
|
case "OK":
|
||||||
|
return 1
|
||||||
|
case "NOT_FOUND":
|
||||||
|
return 2
|
||||||
|
case "WARN":
|
||||||
|
return 3
|
||||||
|
case "ERR":
|
||||||
|
return 4
|
||||||
|
case "STOP":
|
||||||
|
return 5
|
||||||
|
case "STALL":
|
||||||
|
return 6
|
||||||
|
default:
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,285 @@
|
||||||
|
package burrow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json
|
||||||
|
func getResponseJSON(requestURI string) ([]byte, int) {
|
||||||
|
uri := strings.TrimLeft(requestURI, "/")
|
||||||
|
mappedFile := strings.Replace(uri, "/", "_", -1)
|
||||||
|
jsonFile := fmt.Sprintf("./testdata/%s.json", mappedFile)
|
||||||
|
|
||||||
|
code := 200
|
||||||
|
_, err := os.Stat(jsonFile)
|
||||||
|
if err != nil {
|
||||||
|
code = 404
|
||||||
|
jsonFile = "./testdata/error.json"
|
||||||
|
}
|
||||||
|
|
||||||
|
// respond with file
|
||||||
|
b, _ := ioutil.ReadFile(jsonFile)
|
||||||
|
return b, code
|
||||||
|
}
|
||||||
|
|
||||||
|
// return mocked HTTP server
|
||||||
|
func getHTTPServer() *httptest.Server {
|
||||||
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
body, code := getResponseJSON(r.RequestURI)
|
||||||
|
w.WriteHeader(code)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.Write(body)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// return mocked HTTP server with basic auth
|
||||||
|
func getHTTPServerBasicAuth() *httptest.Server {
|
||||||
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
|
||||||
|
|
||||||
|
username, password, authOK := r.BasicAuth()
|
||||||
|
if authOK == false {
|
||||||
|
http.Error(w, "Not authorized", 401)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if username != "test" && password != "test" {
|
||||||
|
http.Error(w, "Not authorized", 401)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ok, continue
|
||||||
|
body, code := getResponseJSON(r.RequestURI)
|
||||||
|
w.WriteHeader(code)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.Write(body)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// test burrow_topic measurement
|
||||||
|
func TestBurrowTopic(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{Servers: []string{s.URL}}
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
fields := []map[string]interface{}{
|
||||||
|
// topicA
|
||||||
|
{"offset": int64(459178195)},
|
||||||
|
{"offset": int64(459178022)},
|
||||||
|
{"offset": int64(456491598)},
|
||||||
|
}
|
||||||
|
tags := []map[string]string{
|
||||||
|
// topicA
|
||||||
|
{"cluster": "clustername1", "topic": "topicA", "partition": "0"},
|
||||||
|
{"cluster": "clustername1", "topic": "topicA", "partition": "1"},
|
||||||
|
{"cluster": "clustername1", "topic": "topicA", "partition": "2"},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
require.Equal(t, true, acc.HasMeasurement("burrow_topic"))
|
||||||
|
for i := 0; i < len(fields); i++ {
|
||||||
|
acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test burrow_partition measurement
|
||||||
|
func TestBurrowPartition(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
}
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
fields := []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"status": "OK",
|
||||||
|
"status_code": 1,
|
||||||
|
"lag": int64(0),
|
||||||
|
"offset": int64(431323195),
|
||||||
|
"timestamp": int64(1515609490008),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": "OK",
|
||||||
|
"status_code": 1,
|
||||||
|
"lag": int64(0),
|
||||||
|
"offset": int64(431322962),
|
||||||
|
"timestamp": int64(1515609490008),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"status": "OK",
|
||||||
|
"status_code": 1,
|
||||||
|
"lag": int64(0),
|
||||||
|
"offset": int64(428636563),
|
||||||
|
"timestamp": int64(1515609490008),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tags := []map[string]string{
|
||||||
|
{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0"},
|
||||||
|
{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1"},
|
||||||
|
{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2"},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
require.Equal(t, true, acc.HasMeasurement("burrow_partition"))
|
||||||
|
|
||||||
|
for i := 0; i < len(fields); i++ {
|
||||||
|
acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// burrow_group
|
||||||
|
func TestBurrowGroup(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
}
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
fields := []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"status": "OK",
|
||||||
|
"status_code": 1,
|
||||||
|
"partition_count": 3,
|
||||||
|
"total_lag": int64(0),
|
||||||
|
"lag": int64(0),
|
||||||
|
"offset": int64(431323195),
|
||||||
|
"timestamp": int64(1515609490008),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := []map[string]string{
|
||||||
|
{"cluster": "clustername1", "group": "group1"},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
require.Equal(t, true, acc.HasMeasurement("burrow_group"))
|
||||||
|
|
||||||
|
for i := 0; i < len(fields); i++ {
|
||||||
|
acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect from multiple servers
|
||||||
|
func TestMultipleServers(t *testing.T) {
|
||||||
|
s1 := getHTTPServer()
|
||||||
|
defer s1.Close()
|
||||||
|
|
||||||
|
s2 := getHTTPServer()
|
||||||
|
defer s2.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s1.URL, s2.URL},
|
||||||
|
}
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
require.Exactly(t, 14, len(acc.Metrics))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect multiple times
|
||||||
|
func TestMultipleRuns(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
}
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
require.Exactly(t, 7, len(acc.Metrics))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect from http basic auth server
|
||||||
|
func TestBasicAuthConfig(t *testing.T) {
|
||||||
|
s := getHTTPServerBasicAuth()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
Username: "test",
|
||||||
|
Password: "test",
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
require.Exactly(t, 7, len(acc.Metrics))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect from whitelisted clusters
|
||||||
|
func TestFilterClusters(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
// no match by cluster
|
||||||
|
require.Exactly(t, 0, len(acc.Metrics))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect from whitelisted groups
|
||||||
|
func TestFilterGroups(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
GroupsInclude: []string{"group?"}, // group1 -> match
|
||||||
|
TopicsExclude: []string{"*"}, // exclude all
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
require.Exactly(t, 4, len(acc.Metrics))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect from whitelisted topics
|
||||||
|
func TestFilterTopics(t *testing.T) {
|
||||||
|
s := getHTTPServer()
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
plugin := &burrow{
|
||||||
|
Servers: []string{s.URL},
|
||||||
|
TopicsInclude: []string{"topic?"}, // topicA -> match
|
||||||
|
GroupsExclude: []string{"*"}, // exclude all
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
plugin.Gather(acc)
|
||||||
|
|
||||||
|
require.Exactly(t, 3, len(acc.Metrics))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"error": true,
|
||||||
|
"message": "Detailed error message",
|
||||||
|
"request": {
|
||||||
|
"uri": "/invalid/request",
|
||||||
|
"host": "responding.host.example.com",
|
||||||
|
"cluster": "",
|
||||||
|
"group": "",
|
||||||
|
"topic": ""
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"error": false,
|
||||||
|
"message": "cluster list returned",
|
||||||
|
"clusters": [
|
||||||
|
"clustername1"
|
||||||
|
],
|
||||||
|
"request": {
|
||||||
|
"url": "/v3/kafka",
|
||||||
|
"host": "example.com"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"error": false,
|
||||||
|
"message": "consumer list returned",
|
||||||
|
"consumers": [
|
||||||
|
"group1"
|
||||||
|
],
|
||||||
|
"request": {
|
||||||
|
"url": "/v3/kafka/clustername1/consumer",
|
||||||
|
"host": "example.com"
|
||||||
|
}
|
||||||
|
}
|
90
plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
vendored
Normal file
90
plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
vendored
Normal file
|
@ -0,0 +1,90 @@
|
||||||
|
{
|
||||||
|
"error": false,
|
||||||
|
"message": "consumer status returned",
|
||||||
|
"status": {
|
||||||
|
"cluster": "clustername1",
|
||||||
|
"group": "group1",
|
||||||
|
"status": "OK",
|
||||||
|
"complete": 1,
|
||||||
|
"partitions": [
|
||||||
|
{
|
||||||
|
"topic": "topicA",
|
||||||
|
"partition": 0,
|
||||||
|
"owner": "kafka",
|
||||||
|
"status": "OK",
|
||||||
|
"start": {
|
||||||
|
"offset": 431323195,
|
||||||
|
"timestamp": 1515609445004,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"end": {
|
||||||
|
"offset": 431323195,
|
||||||
|
"timestamp": 1515609490008,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"current_lag": 0,
|
||||||
|
"complete": 1
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topic": "topicA",
|
||||||
|
"partition": 1,
|
||||||
|
"owner": "kafka",
|
||||||
|
"status": "OK",
|
||||||
|
"start": {
|
||||||
|
"offset": 431322962,
|
||||||
|
"timestamp": 1515609445004,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"end": {
|
||||||
|
"offset": 431322962,
|
||||||
|
"timestamp": 1515609490008,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"current_lag": 0,
|
||||||
|
"complete": 1
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topic": "topicA",
|
||||||
|
"partition": 2,
|
||||||
|
"owner": "kafka",
|
||||||
|
"status": "OK",
|
||||||
|
"start": {
|
||||||
|
"offset": 428636563,
|
||||||
|
"timestamp": 1515609445004,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"end": {
|
||||||
|
"offset": 428636563,
|
||||||
|
"timestamp": 1515609490008,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"current_lag": 0,
|
||||||
|
"complete": 1
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"partition_count": 3,
|
||||||
|
"maxlag": {
|
||||||
|
"topic": "topicA",
|
||||||
|
"partition": 0,
|
||||||
|
"owner": "kafka",
|
||||||
|
"status": "OK",
|
||||||
|
"start": {
|
||||||
|
"offset": 431323195,
|
||||||
|
"timestamp": 1515609445004,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"end": {
|
||||||
|
"offset": 431323195,
|
||||||
|
"timestamp": 1515609490008,
|
||||||
|
"lag": 0
|
||||||
|
},
|
||||||
|
"current_lag": 0,
|
||||||
|
"complete": 1
|
||||||
|
},
|
||||||
|
"totallag": 0
|
||||||
|
},
|
||||||
|
"request": {
|
||||||
|
"url": "/v3/kafka/clustername1/consumer/group1/lag",
|
||||||
|
"host": "example.com"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"error": false,
|
||||||
|
"message": "topic list returned",
|
||||||
|
"topics": [
|
||||||
|
"topicA"
|
||||||
|
],
|
||||||
|
"request": {
|
||||||
|
"url": "/v3/kafka/clustername1/topic",
|
||||||
|
"host": "example.com"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
{
|
||||||
|
"error": false,
|
||||||
|
"message": "topic offsets returned",
|
||||||
|
"offsets": [
|
||||||
|
459178195,
|
||||||
|
459178022,
|
||||||
|
456491598
|
||||||
|
],
|
||||||
|
"request": {
|
||||||
|
"url": "/v3/kafka/clustername1/topic/topicA",
|
||||||
|
"host": "example.com"
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue