package beanstalkd_test

import (
	"io"
	"net"
	"net/textproto"
	"testing"

	"github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
	"github.com/influxdata/telegraf/testutil"
	"github.com/stretchr/testify/require"
)

func TestBeanstalkd(t *testing.T) {
	type tubeStats struct {
		name   string
		fields map[string]interface{}
	}

	tests := []struct {
		name             string
		tubesConfig      []string
		expectedTubes    []tubeStats
		notExpectedTubes []tubeStats
	}{
		{
			name:        "All tubes stats",
			tubesConfig: []string{},
			expectedTubes: []tubeStats{
				{name: "default", fields: defaultTubeFields},
				{name: "test", fields: testTubeFields},
			},
			notExpectedTubes: []tubeStats{},
		},
		{
			name:        "Specified tubes stats",
			tubesConfig: []string{"test"},
			expectedTubes: []tubeStats{
				{name: "test", fields: testTubeFields},
			},
			notExpectedTubes: []tubeStats{
				{name: "default", fields: defaultTubeFields},
			},
		},
		{
			name:          "Unknown tube stats",
			tubesConfig:   []string{"unknown"},
			expectedTubes: []tubeStats{},
			notExpectedTubes: []tubeStats{
				{name: "default", fields: defaultTubeFields},
				{name: "test", fields: testTubeFields},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			server, err := startTestServer(t)
			if err != nil {
				t.Fatalf("Unable to create test server")
			}
			defer server.Close()

			serverAddress := server.Addr().String()
			plugin := beanstalkd.Beanstalkd{
				Server: serverAddress,
				Tubes:  test.tubesConfig,
			}

			var acc testutil.Accumulator
			require.NoError(t, acc.GatherError(plugin.Gather))

			acc.AssertContainsTaggedFields(t, "beanstalkd_overview",
				overviewFields,
				getOverviewTags(serverAddress),
			)

			for _, expectedTube := range test.expectedTubes {
				acc.AssertContainsTaggedFields(t, "beanstalkd_tube",
					expectedTube.fields,
					getTubeTags(serverAddress, expectedTube.name),
				)
			}

			for _, notExpectedTube := range test.notExpectedTubes {
				acc.AssertDoesNotContainsTaggedFields(t, "beanstalkd_tube",
					notExpectedTube.fields,
					getTubeTags(serverAddress, notExpectedTube.name),
				)
			}
		})
	}
}

func startTestServer(t *testing.T) (net.Listener, error) {
	server, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return nil, err
	}

	go func() {
		defer server.Close()

		connection, err := server.Accept()
		if err != nil {
			t.Log("Test server: failed to accept connection. Error: ", err)
			return
		}

		tp := textproto.NewConn(connection)
		defer tp.Close()

		sendSuccessResponse := func(body string) {
			tp.PrintfLine("OK %d\r\n%s", len(body), body)
		}

		for {
			cmd, err := tp.ReadLine()
			if err == io.EOF {
				return
			} else if err != nil {
				t.Log("Test server: failed read command. Error: ", err)
				return
			}

			switch cmd {
			case "list-tubes":
				sendSuccessResponse(listTubesResponse)
			case "stats":
				sendSuccessResponse(statsResponse)
			case "stats-tube default":
				sendSuccessResponse(statsTubeDefaultResponse)
			case "stats-tube test":
				sendSuccessResponse(statsTubeTestResponse)
			case "stats-tube unknown":
				tp.PrintfLine("NOT_FOUND")
			default:
				t.Log("Test server: unknown command")
			}
		}
	}()

	return server, nil
}

const (
	listTubesResponse = `---
- default
- test
`
	statsResponse = `---
current-jobs-urgent: 5
current-jobs-ready: 5
current-jobs-reserved: 0
current-jobs-delayed: 1
current-jobs-buried: 0
cmd-put: 6
cmd-peek: 0
cmd-peek-ready: 1
cmd-peek-delayed: 0
cmd-peek-buried: 0
cmd-reserve: 0
cmd-reserve-with-timeout: 1
cmd-delete: 1
cmd-release: 0
cmd-use: 2
cmd-watch: 0
cmd-ignore: 0
cmd-bury: 1
cmd-kick: 1
cmd-touch: 0
cmd-stats: 1
cmd-stats-job: 0
cmd-stats-tube: 2
cmd-list-tubes: 1
cmd-list-tube-used: 0
cmd-list-tubes-watched: 0
cmd-pause-tube: 0
job-timeouts: 0
total-jobs: 6
max-job-size: 65535
current-tubes: 2
current-connections: 2
current-producers: 1
current-workers: 1
current-waiting: 0
total-connections: 2
pid: 6
version: 1.10
rusage-utime: 0.000000
rusage-stime: 0.000000
uptime: 20
binlog-oldest-index: 0
binlog-current-index: 0
binlog-records-migrated: 0
binlog-records-written: 0
binlog-max-size: 10485760
id: bba7546657efdd4c
hostname: 2873efd3e88c
`
	statsTubeDefaultResponse = `---
name: default
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
total-jobs: 0
current-using: 2
current-watching: 2
current-waiting: 0
cmd-delete: 0
cmd-pause-tube: 0
pause: 0
pause-time-left: 0
`
	statsTubeTestResponse = `---
name: test
current-jobs-urgent: 5
current-jobs-ready: 5
current-jobs-reserved: 0
current-jobs-delayed: 1
current-jobs-buried: 0
total-jobs: 6
current-using: 0
current-watching: 0
current-waiting: 0
cmd-delete: 0
cmd-pause-tube: 0
pause: 0
pause-time-left: 0
`
)

var (
	// Default tube without stats
	defaultTubeFields = map[string]interface{}{
		"cmd_delete":            0,
		"cmd_pause_tube":        0,
		"current_jobs_buried":   0,
		"current_jobs_delayed":  0,
		"current_jobs_ready":    0,
		"current_jobs_reserved": 0,
		"current_jobs_urgent":   0,
		"current_using":         2,
		"current_waiting":       0,
		"current_watching":      2,
		"pause":                 0,
		"pause_time_left":       0,
		"total_jobs":            0,
	}
	// Test tube with stats
	testTubeFields = map[string]interface{}{
		"cmd_delete":            0,
		"cmd_pause_tube":        0,
		"current_jobs_buried":   0,
		"current_jobs_delayed":  1,
		"current_jobs_ready":    5,
		"current_jobs_reserved": 0,
		"current_jobs_urgent":   5,
		"current_using":         0,
		"current_waiting":       0,
		"current_watching":      0,
		"pause":                 0,
		"pause_time_left":       0,
		"total_jobs":            6,
	}
	// Server stats
	overviewFields = map[string]interface{}{
		"binlog_current_index":     0,
		"binlog_max_size":          10485760,
		"binlog_oldest_index":      0,
		"binlog_records_migrated":  0,
		"binlog_records_written":   0,
		"cmd_bury":                 1,
		"cmd_delete":               1,
		"cmd_ignore":               0,
		"cmd_kick":                 1,
		"cmd_list_tube_used":       0,
		"cmd_list_tubes":           1,
		"cmd_list_tubes_watched":   0,
		"cmd_pause_tube":           0,
		"cmd_peek":                 0,
		"cmd_peek_buried":          0,
		"cmd_peek_delayed":         0,
		"cmd_peek_ready":           1,
		"cmd_put":                  6,
		"cmd_release":              0,
		"cmd_reserve":              0,
		"cmd_reserve_with_timeout": 1,
		"cmd_stats":                1,
		"cmd_stats_job":            0,
		"cmd_stats_tube":           2,
		"cmd_touch":                0,
		"cmd_use":                  2,
		"cmd_watch":                0,
		"current_connections":      2,
		"current_jobs_buried":      0,
		"current_jobs_delayed":     1,
		"current_jobs_ready":       5,
		"current_jobs_reserved":    0,
		"current_jobs_urgent":      5,
		"current_producers":        1,
		"current_tubes":            2,
		"current_waiting":          0,
		"current_workers":          1,
		"job_timeouts":             0,
		"max_job_size":             65535,
		"pid":                      6,
		"rusage_stime":             0.0,
		"rusage_utime":             0.0,
		"total_connections":        2,
		"total_jobs":               6,
		"uptime":                   20,
	}
)

func getOverviewTags(server string) map[string]string {
	return map[string]string{
		"hostname": "2873efd3e88c",
		"id":       "bba7546657efdd4c",
		"server":   server,
		"version":  "1.10",
	}
}

func getTubeTags(server string, tube string) map[string]string {
	return map[string]string{
		"name":   tube,
		"server": server,
	}
}