diff --git a/README.md b/README.md index d4d19db0f..8ae242b5c 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ configuration options. * [bcache](./plugins/inputs/bcache) * [bond](./plugins/inputs/bond) * [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2)) +* [burrow](./plugins/inputs/burrow) * [ceph](./plugins/inputs/ceph) * [cgroup](./plugins/inputs/cgroup) * [chrony](./plugins/inputs/chrony) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 80db99bfb..239cf6e11 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aurora" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/bond" + _ "github.com/influxdata/telegraf/plugins/inputs/burrow" _ "github.com/influxdata/telegraf/plugins/inputs/cassandra" _ "github.com/influxdata/telegraf/plugins/inputs/ceph" _ "github.com/influxdata/telegraf/plugins/inputs/cgroup" diff --git a/plugins/inputs/burrow/README.md b/plugins/inputs/burrow/README.md new file mode 100644 index 000000000..039cff8c4 --- /dev/null +++ b/plugins/inputs/burrow/README.md @@ -0,0 +1,98 @@ +# Telegraf Plugin: Burrow + +Collect Kafka topic, consumer and partition status +via [Burrow](https://github.com/linkedin/Burrow) HTTP [API](https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint). + +Supported Burrow version: `1.x` + +### Configuration + +``` + ## Burrow API endpoints in format "schema://host:port". + ## Default is "http://localhost:8000". + servers = ["http://localhost:8000"] + + ## Override Burrow API prefix. + ## Useful when Burrow is behind reverse-proxy. + # api_prefix = "/v3/kafka" + + ## Maximum time to receive response. + # response_timeout = "5s" + + ## Limit per-server concurrent connections. + ## Useful in case of large number of topics or consumer groups. + # concurrent_connections = 20 + + ## Filter clusters, default is no filtering. + ## Values can be specified as glob patterns. + # clusters_include = [] + # clusters_exclude = [] + + ## Filter consumer groups, default is no filtering. + ## Values can be specified as glob patterns. + # groups_include = [] + # groups_exclude = [] + + ## Filter topics, default is no filtering. + ## Values can be specified as glob patterns. + # topics_include = [] + # topics_exclude = [] + + ## Credentials for basic HTTP authentication. + # username = "" + # password = "" + + ## Optional SSL config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + # insecure_skip_verify = false +``` + +### Partition Status mappings + +* `OK` = 1 +* `NOT_FOUND` = 2 +* `WARN` = 3 +* `ERR` = 4 +* `STOP` = 5 +* `STALL` = 6 + +> unknown value will be mapped to 0 + +### Fields + +* `burrow_group` (one event per each consumer group) + - status (string, see Partition Status mappings) + - status_code (int, `1..6`, see Partition status mappings) + - parition_count (int, `number of partitions`) + - total_lag (int64, `totallag`) + - lag (int64, `maxlag.current_lag || 0`) + +* `burrow_partition` (one event per each topic partition) + - status (string, see Partition Status mappings) + - status_code (int, `1..6`, see Partition status mappings) + - lag (int64, `current_lag || 0`) + - offset (int64, `end.timestamp`) + - timestamp (int64, `end.timestamp`) + +* `burrow_topic` (one event per topic offset) + - offset (int64) + + +### Tags + +* `burrow_group` + - cluster (string) + - group (string) + +* `burrow_partition` + - cluster (string) + - group (string) + - topic (string) + - partition (int) + +* `burrow_topic` + - cluster (string) + - topic (string) + - partition (int) diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go new file mode 100644 index 000000000..88fdb4b7f --- /dev/null +++ b/plugins/inputs/burrow/burrow.go @@ -0,0 +1,485 @@ +package burrow + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + defaultBurrowPrefix = "/v3/kafka" + defaultConcurrentConnections = 20 + defaultResponseTimeout = time.Second * 5 + defaultServer = "http://localhost:8000" +) + +const configSample = ` + ## Burrow API endpoints in format "schema://host:port". + ## Default is "http://localhost:8000". + servers = ["http://localhost:8000"] + + ## Override Burrow API prefix. + ## Useful when Burrow is behind reverse-proxy. + # api_prefix = "/v3/kafka" + + ## Maximum time to receive response. + # response_timeout = "5s" + + ## Limit per-server concurrent connections. + ## Useful in case of large number of topics or consumer groups. + # concurrent_connections = 20 + + ## Filter clusters, default is no filtering. + ## Values can be specified as glob patterns. + # clusters_include = [] + # clusters_exclude = [] + + ## Filter consumer groups, default is no filtering. + ## Values can be specified as glob patterns. + # groups_include = [] + # groups_exclude = [] + + ## Filter topics, default is no filtering. + ## Values can be specified as glob patterns. + # topics_include = [] + # topics_exclude = [] + + ## Credentials for basic HTTP authentication. + # username = "" + # password = "" + + ## Optional SSL config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + # insecure_skip_verify = false +` + +type ( + burrow struct { + tls.ClientConfig + + Servers []string + Username string + Password string + ResponseTimeout internal.Duration + ConcurrentConnections int + + APIPrefix string `toml:"api_prefix"` + ClustersExclude []string + ClustersInclude []string + GroupsExclude []string + GroupsInclude []string + TopicsExclude []string + TopicsInclude []string + + client *http.Client + filterClusters filter.Filter + filterGroups filter.Filter + filterTopics filter.Filter + } + + // response + apiResponse struct { + Clusters []string `json:"clusters"` + Groups []string `json:"consumers"` + Topics []string `json:"topics"` + Offsets []int64 `json:"offsets"` + Status apiStatusResponse `json:"status"` + } + + // response: status field + apiStatusResponse struct { + Partitions []apiStatusResponseLag `json:"partitions"` + Status string `json:"status"` + PartitionCount int `json:"partition_count"` + Maxlag *apiStatusResponseLag `json:"maxlag"` + TotalLag int64 `json:"totallag"` + } + + // response: lag field + apiStatusResponseLag struct { + Topic string `json:"topic"` + Partition int32 `json:"partition"` + Status string `json:"status"` + Start apiStatusResponseLagItem `json:"start"` + End apiStatusResponseLagItem `json:"end"` + CurrentLag int64 `json:"current_lag"` + } + + // response: lag field item + apiStatusResponseLagItem struct { + Offset int64 `json:"offset"` + Timestamp int64 `json:"timestamp"` + Lag int64 `json:"lag"` + } +) + +func init() { + inputs.Add("burrow", func() telegraf.Input { + return &burrow{} + }) +} + +func (b *burrow) SampleConfig() string { + return configSample +} + +func (b *burrow) Description() string { + return "Collect Kafka topics and consumers status from Burrow HTTP API." +} + +func (b *burrow) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + if len(b.Servers) == 0 { + b.Servers = []string{defaultServer} + } + + if b.client == nil { + b.setDefaults() + if err := b.compileGlobs(); err != nil { + return err + } + c, err := b.createClient() + if err != nil { + return err + } + b.client = c + } + + for _, addr := range b.Servers { + u, err := url.Parse(addr) + if err != nil { + acc.AddError(fmt.Errorf("unable to parse address '%s': %s", addr, err)) + continue + } + if u.Path == "" { + u.Path = b.APIPrefix + } + + wg.Add(1) + go func(u *url.URL) { + defer wg.Done() + acc.AddError(b.gatherServer(u, acc)) + }(u) + } + + wg.Wait() + return nil +} + +func (b *burrow) setDefaults() { + if b.APIPrefix == "" { + b.APIPrefix = defaultBurrowPrefix + } + if b.ConcurrentConnections < 1 { + b.ConcurrentConnections = defaultConcurrentConnections + } + if b.ResponseTimeout.Duration < time.Second { + b.ResponseTimeout = internal.Duration{ + Duration: defaultResponseTimeout, + } + } +} + +func (b *burrow) compileGlobs() error { + var err error + + // compile glob patterns + b.filterClusters, err = filter.NewIncludeExcludeFilter(b.ClustersInclude, b.ClustersExclude) + if err != nil { + return err + } + b.filterGroups, err = filter.NewIncludeExcludeFilter(b.GroupsInclude, b.GroupsExclude) + if err != nil { + return err + } + b.filterTopics, err = filter.NewIncludeExcludeFilter(b.TopicsInclude, b.TopicsExclude) + if err != nil { + return err + } + return nil +} + +func (b *burrow) createClient() (*http.Client, error) { + tlsCfg, err := b.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: b.ResponseTimeout.Duration, + } + + return client, nil +} + +func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) { + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + if b.Username != "" { + req.SetBasicAuth(b.Username, b.Password) + } + + res, err := b.client.Do(req) + if err != nil { + return nil, err + } + + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("wrong response: %d", res.StatusCode) + } + + ares := &apiResponse{} + dec := json.NewDecoder(res.Body) + + return ares, dec.Decode(ares) +} + +func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + r, err := b.getResponse(src) + if err != nil { + return err + } + + guard := make(chan struct{}, b.ConcurrentConnections) + for _, cluster := range r.Clusters { + if !b.filterClusters.Match(cluster) { + continue + } + + wg.Add(1) + go func(cluster string) { + defer wg.Done() + + // fetch topic list + // endpoint: /(cluster)/topic + ut := appendPathToURL(src, cluster, "topic") + b.gatherTopics(guard, ut, cluster, acc) + }(cluster) + + wg.Add(1) + go func(cluster string) { + defer wg.Done() + + // fetch consumer group list + // endpoint: /(cluster)/consumer + uc := appendPathToURL(src, cluster, "consumer") + b.gatherGroups(guard, uc, cluster, acc) + }(cluster) + } + + wg.Wait() + return nil +} + +func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { + var wg sync.WaitGroup + + r, err := b.getResponse(src) + if err != nil { + acc.AddError(err) + return + } + + for _, topic := range r.Topics { + if !b.filterTopics.Match(topic) { + continue + } + + guard <- struct{}{} + wg.Add(1) + + go func(topic string) { + defer func() { + <-guard + wg.Done() + }() + + // fetch topic offsets + // endpoint: //topic/ + tu := appendPathToURL(src, topic) + tr, err := b.getResponse(tu) + if err != nil { + acc.AddError(err) + return + } + + b.genTopicMetrics(tr, cluster, topic, acc) + }(topic) + } + + wg.Wait() +} + +func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) { + for i, offset := range r.Offsets { + tags := map[string]string{ + "cluster": cluster, + "topic": topic, + "partition": strconv.Itoa(i), + } + + acc.AddFields( + "burrow_topic", + map[string]interface{}{ + "offset": offset, + }, + tags, + ) + } +} + +func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { + var wg sync.WaitGroup + + r, err := b.getResponse(src) + if err != nil { + acc.AddError(err) + return + } + + for _, group := range r.Groups { + if !b.filterGroups.Match(group) { + continue + } + + guard <- struct{}{} + wg.Add(1) + + go func(group string) { + defer func() { + <-guard + wg.Done() + }() + + // fetch consumer group status + // endpoint: //consumer//lag + gl := appendPathToURL(src, group, "lag") + gr, err := b.getResponse(gl) + if err != nil { + acc.AddError(err) + return + } + + b.genGroupStatusMetrics(gr, cluster, group, acc) + b.genGroupLagMetrics(gr, cluster, group, acc) + }(group) + } + + wg.Wait() +} + +func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { + partitionCount := r.Status.PartitionCount + if partitionCount == 0 { + partitionCount = len(r.Status.Partitions) + } + + // get max timestamp and offset from partitions list + offset := int64(0) + timestamp := int64(0) + for _, partition := range r.Status.Partitions { + if partition.End.Offset > offset { + offset = partition.End.Offset + } + if partition.End.Timestamp > timestamp { + timestamp = partition.End.Timestamp + } + } + + lag := int64(0) + if r.Status.Maxlag != nil { + lag = r.Status.Maxlag.CurrentLag + } + + acc.AddFields( + "burrow_group", + map[string]interface{}{ + "status": r.Status.Status, + "status_code": mapStatusToCode(r.Status.Status), + "partition_count": partitionCount, + "total_lag": r.Status.TotalLag, + "lag": lag, + "offset": offset, + "timestamp": timestamp, + }, + map[string]string{ + "cluster": cluster, + "group": group, + }, + ) +} + +func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { + for _, partition := range r.Status.Partitions { + acc.AddFields( + "burrow_partition", + map[string]interface{}{ + "status": partition.Status, + "status_code": mapStatusToCode(partition.Status), + "lag": partition.CurrentLag, + "offset": partition.End.Offset, + "timestamp": partition.End.Timestamp, + }, + map[string]string{ + "cluster": cluster, + "group": group, + "topic": partition.Topic, + "partition": strconv.FormatInt(int64(partition.Partition), 10), + }, + ) + } +} + +func appendPathToURL(src *url.URL, parts ...string) *url.URL { + dst := new(url.URL) + *dst = *src + + for i, part := range parts { + parts[i] = url.PathEscape(part) + } + + ext := strings.Join(parts, "/") + dst.Path = fmt.Sprintf("%s/%s", src.Path, ext) + return dst +} + +func mapStatusToCode(src string) int { + switch src { + case "OK": + return 1 + case "NOT_FOUND": + return 2 + case "WARN": + return 3 + case "ERR": + return 4 + case "STOP": + return 5 + case "STALL": + return 6 + default: + return 0 + } +} diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go new file mode 100644 index 000000000..9b3f4a0a9 --- /dev/null +++ b/plugins/inputs/burrow/burrow_test.go @@ -0,0 +1,285 @@ +package burrow + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +// remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json +func getResponseJSON(requestURI string) ([]byte, int) { + uri := strings.TrimLeft(requestURI, "/") + mappedFile := strings.Replace(uri, "/", "_", -1) + jsonFile := fmt.Sprintf("./testdata/%s.json", mappedFile) + + code := 200 + _, err := os.Stat(jsonFile) + if err != nil { + code = 404 + jsonFile = "./testdata/error.json" + } + + // respond with file + b, _ := ioutil.ReadFile(jsonFile) + return b, code +} + +// return mocked HTTP server +func getHTTPServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, code := getResponseJSON(r.RequestURI) + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json") + w.Write(body) + })) +} + +// return mocked HTTP server with basic auth +func getHTTPServerBasicAuth() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`) + + username, password, authOK := r.BasicAuth() + if authOK == false { + http.Error(w, "Not authorized", 401) + return + } + + if username != "test" && password != "test" { + http.Error(w, "Not authorized", 401) + return + } + + // ok, continue + body, code := getResponseJSON(r.RequestURI) + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json") + w.Write(body) + })) +} + +// test burrow_topic measurement +func TestBurrowTopic(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{Servers: []string{s.URL}} + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + fields := []map[string]interface{}{ + // topicA + {"offset": int64(459178195)}, + {"offset": int64(459178022)}, + {"offset": int64(456491598)}, + } + tags := []map[string]string{ + // topicA + {"cluster": "clustername1", "topic": "topicA", "partition": "0"}, + {"cluster": "clustername1", "topic": "topicA", "partition": "1"}, + {"cluster": "clustername1", "topic": "topicA", "partition": "2"}, + } + + require.Empty(t, acc.Errors) + require.Equal(t, true, acc.HasMeasurement("burrow_topic")) + for i := 0; i < len(fields); i++ { + acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i]) + } +} + +// test burrow_partition measurement +func TestBurrowPartition(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + } + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + fields := []map[string]interface{}{ + { + "status": "OK", + "status_code": 1, + "lag": int64(0), + "offset": int64(431323195), + "timestamp": int64(1515609490008), + }, + { + "status": "OK", + "status_code": 1, + "lag": int64(0), + "offset": int64(431322962), + "timestamp": int64(1515609490008), + }, + { + "status": "OK", + "status_code": 1, + "lag": int64(0), + "offset": int64(428636563), + "timestamp": int64(1515609490008), + }, + } + tags := []map[string]string{ + {"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0"}, + {"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1"}, + {"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2"}, + } + + require.Empty(t, acc.Errors) + require.Equal(t, true, acc.HasMeasurement("burrow_partition")) + + for i := 0; i < len(fields); i++ { + acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i]) + } +} + +// burrow_group +func TestBurrowGroup(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + } + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + fields := []map[string]interface{}{ + { + "status": "OK", + "status_code": 1, + "partition_count": 3, + "total_lag": int64(0), + "lag": int64(0), + "offset": int64(431323195), + "timestamp": int64(1515609490008), + }, + } + + tags := []map[string]string{ + {"cluster": "clustername1", "group": "group1"}, + } + + require.Empty(t, acc.Errors) + require.Equal(t, true, acc.HasMeasurement("burrow_group")) + + for i := 0; i < len(fields); i++ { + acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i]) + } +} + +// collect from multiple servers +func TestMultipleServers(t *testing.T) { + s1 := getHTTPServer() + defer s1.Close() + + s2 := getHTTPServer() + defer s2.Close() + + plugin := &burrow{ + Servers: []string{s1.URL, s2.URL}, + } + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 14, len(acc.Metrics)) + require.Empty(t, acc.Errors) +} + +// collect multiple times +func TestMultipleRuns(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + } + for i := 0; i < 4; i++ { + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 7, len(acc.Metrics)) + require.Empty(t, acc.Errors) + } +} + +// collect from http basic auth server +func TestBasicAuthConfig(t *testing.T) { + s := getHTTPServerBasicAuth() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + Username: "test", + Password: "test", + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 7, len(acc.Metrics)) + require.Empty(t, acc.Errors) +} + +// collect from whitelisted clusters +func TestFilterClusters(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + // no match by cluster + require.Exactly(t, 0, len(acc.Metrics)) + require.Empty(t, acc.Errors) +} + +// collect from whitelisted groups +func TestFilterGroups(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + GroupsInclude: []string{"group?"}, // group1 -> match + TopicsExclude: []string{"*"}, // exclude all + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 4, len(acc.Metrics)) + require.Empty(t, acc.Errors) +} + +// collect from whitelisted topics +func TestFilterTopics(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + TopicsInclude: []string{"topic?"}, // topicA -> match + GroupsExclude: []string{"*"}, // exclude all + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 3, len(acc.Metrics)) + require.Empty(t, acc.Errors) +} diff --git a/plugins/inputs/burrow/testdata/error.json b/plugins/inputs/burrow/testdata/error.json new file mode 100644 index 000000000..f70b863e6 --- /dev/null +++ b/plugins/inputs/burrow/testdata/error.json @@ -0,0 +1,11 @@ +{ + "error": true, + "message": "Detailed error message", + "request": { + "uri": "/invalid/request", + "host": "responding.host.example.com", + "cluster": "", + "group": "", + "topic": "" + } +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka.json b/plugins/inputs/burrow/testdata/v3_kafka.json new file mode 100644 index 000000000..dfc4d0444 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka.json @@ -0,0 +1,11 @@ +{ + "error": false, + "message": "cluster list returned", + "clusters": [ + "clustername1" + ], + "request": { + "url": "/v3/kafka", + "host": "example.com" + } +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json new file mode 100644 index 000000000..f16226444 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json @@ -0,0 +1,11 @@ +{ + "error": false, + "message": "consumer list returned", + "consumers": [ + "group1" + ], + "request": { + "url": "/v3/kafka/clustername1/consumer", + "host": "example.com" + } +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json new file mode 100644 index 000000000..21205a663 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json @@ -0,0 +1,90 @@ +{ + "error": false, + "message": "consumer status returned", + "status": { + "cluster": "clustername1", + "group": "group1", + "status": "OK", + "complete": 1, + "partitions": [ + { + "topic": "topicA", + "partition": 0, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 431323195, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 431323195, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + }, + { + "topic": "topicA", + "partition": 1, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 431322962, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 431322962, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + }, + { + "topic": "topicA", + "partition": 2, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 428636563, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 428636563, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + } + ], + "partition_count": 3, + "maxlag": { + "topic": "topicA", + "partition": 0, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 431323195, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 431323195, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + }, + "totallag": 0 + }, + "request": { + "url": "/v3/kafka/clustername1/consumer/group1/lag", + "host": "example.com" + } +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json new file mode 100644 index 000000000..9bd21a14e --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json @@ -0,0 +1,11 @@ +{ + "error": false, + "message": "topic list returned", + "topics": [ + "topicA" + ], + "request": { + "url": "/v3/kafka/clustername1/topic", + "host": "example.com" + } +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json new file mode 100644 index 000000000..38a3cee0a --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json @@ -0,0 +1,13 @@ +{ + "error": false, + "message": "topic offsets returned", + "offsets": [ + 459178195, + 459178022, + 456491598 + ], + "request": { + "url": "/v3/kafka/clustername1/topic/topicA", + "host": "example.com" + } +}