286 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			286 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
| package burrow
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"io/ioutil"
 | |
| 	"net/http"
 | |
| 	"net/http/httptest"
 | |
| 	"os"
 | |
| 	"strings"
 | |
| 	"testing"
 | |
| 
 | |
| 	"github.com/influxdata/telegraf/testutil"
 | |
| 	"github.com/stretchr/testify/require"
 | |
| )
 | |
| 
 | |
| // remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json
 | |
| func getResponseJSON(requestURI string) ([]byte, int) {
 | |
| 	uri := strings.TrimLeft(requestURI, "/")
 | |
| 	mappedFile := strings.Replace(uri, "/", "_", -1)
 | |
| 	jsonFile := fmt.Sprintf("./testdata/%s.json", mappedFile)
 | |
| 
 | |
| 	code := 200
 | |
| 	_, err := os.Stat(jsonFile)
 | |
| 	if err != nil {
 | |
| 		code = 404
 | |
| 		jsonFile = "./testdata/error.json"
 | |
| 	}
 | |
| 
 | |
| 	// respond with file
 | |
| 	b, _ := ioutil.ReadFile(jsonFile)
 | |
| 	return b, code
 | |
| }
 | |
| 
 | |
| // return mocked HTTP server
 | |
| func getHTTPServer() *httptest.Server {
 | |
| 	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | |
| 		body, code := getResponseJSON(r.RequestURI)
 | |
| 		w.WriteHeader(code)
 | |
| 		w.Header().Set("Content-Type", "application/json")
 | |
| 		w.Write(body)
 | |
| 	}))
 | |
| }
 | |
| 
 | |
| // return mocked HTTP server with basic auth
 | |
| func getHTTPServerBasicAuth() *httptest.Server {
 | |
| 	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | |
| 		w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
 | |
| 
 | |
| 		username, password, authOK := r.BasicAuth()
 | |
| 		if authOK == false {
 | |
| 			http.Error(w, "Not authorized", 401)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		if username != "test" && password != "test" {
 | |
| 			http.Error(w, "Not authorized", 401)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// ok, continue
 | |
| 		body, code := getResponseJSON(r.RequestURI)
 | |
| 		w.WriteHeader(code)
 | |
| 		w.Header().Set("Content-Type", "application/json")
 | |
| 		w.Write(body)
 | |
| 	}))
 | |
| }
 | |
| 
 | |
| // test burrow_topic measurement
 | |
| func TestBurrowTopic(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{Servers: []string{s.URL}}
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	fields := []map[string]interface{}{
 | |
| 		// topicA
 | |
| 		{"offset": int64(459178195)},
 | |
| 		{"offset": int64(459178022)},
 | |
| 		{"offset": int64(456491598)},
 | |
| 	}
 | |
| 	tags := []map[string]string{
 | |
| 		// topicA
 | |
| 		{"cluster": "clustername1", "topic": "topicA", "partition": "0"},
 | |
| 		{"cluster": "clustername1", "topic": "topicA", "partition": "1"},
 | |
| 		{"cluster": "clustername1", "topic": "topicA", "partition": "2"},
 | |
| 	}
 | |
| 
 | |
| 	require.Empty(t, acc.Errors)
 | |
| 	require.Equal(t, true, acc.HasMeasurement("burrow_topic"))
 | |
| 	for i := 0; i < len(fields); i++ {
 | |
| 		acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i])
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // test burrow_partition measurement
 | |
| func TestBurrowPartition(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers: []string{s.URL},
 | |
| 	}
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	fields := []map[string]interface{}{
 | |
| 		{
 | |
| 			"status":      "OK",
 | |
| 			"status_code": 1,
 | |
| 			"lag":         int64(0),
 | |
| 			"offset":      int64(431323195),
 | |
| 			"timestamp":   int64(1515609490008),
 | |
| 		},
 | |
| 		{
 | |
| 			"status":      "OK",
 | |
| 			"status_code": 1,
 | |
| 			"lag":         int64(0),
 | |
| 			"offset":      int64(431322962),
 | |
| 			"timestamp":   int64(1515609490008),
 | |
| 		},
 | |
| 		{
 | |
| 			"status":      "OK",
 | |
| 			"status_code": 1,
 | |
| 			"lag":         int64(0),
 | |
| 			"offset":      int64(428636563),
 | |
| 			"timestamp":   int64(1515609490008),
 | |
| 		},
 | |
| 	}
 | |
| 	tags := []map[string]string{
 | |
| 		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0", "owner": "kafka1"},
 | |
| 		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1", "owner": "kafka2"},
 | |
| 		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2", "owner": "kafka3"},
 | |
| 	}
 | |
| 
 | |
| 	require.Empty(t, acc.Errors)
 | |
| 	require.Equal(t, true, acc.HasMeasurement("burrow_partition"))
 | |
| 
 | |
| 	for i := 0; i < len(fields); i++ {
 | |
| 		acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i])
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // burrow_group
 | |
| func TestBurrowGroup(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers: []string{s.URL},
 | |
| 	}
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	fields := []map[string]interface{}{
 | |
| 		{
 | |
| 			"status":          "OK",
 | |
| 			"status_code":     1,
 | |
| 			"partition_count": 3,
 | |
| 			"total_lag":       int64(0),
 | |
| 			"lag":             int64(0),
 | |
| 			"offset":          int64(431323195 + 431322962 + 428636563),
 | |
| 			"timestamp":       int64(1515609490008),
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	tags := []map[string]string{
 | |
| 		{"cluster": "clustername1", "group": "group1"},
 | |
| 	}
 | |
| 
 | |
| 	require.Empty(t, acc.Errors)
 | |
| 	require.Equal(t, true, acc.HasMeasurement("burrow_group"))
 | |
| 
 | |
| 	for i := 0; i < len(fields); i++ {
 | |
| 		acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i])
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // collect from multiple servers
 | |
| func TestMultipleServers(t *testing.T) {
 | |
| 	s1 := getHTTPServer()
 | |
| 	defer s1.Close()
 | |
| 
 | |
| 	s2 := getHTTPServer()
 | |
| 	defer s2.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers: []string{s1.URL, s2.URL},
 | |
| 	}
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	require.Exactly(t, 14, len(acc.Metrics))
 | |
| 	require.Empty(t, acc.Errors)
 | |
| }
 | |
| 
 | |
| // collect multiple times
 | |
| func TestMultipleRuns(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers: []string{s.URL},
 | |
| 	}
 | |
| 	for i := 0; i < 4; i++ {
 | |
| 		acc := &testutil.Accumulator{}
 | |
| 		plugin.Gather(acc)
 | |
| 
 | |
| 		require.Exactly(t, 7, len(acc.Metrics))
 | |
| 		require.Empty(t, acc.Errors)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // collect from http basic auth server
 | |
| func TestBasicAuthConfig(t *testing.T) {
 | |
| 	s := getHTTPServerBasicAuth()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers:  []string{s.URL},
 | |
| 		Username: "test",
 | |
| 		Password: "test",
 | |
| 	}
 | |
| 
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	require.Exactly(t, 7, len(acc.Metrics))
 | |
| 	require.Empty(t, acc.Errors)
 | |
| }
 | |
| 
 | |
| // collect from whitelisted clusters
 | |
| func TestFilterClusters(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers:         []string{s.URL},
 | |
| 		ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
 | |
| 	}
 | |
| 
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	// no match by cluster
 | |
| 	require.Exactly(t, 0, len(acc.Metrics))
 | |
| 	require.Empty(t, acc.Errors)
 | |
| }
 | |
| 
 | |
| // collect from whitelisted groups
 | |
| func TestFilterGroups(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers:       []string{s.URL},
 | |
| 		GroupsInclude: []string{"group?"}, // group1 -> match
 | |
| 		TopicsExclude: []string{"*"},      // exclude all
 | |
| 	}
 | |
| 
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	require.Exactly(t, 4, len(acc.Metrics))
 | |
| 	require.Empty(t, acc.Errors)
 | |
| }
 | |
| 
 | |
| // collect from whitelisted topics
 | |
| func TestFilterTopics(t *testing.T) {
 | |
| 	s := getHTTPServer()
 | |
| 	defer s.Close()
 | |
| 
 | |
| 	plugin := &burrow{
 | |
| 		Servers:       []string{s.URL},
 | |
| 		TopicsInclude: []string{"topic?"}, // topicA -> match
 | |
| 		GroupsExclude: []string{"*"},      // exclude all
 | |
| 	}
 | |
| 
 | |
| 	acc := &testutil.Accumulator{}
 | |
| 	plugin.Gather(acc)
 | |
| 
 | |
| 	require.Exactly(t, 3, len(acc.Metrics))
 | |
| 	require.Empty(t, acc.Errors)
 | |
| }
 |