package burrow

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"os"
	"strings"
	"testing"

	"github.com/influxdata/telegraf/testutil"
	"github.com/stretchr/testify/require"
)

// getResponseJSON maps a request URI onto a fixture file under ./testdata
// (for example /v3/kafka -> ./testdata/v3_kafka.json) and returns the file
// contents together with the HTTP status code to reply with. When no fixture
// can be stat'ed for the URI, ./testdata/error.json is served with status 404.
func getResponseJSON(requestURI string) ([]byte, int) {
	// Drop leading slashes, then flatten the remaining path separators.
	name := strings.Replace(strings.TrimLeft(requestURI, "/"), "/", "_", -1)
	path := fmt.Sprintf("./testdata/%s.json", name)
	status := 200

	if _, statErr := os.Stat(path); statErr != nil {
		path, status = "./testdata/error.json", 404
	}

	// Any read error is discarded; the caller simply gets whatever was read.
	payload, _ := ioutil.ReadFile(path)
	return payload, status
}

// getHTTPServer starts a mocked HTTP server that answers every request with
// the body and status code getResponseJSON selects for the request URI, and
// labels the response as application/json. The caller must Close the returned
// server.
func getHTTPServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, code := getResponseJSON(r.RequestURI)

		// Headers have to be set before WriteHeader: changes made to the
		// header map afterwards are not sent to the client.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(code)

		// Best effort: a failed write cannot be reported from inside the
		// handler, so the result is explicitly discarded.
		_, _ = w.Write(body)
	}))
}

// getHTTPServerBasicAuth starts a mocked HTTP server protected by HTTP basic
// auth. Only requests carrying username "test" AND password "test" are
// served; everything else is answered with 401. Authorized requests get the
// body and status code getResponseJSON selects for the request URI, labelled
// as application/json. The caller must Close the returned server.
func getHTTPServerBasicAuth() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)

		// Reject when the Authorization header is missing or malformed, or
		// when either part of the credential is wrong. Both parts must match.
		username, password, authOK := r.BasicAuth()
		if !authOK || username != "test" || password != "test" {
			http.Error(w, "Not authorized", http.StatusUnauthorized)
			return
		}

		// ok, continue
		body, code := getResponseJSON(r.RequestURI)

		// Headers have to be set before WriteHeader: changes made to the
		// header map afterwards are not sent to the client.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(code)

		// Best effort: a failed write cannot be reported from inside the
		// handler, so the result is explicitly discarded.
		_, _ = w.Write(body)
	}))
}

// TestBurrowTopic checks the burrow_topic measurement: gathering from the
// mocked server must produce, for each of the three partitions of topicA, a
// metric carrying the expected offset and cluster/topic/partition tags.
func TestBurrowTopic(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{Servers: []string{s.URL}}
	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// fields[i] is expected together with tags[i].
	fields := []map[string]interface{}{
		// topicA
		{"offset": int64(459178195)},
		{"offset": int64(459178022)},
		{"offset": int64(456491598)},
	}
	tags := []map[string]string{
		// topicA
		{"cluster": "clustername1", "topic": "topicA", "partition": "0"},
		{"cluster": "clustername1", "topic": "topicA", "partition": "1"},
		{"cluster": "clustername1", "topic": "topicA", "partition": "2"},
	}

	require.Empty(t, acc.Errors)
	require.True(t, acc.HasMeasurement("burrow_topic"))
	for i := range fields {
		acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i])
	}
}

// TestBurrowPartition checks the burrow_partition measurement: gathering from
// the mocked server must produce one metric per partition of topicA in
// group1, each with the expected status, lag, offset and timestamp fields and
// cluster/group/topic/partition/owner tags.
func TestBurrowPartition(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{
		Servers: []string{s.URL},
	}
	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// fields[i] is expected together with tags[i].
	fields := []map[string]interface{}{
		{
			"status":      "OK",
			"status_code": 1,
			"lag":         int64(0),
			"offset":      int64(431323195),
			"timestamp":   int64(1515609490008),
		},
		{
			"status":      "OK",
			"status_code": 1,
			"lag":         int64(0),
			"offset":      int64(431322962),
			"timestamp":   int64(1515609490008),
		},
		{
			"status":      "OK",
			"status_code": 1,
			"lag":         int64(0),
			"offset":      int64(428636563),
			"timestamp":   int64(1515609490008),
		},
	}
	tags := []map[string]string{
		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0", "owner": "kafka1"},
		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1", "owner": "kafka2"},
		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2", "owner": "kafka3"},
	}

	require.Empty(t, acc.Errors)
	require.True(t, acc.HasMeasurement("burrow_partition"))

	for i := range fields {
		acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i])
	}
}

// TestBurrowGroup checks the burrow_group measurement: gathering from the
// mocked server must produce a metric for group1 whose offset equals the sum
// of the three partition offsets, alongside the expected status, lag,
// partition count and timestamp.
func TestBurrowGroup(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{
		Servers: []string{s.URL},
	}
	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// fields[i] is expected together with tags[i].
	fields := []map[string]interface{}{
		{
			"status":          "OK",
			"status_code":     1,
			"partition_count": 3,
			"total_lag":       int64(0),
			"lag":             int64(0),
			"offset":          int64(431323195 + 431322962 + 428636563),
			"timestamp":       int64(1515609490008),
		},
	}

	tags := []map[string]string{
		{"cluster": "clustername1", "group": "group1"},
	}

	require.Empty(t, acc.Errors)
	require.True(t, acc.HasMeasurement("burrow_group"))

	for i := range fields {
		acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i])
	}
}

// TestMultipleServers gathers from two identical mocked servers in a single
// Gather call and expects 14 metrics in total with no accumulated errors.
func TestMultipleServers(t *testing.T) {
	s1 := getHTTPServer()
	defer s1.Close()

	s2 := getHTTPServer()
	defer s2.Close()

	plugin := &burrow{
		Servers: []string{s1.URL, s2.URL},
	}
	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// Check errors first so a collection failure is reported as such rather
	// than as an unexpected metric count.
	require.Empty(t, acc.Errors)
	require.Exactly(t, 14, len(acc.Metrics))
}

// TestMultipleRuns calls Gather four times on the same plugin instance, each
// time with a fresh accumulator, and expects every run to yield 7 metrics and
// no accumulated errors.
func TestMultipleRuns(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{
		Servers: []string{s.URL},
	}
	for i := 0; i < 4; i++ {
		acc := &testutil.Accumulator{}
		// A failing Gather must fail the test instead of being silently dropped.
		require.NoError(t, plugin.Gather(acc))

		// Check errors first so a collection failure is reported as such
		// rather than as an unexpected metric count.
		require.Empty(t, acc.Errors)
		require.Exactly(t, 7, len(acc.Metrics))
	}
}

// TestBasicAuthConfig gathers from the basic-auth protected mocked server
// using the Username/Password plugin settings and expects the usual 7 metrics
// with no accumulated errors.
func TestBasicAuthConfig(t *testing.T) {
	s := getHTTPServerBasicAuth()
	defer s.Close()

	plugin := &burrow{
		Servers:  []string{s.URL},
		Username: "test",
		Password: "test",
	}

	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// Check errors first so an auth failure is reported as such rather than
	// as an unexpected metric count.
	require.Empty(t, acc.Errors)
	require.Exactly(t, 7, len(acc.Metrics))
}

// TestFilterClusters sets a ClustersInclude pattern that does not match the
// fixture cluster (clustername1) and expects no metrics and no accumulated
// errors.
func TestFilterClusters(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{
		Servers:         []string{s.URL},
		ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
	}

	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// no match by cluster
	require.Empty(t, acc.Errors)
	require.Exactly(t, 0, len(acc.Metrics))
}

// TestFilterGroups includes groups matching "group?" while excluding every
// topic, and expects exactly one metric (presumably the burrow_group one;
// only the count is asserted) and no accumulated errors.
func TestFilterGroups(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{
		Servers:       []string{s.URL},
		GroupsInclude: []string{"group?"}, // group1 -> match
		TopicsExclude: []string{"*"},      // exclude all
	}

	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// Check errors first so a collection failure is reported as such rather
	// than as an unexpected metric count.
	require.Empty(t, acc.Errors)
	require.Exactly(t, 1, len(acc.Metrics))
}

// TestFilterTopics includes topics matching "topic?" while excluding every
// group, and expects exactly three metrics (presumably one burrow_topic metric
// per partition; only the count is asserted) and no accumulated errors.
func TestFilterTopics(t *testing.T) {
	s := getHTTPServer()
	defer s.Close()

	plugin := &burrow{
		Servers:       []string{s.URL},
		TopicsInclude: []string{"topic?"}, // topicA -> match
		GroupsExclude: []string{"*"},      // exclude all
	}

	acc := &testutil.Accumulator{}
	// A failing Gather must fail the test instead of being silently dropped.
	require.NoError(t, plugin.Gather(acc))

	// Check errors first so a collection failure is reported as such rather
	// than as an unexpected metric count.
	require.Empty(t, acc.Errors)
	require.Exactly(t, 3, len(acc.Metrics))
}