Add burrow input plugin (#3489)
This commit is contained in:
parent
9da088e23c
commit
2f9b180bc2
|
@ -132,6 +132,7 @@ configuration options.
|
|||
* [bcache](./plugins/inputs/bcache)
|
||||
* [bond](./plugins/inputs/bond)
|
||||
* [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2))
|
||||
* [burrow](./plugins/inputs/burrow)
|
||||
* [ceph](./plugins/inputs/ceph)
|
||||
* [cgroup](./plugins/inputs/cgroup)
|
||||
* [chrony](./plugins/inputs/chrony)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
# Telegraf Plugin: Burrow
|
||||
|
||||
Collect Kafka topic, consumer and partition status
|
||||
via [Burrow](https://github.com/linkedin/Burrow) HTTP [API](https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint).
|
||||
|
||||
Supported Burrow version: `1.x`
|
||||
|
||||
### Configuration
|
||||
|
||||
```
|
||||
## Burrow API endpoints in format "schema://host:port".
|
||||
## Default is "http://localhost:8000".
|
||||
servers = ["http://localhost:8000"]
|
||||
|
||||
## Override Burrow API prefix.
|
||||
## Useful when Burrow is behind reverse-proxy.
|
||||
# api_prefix = "/v3/kafka"
|
||||
|
||||
## Maximum time to receive response.
|
||||
# response_timeout = "5s"
|
||||
|
||||
## Limit per-server concurrent connections.
|
||||
## Useful in case of large number of topics or consumer groups.
|
||||
# concurrent_connections = 20
|
||||
|
||||
## Filter clusters, default is no filtering.
|
||||
## Values can be specified as glob patterns.
|
||||
# clusters_include = []
|
||||
# clusters_exclude = []
|
||||
|
||||
## Filter consumer groups, default is no filtering.
|
||||
## Values can be specified as glob patterns.
|
||||
# groups_include = []
|
||||
# groups_exclude = []
|
||||
|
||||
## Filter topics, default is no filtering.
|
||||
## Values can be specified as glob patterns.
|
||||
# topics_include = []
|
||||
# topics_exclude = []
|
||||
|
||||
## Credentials for basic HTTP authentication.
|
||||
# username = ""
|
||||
# password = ""
|
||||
|
||||
## Optional SSL config
|
||||
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||
# ssl_key = "/etc/telegraf/key.pem"
|
||||
# insecure_skip_verify = false
|
||||
```
|
||||
|
||||
### Partition Status mappings
|
||||
|
||||
* `OK` = 1
|
||||
* `NOT_FOUND` = 2
|
||||
* `WARN` = 3
|
||||
* `ERR` = 4
|
||||
* `STOP` = 5
|
||||
* `STALL` = 6
|
||||
|
||||
> unknown value will be mapped to 0
|
||||
|
||||
### Fields
|
||||
|
||||
* `burrow_group` (one event per each consumer group)
|
||||
- status (string, see Partition Status mappings)
|
||||
- status_code (int, `1..6`, see Partition status mappings)
|
||||
- parition_count (int, `number of partitions`)
|
||||
- total_lag (int64, `totallag`)
|
||||
- lag (int64, `maxlag.current_lag || 0`)
|
||||
|
||||
* `burrow_partition` (one event per each topic partition)
|
||||
- status (string, see Partition Status mappings)
|
||||
- status_code (int, `1..6`, see Partition status mappings)
|
||||
- lag (int64, `current_lag || 0`)
|
||||
- offset (int64, `end.timestamp`)
|
||||
- timestamp (int64, `end.timestamp`)
|
||||
|
||||
* `burrow_topic` (one event per topic offset)
|
||||
- offset (int64)
|
||||
|
||||
|
||||
### Tags
|
||||
|
||||
* `burrow_group`
|
||||
- cluster (string)
|
||||
- group (string)
|
||||
|
||||
* `burrow_partition`
|
||||
- cluster (string)
|
||||
- group (string)
|
||||
- topic (string)
|
||||
- partition (int)
|
||||
|
||||
* `burrow_topic`
|
||||
- cluster (string)
|
||||
- topic (string)
|
||||
- partition (int)
|
|
@ -0,0 +1,485 @@
|
|||
package burrow
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultBurrowPrefix = "/v3/kafka"
|
||||
defaultConcurrentConnections = 20
|
||||
defaultResponseTimeout = time.Second * 5
|
||||
defaultServer = "http://localhost:8000"
|
||||
)
|
||||
|
||||
const configSample = `
|
||||
## Burrow API endpoints in format "schema://host:port".
|
||||
## Default is "http://localhost:8000".
|
||||
servers = ["http://localhost:8000"]
|
||||
|
||||
## Override Burrow API prefix.
|
||||
## Useful when Burrow is behind reverse-proxy.
|
||||
# api_prefix = "/v3/kafka"
|
||||
|
||||
## Maximum time to receive response.
|
||||
# response_timeout = "5s"
|
||||
|
||||
## Limit per-server concurrent connections.
|
||||
## Useful in case of large number of topics or consumer groups.
|
||||
# concurrent_connections = 20
|
||||
|
||||
## Filter clusters, default is no filtering.
|
||||
## Values can be specified as glob patterns.
|
||||
# clusters_include = []
|
||||
# clusters_exclude = []
|
||||
|
||||
## Filter consumer groups, default is no filtering.
|
||||
## Values can be specified as glob patterns.
|
||||
# groups_include = []
|
||||
# groups_exclude = []
|
||||
|
||||
## Filter topics, default is no filtering.
|
||||
## Values can be specified as glob patterns.
|
||||
# topics_include = []
|
||||
# topics_exclude = []
|
||||
|
||||
## Credentials for basic HTTP authentication.
|
||||
# username = ""
|
||||
# password = ""
|
||||
|
||||
## Optional SSL config
|
||||
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||
# ssl_key = "/etc/telegraf/key.pem"
|
||||
# insecure_skip_verify = false
|
||||
`
|
||||
|
||||
type (
|
||||
burrow struct {
|
||||
tls.ClientConfig
|
||||
|
||||
Servers []string
|
||||
Username string
|
||||
Password string
|
||||
ResponseTimeout internal.Duration
|
||||
ConcurrentConnections int
|
||||
|
||||
APIPrefix string `toml:"api_prefix"`
|
||||
ClustersExclude []string
|
||||
ClustersInclude []string
|
||||
GroupsExclude []string
|
||||
GroupsInclude []string
|
||||
TopicsExclude []string
|
||||
TopicsInclude []string
|
||||
|
||||
client *http.Client
|
||||
filterClusters filter.Filter
|
||||
filterGroups filter.Filter
|
||||
filterTopics filter.Filter
|
||||
}
|
||||
|
||||
// response
|
||||
apiResponse struct {
|
||||
Clusters []string `json:"clusters"`
|
||||
Groups []string `json:"consumers"`
|
||||
Topics []string `json:"topics"`
|
||||
Offsets []int64 `json:"offsets"`
|
||||
Status apiStatusResponse `json:"status"`
|
||||
}
|
||||
|
||||
// response: status field
|
||||
apiStatusResponse struct {
|
||||
Partitions []apiStatusResponseLag `json:"partitions"`
|
||||
Status string `json:"status"`
|
||||
PartitionCount int `json:"partition_count"`
|
||||
Maxlag *apiStatusResponseLag `json:"maxlag"`
|
||||
TotalLag int64 `json:"totallag"`
|
||||
}
|
||||
|
||||
// response: lag field
|
||||
apiStatusResponseLag struct {
|
||||
Topic string `json:"topic"`
|
||||
Partition int32 `json:"partition"`
|
||||
Status string `json:"status"`
|
||||
Start apiStatusResponseLagItem `json:"start"`
|
||||
End apiStatusResponseLagItem `json:"end"`
|
||||
CurrentLag int64 `json:"current_lag"`
|
||||
}
|
||||
|
||||
// response: lag field item
|
||||
apiStatusResponseLagItem struct {
|
||||
Offset int64 `json:"offset"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Lag int64 `json:"lag"`
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
inputs.Add("burrow", func() telegraf.Input {
|
||||
return &burrow{}
|
||||
})
|
||||
}
|
||||
|
||||
func (b *burrow) SampleConfig() string {
|
||||
return configSample
|
||||
}
|
||||
|
||||
func (b *burrow) Description() string {
|
||||
return "Collect Kafka topics and consumers status from Burrow HTTP API."
|
||||
}
|
||||
|
||||
func (b *burrow) Gather(acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if len(b.Servers) == 0 {
|
||||
b.Servers = []string{defaultServer}
|
||||
}
|
||||
|
||||
if b.client == nil {
|
||||
b.setDefaults()
|
||||
if err := b.compileGlobs(); err != nil {
|
||||
return err
|
||||
}
|
||||
c, err := b.createClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.client = c
|
||||
}
|
||||
|
||||
for _, addr := range b.Servers {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("unable to parse address '%s': %s", addr, err))
|
||||
continue
|
||||
}
|
||||
if u.Path == "" {
|
||||
u.Path = b.APIPrefix
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(u *url.URL) {
|
||||
defer wg.Done()
|
||||
acc.AddError(b.gatherServer(u, acc))
|
||||
}(u)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *burrow) setDefaults() {
|
||||
if b.APIPrefix == "" {
|
||||
b.APIPrefix = defaultBurrowPrefix
|
||||
}
|
||||
if b.ConcurrentConnections < 1 {
|
||||
b.ConcurrentConnections = defaultConcurrentConnections
|
||||
}
|
||||
if b.ResponseTimeout.Duration < time.Second {
|
||||
b.ResponseTimeout = internal.Duration{
|
||||
Duration: defaultResponseTimeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *burrow) compileGlobs() error {
|
||||
var err error
|
||||
|
||||
// compile glob patterns
|
||||
b.filterClusters, err = filter.NewIncludeExcludeFilter(b.ClustersInclude, b.ClustersExclude)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.filterGroups, err = filter.NewIncludeExcludeFilter(b.GroupsInclude, b.GroupsExclude)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.filterTopics, err = filter.NewIncludeExcludeFilter(b.TopicsInclude, b.TopicsExclude)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *burrow) createClient() (*http.Client, error) {
|
||||
tlsCfg, err := b.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
},
|
||||
Timeout: b.ResponseTimeout.Duration,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if b.Username != "" {
|
||||
req.SetBasicAuth(b.Username, b.Password)
|
||||
}
|
||||
|
||||
res, err := b.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("wrong response: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
ares := &apiResponse{}
|
||||
dec := json.NewDecoder(res.Body)
|
||||
|
||||
return ares, dec.Decode(ares)
|
||||
}
|
||||
|
||||
func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
r, err := b.getResponse(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
guard := make(chan struct{}, b.ConcurrentConnections)
|
||||
for _, cluster := range r.Clusters {
|
||||
if !b.filterClusters.Match(cluster) {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(cluster string) {
|
||||
defer wg.Done()
|
||||
|
||||
// fetch topic list
|
||||
// endpoint: <api_prefix>/(cluster)/topic
|
||||
ut := appendPathToURL(src, cluster, "topic")
|
||||
b.gatherTopics(guard, ut, cluster, acc)
|
||||
}(cluster)
|
||||
|
||||
wg.Add(1)
|
||||
go func(cluster string) {
|
||||
defer wg.Done()
|
||||
|
||||
// fetch consumer group list
|
||||
// endpoint: <api_prefix>/(cluster)/consumer
|
||||
uc := appendPathToURL(src, cluster, "consumer")
|
||||
b.gatherGroups(guard, uc, cluster, acc)
|
||||
}(cluster)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
r, err := b.getResponse(src)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, topic := range r.Topics {
|
||||
if !b.filterTopics.Match(topic) {
|
||||
continue
|
||||
}
|
||||
|
||||
guard <- struct{}{}
|
||||
wg.Add(1)
|
||||
|
||||
go func(topic string) {
|
||||
defer func() {
|
||||
<-guard
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// fetch topic offsets
|
||||
// endpoint: <api_prefix>/<cluster>/topic/<topic>
|
||||
tu := appendPathToURL(src, topic)
|
||||
tr, err := b.getResponse(tu)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
b.genTopicMetrics(tr, cluster, topic, acc)
|
||||
}(topic)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) {
|
||||
for i, offset := range r.Offsets {
|
||||
tags := map[string]string{
|
||||
"cluster": cluster,
|
||||
"topic": topic,
|
||||
"partition": strconv.Itoa(i),
|
||||
}
|
||||
|
||||
acc.AddFields(
|
||||
"burrow_topic",
|
||||
map[string]interface{}{
|
||||
"offset": offset,
|
||||
},
|
||||
tags,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
r, err := b.getResponse(src)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, group := range r.Groups {
|
||||
if !b.filterGroups.Match(group) {
|
||||
continue
|
||||
}
|
||||
|
||||
guard <- struct{}{}
|
||||
wg.Add(1)
|
||||
|
||||
go func(group string) {
|
||||
defer func() {
|
||||
<-guard
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// fetch consumer group status
|
||||
// endpoint: <api_prefix>/<cluster>/consumer/<group>/lag
|
||||
gl := appendPathToURL(src, group, "lag")
|
||||
gr, err := b.getResponse(gl)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
b.genGroupStatusMetrics(gr, cluster, group, acc)
|
||||
b.genGroupLagMetrics(gr, cluster, group, acc)
|
||||
}(group)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
|
||||
partitionCount := r.Status.PartitionCount
|
||||
if partitionCount == 0 {
|
||||
partitionCount = len(r.Status.Partitions)
|
||||
}
|
||||
|
||||
// get max timestamp and offset from partitions list
|
||||
offset := int64(0)
|
||||
timestamp := int64(0)
|
||||
for _, partition := range r.Status.Partitions {
|
||||
if partition.End.Offset > offset {
|
||||
offset = partition.End.Offset
|
||||
}
|
||||
if partition.End.Timestamp > timestamp {
|
||||
timestamp = partition.End.Timestamp
|
||||
}
|
||||
}
|
||||
|
||||
lag := int64(0)
|
||||
if r.Status.Maxlag != nil {
|
||||
lag = r.Status.Maxlag.CurrentLag
|
||||
}
|
||||
|
||||
acc.AddFields(
|
||||
"burrow_group",
|
||||
map[string]interface{}{
|
||||
"status": r.Status.Status,
|
||||
"status_code": mapStatusToCode(r.Status.Status),
|
||||
"partition_count": partitionCount,
|
||||
"total_lag": r.Status.TotalLag,
|
||||
"lag": lag,
|
||||
"offset": offset,
|
||||
"timestamp": timestamp,
|
||||
},
|
||||
map[string]string{
|
||||
"cluster": cluster,
|
||||
"group": group,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
|
||||
for _, partition := range r.Status.Partitions {
|
||||
acc.AddFields(
|
||||
"burrow_partition",
|
||||
map[string]interface{}{
|
||||
"status": partition.Status,
|
||||
"status_code": mapStatusToCode(partition.Status),
|
||||
"lag": partition.CurrentLag,
|
||||
"offset": partition.End.Offset,
|
||||
"timestamp": partition.End.Timestamp,
|
||||
},
|
||||
map[string]string{
|
||||
"cluster": cluster,
|
||||
"group": group,
|
||||
"topic": partition.Topic,
|
||||
"partition": strconv.FormatInt(int64(partition.Partition), 10),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func appendPathToURL(src *url.URL, parts ...string) *url.URL {
|
||||
dst := new(url.URL)
|
||||
*dst = *src
|
||||
|
||||
for i, part := range parts {
|
||||
parts[i] = url.PathEscape(part)
|
||||
}
|
||||
|
||||
ext := strings.Join(parts, "/")
|
||||
dst.Path = fmt.Sprintf("%s/%s", src.Path, ext)
|
||||
return dst
|
||||
}
|
||||
|
||||
func mapStatusToCode(src string) int {
|
||||
switch src {
|
||||
case "OK":
|
||||
return 1
|
||||
case "NOT_FOUND":
|
||||
return 2
|
||||
case "WARN":
|
||||
return 3
|
||||
case "ERR":
|
||||
return 4
|
||||
case "STOP":
|
||||
return 5
|
||||
case "STALL":
|
||||
return 6
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
|
@ -0,0 +1,285 @@
|
|||
package burrow
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json
|
||||
func getResponseJSON(requestURI string) ([]byte, int) {
|
||||
uri := strings.TrimLeft(requestURI, "/")
|
||||
mappedFile := strings.Replace(uri, "/", "_", -1)
|
||||
jsonFile := fmt.Sprintf("./testdata/%s.json", mappedFile)
|
||||
|
||||
code := 200
|
||||
_, err := os.Stat(jsonFile)
|
||||
if err != nil {
|
||||
code = 404
|
||||
jsonFile = "./testdata/error.json"
|
||||
}
|
||||
|
||||
// respond with file
|
||||
b, _ := ioutil.ReadFile(jsonFile)
|
||||
return b, code
|
||||
}
|
||||
|
||||
// return mocked HTTP server
|
||||
func getHTTPServer() *httptest.Server {
|
||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, code := getResponseJSON(r.RequestURI)
|
||||
w.WriteHeader(code)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(body)
|
||||
}))
|
||||
}
|
||||
|
||||
// return mocked HTTP server with basic auth
|
||||
func getHTTPServerBasicAuth() *httptest.Server {
|
||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
|
||||
|
||||
username, password, authOK := r.BasicAuth()
|
||||
if authOK == false {
|
||||
http.Error(w, "Not authorized", 401)
|
||||
return
|
||||
}
|
||||
|
||||
if username != "test" && password != "test" {
|
||||
http.Error(w, "Not authorized", 401)
|
||||
return
|
||||
}
|
||||
|
||||
// ok, continue
|
||||
body, code := getResponseJSON(r.RequestURI)
|
||||
w.WriteHeader(code)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(body)
|
||||
}))
|
||||
}
|
||||
|
||||
// test burrow_topic measurement
|
||||
func TestBurrowTopic(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{Servers: []string{s.URL}}
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
fields := []map[string]interface{}{
|
||||
// topicA
|
||||
{"offset": int64(459178195)},
|
||||
{"offset": int64(459178022)},
|
||||
{"offset": int64(456491598)},
|
||||
}
|
||||
tags := []map[string]string{
|
||||
// topicA
|
||||
{"cluster": "clustername1", "topic": "topicA", "partition": "0"},
|
||||
{"cluster": "clustername1", "topic": "topicA", "partition": "1"},
|
||||
{"cluster": "clustername1", "topic": "topicA", "partition": "2"},
|
||||
}
|
||||
|
||||
require.Empty(t, acc.Errors)
|
||||
require.Equal(t, true, acc.HasMeasurement("burrow_topic"))
|
||||
for i := 0; i < len(fields); i++ {
|
||||
acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i])
|
||||
}
|
||||
}
|
||||
|
||||
// test burrow_partition measurement
|
||||
func TestBurrowPartition(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
}
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
fields := []map[string]interface{}{
|
||||
{
|
||||
"status": "OK",
|
||||
"status_code": 1,
|
||||
"lag": int64(0),
|
||||
"offset": int64(431323195),
|
||||
"timestamp": int64(1515609490008),
|
||||
},
|
||||
{
|
||||
"status": "OK",
|
||||
"status_code": 1,
|
||||
"lag": int64(0),
|
||||
"offset": int64(431322962),
|
||||
"timestamp": int64(1515609490008),
|
||||
},
|
||||
{
|
||||
"status": "OK",
|
||||
"status_code": 1,
|
||||
"lag": int64(0),
|
||||
"offset": int64(428636563),
|
||||
"timestamp": int64(1515609490008),
|
||||
},
|
||||
}
|
||||
tags := []map[string]string{
|
||||
{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0"},
|
||||
{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1"},
|
||||
{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2"},
|
||||
}
|
||||
|
||||
require.Empty(t, acc.Errors)
|
||||
require.Equal(t, true, acc.HasMeasurement("burrow_partition"))
|
||||
|
||||
for i := 0; i < len(fields); i++ {
|
||||
acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i])
|
||||
}
|
||||
}
|
||||
|
||||
// burrow_group
|
||||
func TestBurrowGroup(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
}
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
fields := []map[string]interface{}{
|
||||
{
|
||||
"status": "OK",
|
||||
"status_code": 1,
|
||||
"partition_count": 3,
|
||||
"total_lag": int64(0),
|
||||
"lag": int64(0),
|
||||
"offset": int64(431323195),
|
||||
"timestamp": int64(1515609490008),
|
||||
},
|
||||
}
|
||||
|
||||
tags := []map[string]string{
|
||||
{"cluster": "clustername1", "group": "group1"},
|
||||
}
|
||||
|
||||
require.Empty(t, acc.Errors)
|
||||
require.Equal(t, true, acc.HasMeasurement("burrow_group"))
|
||||
|
||||
for i := 0; i < len(fields); i++ {
|
||||
acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i])
|
||||
}
|
||||
}
|
||||
|
||||
// collect from multiple servers
|
||||
func TestMultipleServers(t *testing.T) {
|
||||
s1 := getHTTPServer()
|
||||
defer s1.Close()
|
||||
|
||||
s2 := getHTTPServer()
|
||||
defer s2.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s1.URL, s2.URL},
|
||||
}
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
require.Exactly(t, 14, len(acc.Metrics))
|
||||
require.Empty(t, acc.Errors)
|
||||
}
|
||||
|
||||
// collect multiple times
|
||||
func TestMultipleRuns(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
}
|
||||
for i := 0; i < 4; i++ {
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
require.Exactly(t, 7, len(acc.Metrics))
|
||||
require.Empty(t, acc.Errors)
|
||||
}
|
||||
}
|
||||
|
||||
// collect from http basic auth server
|
||||
func TestBasicAuthConfig(t *testing.T) {
|
||||
s := getHTTPServerBasicAuth()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
Username: "test",
|
||||
Password: "test",
|
||||
}
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
require.Exactly(t, 7, len(acc.Metrics))
|
||||
require.Empty(t, acc.Errors)
|
||||
}
|
||||
|
||||
// collect from whitelisted clusters
|
||||
func TestFilterClusters(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
|
||||
}
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
// no match by cluster
|
||||
require.Exactly(t, 0, len(acc.Metrics))
|
||||
require.Empty(t, acc.Errors)
|
||||
}
|
||||
|
||||
// collect from whitelisted groups
|
||||
func TestFilterGroups(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
GroupsInclude: []string{"group?"}, // group1 -> match
|
||||
TopicsExclude: []string{"*"}, // exclude all
|
||||
}
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
require.Exactly(t, 4, len(acc.Metrics))
|
||||
require.Empty(t, acc.Errors)
|
||||
}
|
||||
|
||||
// collect from whitelisted topics
|
||||
func TestFilterTopics(t *testing.T) {
|
||||
s := getHTTPServer()
|
||||
defer s.Close()
|
||||
|
||||
plugin := &burrow{
|
||||
Servers: []string{s.URL},
|
||||
TopicsInclude: []string{"topic?"}, // topicA -> match
|
||||
GroupsExclude: []string{"*"}, // exclude all
|
||||
}
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
plugin.Gather(acc)
|
||||
|
||||
require.Exactly(t, 3, len(acc.Metrics))
|
||||
require.Empty(t, acc.Errors)
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"error": true,
|
||||
"message": "Detailed error message",
|
||||
"request": {
|
||||
"uri": "/invalid/request",
|
||||
"host": "responding.host.example.com",
|
||||
"cluster": "",
|
||||
"group": "",
|
||||
"topic": ""
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"error": false,
|
||||
"message": "cluster list returned",
|
||||
"clusters": [
|
||||
"clustername1"
|
||||
],
|
||||
"request": {
|
||||
"url": "/v3/kafka",
|
||||
"host": "example.com"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"error": false,
|
||||
"message": "consumer list returned",
|
||||
"consumers": [
|
||||
"group1"
|
||||
],
|
||||
"request": {
|
||||
"url": "/v3/kafka/clustername1/consumer",
|
||||
"host": "example.com"
|
||||
}
|
||||
}
|
90
plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
vendored
Normal file
90
plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
vendored
Normal file
|
@ -0,0 +1,90 @@
|
|||
{
|
||||
"error": false,
|
||||
"message": "consumer status returned",
|
||||
"status": {
|
||||
"cluster": "clustername1",
|
||||
"group": "group1",
|
||||
"status": "OK",
|
||||
"complete": 1,
|
||||
"partitions": [
|
||||
{
|
||||
"topic": "topicA",
|
||||
"partition": 0,
|
||||
"owner": "kafka",
|
||||
"status": "OK",
|
||||
"start": {
|
||||
"offset": 431323195,
|
||||
"timestamp": 1515609445004,
|
||||
"lag": 0
|
||||
},
|
||||
"end": {
|
||||
"offset": 431323195,
|
||||
"timestamp": 1515609490008,
|
||||
"lag": 0
|
||||
},
|
||||
"current_lag": 0,
|
||||
"complete": 1
|
||||
},
|
||||
{
|
||||
"topic": "topicA",
|
||||
"partition": 1,
|
||||
"owner": "kafka",
|
||||
"status": "OK",
|
||||
"start": {
|
||||
"offset": 431322962,
|
||||
"timestamp": 1515609445004,
|
||||
"lag": 0
|
||||
},
|
||||
"end": {
|
||||
"offset": 431322962,
|
||||
"timestamp": 1515609490008,
|
||||
"lag": 0
|
||||
},
|
||||
"current_lag": 0,
|
||||
"complete": 1
|
||||
},
|
||||
{
|
||||
"topic": "topicA",
|
||||
"partition": 2,
|
||||
"owner": "kafka",
|
||||
"status": "OK",
|
||||
"start": {
|
||||
"offset": 428636563,
|
||||
"timestamp": 1515609445004,
|
||||
"lag": 0
|
||||
},
|
||||
"end": {
|
||||
"offset": 428636563,
|
||||
"timestamp": 1515609490008,
|
||||
"lag": 0
|
||||
},
|
||||
"current_lag": 0,
|
||||
"complete": 1
|
||||
}
|
||||
],
|
||||
"partition_count": 3,
|
||||
"maxlag": {
|
||||
"topic": "topicA",
|
||||
"partition": 0,
|
||||
"owner": "kafka",
|
||||
"status": "OK",
|
||||
"start": {
|
||||
"offset": 431323195,
|
||||
"timestamp": 1515609445004,
|
||||
"lag": 0
|
||||
},
|
||||
"end": {
|
||||
"offset": 431323195,
|
||||
"timestamp": 1515609490008,
|
||||
"lag": 0
|
||||
},
|
||||
"current_lag": 0,
|
||||
"complete": 1
|
||||
},
|
||||
"totallag": 0
|
||||
},
|
||||
"request": {
|
||||
"url": "/v3/kafka/clustername1/consumer/group1/lag",
|
||||
"host": "example.com"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"error": false,
|
||||
"message": "topic list returned",
|
||||
"topics": [
|
||||
"topicA"
|
||||
],
|
||||
"request": {
|
||||
"url": "/v3/kafka/clustername1/topic",
|
||||
"host": "example.com"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"error": false,
|
||||
"message": "topic offsets returned",
|
||||
"offsets": [
|
||||
459178195,
|
||||
459178022,
|
||||
456491598
|
||||
],
|
||||
"request": {
|
||||
"url": "/v3/kafka/clustername1/topic/topicA",
|
||||
"host": "example.com"
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue