package logstash

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/internal/choice"
	"github.com/influxdata/telegraf/internal/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
	jsonParser "github.com/influxdata/telegraf/plugins/parsers/json"
)

// sampleConfig is the commented example TOML configuration handed out by
// SampleConfig. The values shown for url, single_pipeline, collect and
// timeout match the defaults set in NewLogstash.
const sampleConfig = `
  ## The URL of the exposed Logstash API endpoint.
  url = "http://127.0.0.1:9600"

  ## Use Logstash 5 single pipeline API, set to true when monitoring
  ## Logstash 5.
  # single_pipeline = false

  ## Enable optional collection components.  Can contain
  ## "pipelines", "process", and "jvm".
  # collect = ["pipelines", "process", "jvm"]

  ## Timeout for HTTP requests.
  # timeout = "5s"

  ## Optional HTTP Basic Auth credentials.
  # username = "username"
  # password = "pa$$word"

  ## Optional TLS Config.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

  ## Use TLS but skip chain & host verification.
  # insecure_skip_verify = false

  ## Optional HTTP headers.
  # [inputs.logstash.headers]
  #   "X-Special-Header" = "Special-Value"
`

// Logstash holds the configuration and runtime state of the Logstash input
// plugin. The exported fields are populated from the TOML configuration.
type Logstash struct {
	// URL is the base address of the Logstash API; Gather appends the
	// stats endpoint paths to it.
	URL string `toml:"url"`

	// SinglePipeline makes Gather query the single-pipeline stats endpoint
	// instead of the multi-pipeline one. Collect lists the components to
	// gather and is validated by Init.
	SinglePipeline bool     `toml:"single_pipeline"`
	Collect        []string `toml:"collect"`

	// Username and Password enable HTTP basic auth when either is set.
	// Headers are added to every request, Timeout bounds each HTTP request,
	// and the embedded tls.ClientConfig supplies the client TLS settings.
	Username string            `toml:"username"`
	Password string            `toml:"password"`
	Headers  map[string]string `toml:"headers"`
	Timeout  internal.Duration `toml:"timeout"`
	tls.ClientConfig

	// client is created on the first call to Gather and reused afterwards.
	client *http.Client
}

// NewLogstash returns a plugin instance preloaded with the default settings:
// the local API address, multi-pipeline mode, every collection component
// enabled, no extra headers and a five second request timeout.
func NewLogstash() *Logstash {
	plugin := new(Logstash)
	plugin.URL = "http://127.0.0.1:9600"
	plugin.SinglePipeline = false
	plugin.Collect = []string{"pipelines", "process", "jvm"}
	plugin.Headers = map[string]string{}
	plugin.Timeout = internal.Duration{Duration: 5 * time.Second}
	return plugin
}

// Description returns a one-line summary of what the plugin does.
func (*Logstash) Description() string {
	const summary = "Read metrics exposed by Logstash"
	return summary
}

// SampleConfig returns the example configuration text for the plugin,
// i.e. the package-level sampleConfig string.
func (*Logstash) SampleConfig() string {
	return sampleConfig
}

// ProcessStats is the JSON decode target used by gatherProcessStats.
// ID, Name, Version and Host become the node_id, node_name, node_version and
// source tags; Process is kept untyped and flattened into the fields of the
// "logstash_process" measurement.
type ProcessStats struct {
	ID      string      `json:"id"`
	Process interface{} `json:"process"`
	Name    string      `json:"name"`
	Host    string      `json:"host"`
	Version string      `json:"version"`
}

// JVMStats is the JSON decode target used by gatherJVMStats.
// ID, Name, Version and Host become the node_id, node_name, node_version and
// source tags; JVM is kept untyped and flattened into the fields of the
// "logstash_jvm" measurement.
type JVMStats struct {
	ID      string      `json:"id"`
	JVM     interface{} `json:"jvm"`
	Name    string      `json:"name"`
	Host    string      `json:"host"`
	Version string      `json:"version"`
}

// PipelinesStats is the JSON decode target used by gatherPipelinesStats
// (multi-pipeline endpoint). Pipelines is keyed by pipeline name, which is
// reported as the "pipeline" tag; the remaining fields describe the node and
// become the node_id, node_name, node_version and source tags.
type PipelinesStats struct {
	ID        string              `json:"id"`
	Pipelines map[string]Pipeline `json:"pipelines"`
	Name      string              `json:"name"`
	Host      string              `json:"host"`
	Version   string              `json:"version"`
}

// PipelineStats is the JSON decode target used by gatherPipelineStats
// (single-pipeline endpoint). Pipeline holds the stats of the one pipeline;
// the remaining fields describe the node and become the node_id, node_name,
// node_version and source tags.
type PipelineStats struct {
	ID       string   `json:"id"`
	Pipeline Pipeline `json:"pipeline"`
	Name     string   `json:"name"`
	Host     string   `json:"host"`
	Version  string   `json:"version"`
}

// Pipeline is the per-pipeline section of a pipeline stats response.
// Events is flattened into the "logstash_events" measurement, Plugins and
// Queue are reported through gatherPluginsStats and gatherQueueStats.
// Reloads is decoded but not read by any of the gather functions in this file.
type Pipeline struct {
	Events  interface{}     `json:"events"`
	Plugins PipelinePlugins `json:"plugins"`
	Reloads interface{}     `json:"reloads"`
	Queue   PipelineQueue   `json:"queue"`
}

// Plugin is one entry of a pipeline's plugin list. ID and Name are reported
// as the plugin_id and plugin_name tags; Events is kept untyped and flattened
// into the fields of the "logstash_plugins" measurement.
type Plugin struct {
	ID     string      `json:"id"`
	Events interface{} `json:"events"`
	Name   string      `json:"name"`
}

// PipelinePlugins groups the plugins of a pipeline by role. The gather
// functions report the three lists with plugin_type "input", "filter" and
// "output" respectively.
type PipelinePlugins struct {
	Inputs  []Plugin `json:"inputs"`
	Filters []Plugin `json:"filters"`
	Outputs []Plugin `json:"outputs"`
}

// PipelineQueue is the queue section of a pipeline's stats.
// Events is reported as the "events" field and Type as the queue_type tag of
// the "logstash_queue" measurement. Capacity and Data are kept untyped and
// are only flattened into that measurement when Type is not "memory".
type PipelineQueue struct {
	Events   float64     `json:"events"`
	Type     string      `json:"type"`
	Capacity interface{} `json:"capacity"`
	Data     interface{} `json:"data"`
}

// Paths of the Logstash node stats API; Gather appends one of them to the
// configured base URL. pipelineStats is the single-pipeline variant used when
// SinglePipeline is set.
const (
	jvmStats       = "/_node/stats/jvm"
	processStats   = "/_node/stats/process"
	pipelinesStats = "/_node/stats/pipelines"
	pipelineStats  = "/_node/stats/pipeline"
)

// Init validates the plugin configuration.
//
// It passes the "collect" setting and the list of supported components
// ("pipelines", "process", "jvm") to choice.CheckSlice and wraps any error
// that check reports.
func (logstash *Logstash) Init() error {
	supported := []string{"pipelines", "process", "jvm"}
	if err := choice.CheckSlice(logstash.Collect, supported); err != nil {
		return fmt.Errorf(`cannot verify "collect" setting: %v`, err)
	}
	return nil
}

// createHttpClient builds the HTTP client used to query the Logstash API.
// The client uses the TLS settings from the plugin's ClientConfig and the
// configured request timeout. An error is returned when the TLS
// configuration cannot be built.
func (logstash *Logstash) createHttpClient() (*http.Client, error) {
	tlsConfig, err := logstash.ClientConfig.TLSConfig()
	if err != nil {
		return nil, err
	}

	transport := &http.Transport{TLSClientConfig: tlsConfig}

	return &http.Client{
		Transport: transport,
		Timeout:   logstash.Timeout.Duration,
	}, nil
}

// gatherJsonData performs a GET request against url and decodes the JSON
// response body into value, which must be a pointer accepted by
// encoding/json.
//
// HTTP basic auth is attached when either Username or Password is set. Every
// configured header is added to the request, except a header named "Host"
// (in any letter case), which overrides the request's Host instead.
//
// An error is returned when the request cannot be built or sent, when the
// server answers with a status other than 200 OK, or when the body is not
// valid JSON for value.
func (logstash *Logstash) gatherJsonData(url string, value interface{}) error {
	request, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}

	if logstash.Username != "" || logstash.Password != "" {
		request.SetBasicAuth(logstash.Username, logstash.Password)
	}

	for name, headerValue := range logstash.Headers {
		// net/http takes the Host header from Request.Host, not from
		// Request.Header, so it has to be set separately.
		if strings.EqualFold(name, "host") {
			request.Host = headerValue
			continue
		}
		request.Header.Add(name, headerValue)
	}

	response, err := logstash.client.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	// Client.Do does not treat non-2xx answers as errors. Without this check
	// an error payload (e.g. 401 or 404) would be decoded as if it were
	// stats and silently produce empty metrics.
	if response.StatusCode != http.StatusOK {
		return fmt.Errorf("%s returned HTTP status %s", url, response.Status)
	}

	return json.NewDecoder(response.Body).Decode(value)
}

// gatherJVMStats fetches the JVM stats document from url and reports its
// flattened "jvm" section as the "logstash_jvm" measurement, tagged with the
// node's id, name, version and host.
func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumulator) error {
	var stats JVMStats
	if err := logstash.gatherJsonData(url, &stats); err != nil {
		return err
	}

	var flattener jsonParser.JSONFlattener
	if err := flattener.FlattenJSON("", stats.JVM); err != nil {
		return err
	}

	nodeTags := map[string]string{
		"node_id":      stats.ID,
		"node_name":    stats.Name,
		"node_version": stats.Version,
		"source":       stats.Host,
	}
	accumulator.AddFields("logstash_jvm", flattener.Fields, nodeTags)

	return nil
}

// gatherProcessStats fetches the process stats document from url and reports
// its flattened "process" section as the "logstash_process" measurement,
// tagged with the node's id, name, version and host.
func (logstash *Logstash) gatherProcessStats(url string, accumulator telegraf.Accumulator) error {
	var stats ProcessStats
	if err := logstash.gatherJsonData(url, &stats); err != nil {
		return err
	}

	var flattener jsonParser.JSONFlattener
	if err := flattener.FlattenJSON("", stats.Process); err != nil {
		return err
	}

	nodeTags := map[string]string{
		"node_id":      stats.ID,
		"node_name":    stats.Name,
		"node_version": stats.Version,
		"source":       stats.Host,
	}
	accumulator.AddFields("logstash_process", flattener.Fields, nodeTags)

	return nil
}

// gatherPluginsStats reports one "logstash_plugins" point per entry of
// plugins. Each point carries the plugin's flattened events as fields and is
// tagged with the plugin name, id and pluginType plus every entry of tags;
// on a key collision the value from tags wins. Processing stops at the first
// plugin whose events cannot be flattened.
func (logstash *Logstash) gatherPluginsStats(
	plugins []Plugin,
	pluginType string,
	tags map[string]string,
	accumulator telegraf.Accumulator) error {

	for idx := range plugins {
		current := &plugins[idx]

		var flattener jsonParser.JSONFlattener
		if err := flattener.FlattenJSON("", current.Events); err != nil {
			return err
		}

		// Plugin tags go in first so that the shared tags overlay them.
		merged := make(map[string]string, len(tags)+3)
		merged["plugin_name"] = current.Name
		merged["plugin_id"] = current.ID
		merged["plugin_type"] = pluginType
		for key, val := range tags {
			merged[key] = val
		}

		accumulator.AddFields("logstash_plugins", flattener.Fields, merged)
	}

	return nil
}

// gatherQueueStats reports a pipeline's queue as one "logstash_queue" point.
// The point always has the "events" field and the queue_type tag plus every
// entry of tags (values from tags win on a key collision). For any queue type
// other than "memory" the flattened capacity and data sections are added as
// fields as well.
func (logstash *Logstash) gatherQueueStats(
	queue *PipelineQueue,
	tags map[string]string,
	accumulator telegraf.Accumulator) error {

	fields := map[string]interface{}{
		"events": queue.Events,
	}

	if queue.Type != "memory" {
		// Both sections are flattened into the same flattener so their
		// fields end up in a single map.
		var flattener jsonParser.JSONFlattener
		for _, section := range []interface{}{queue.Capacity, queue.Data} {
			if err := flattener.FlattenJSON("", section); err != nil {
				return err
			}
		}
		for name, val := range flattener.Fields {
			fields[name] = val
		}
	}

	merged := make(map[string]string, len(tags)+1)
	merged["queue_type"] = queue.Type
	for key, val := range tags {
		merged[key] = val
	}

	accumulator.AddFields("logstash_queue", fields, merged)

	return nil
}

// gatherPipelineStats fetches the single-pipeline stats document from url
// (for Logstash < 6) and reports the pipeline's events as "logstash_events",
// followed by its input, filter and output plugins and its queue. Every point
// is tagged with the node's id, name, version and host. The first error
// aborts the gathering.
func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.Accumulator) error {
	var stats PipelineStats
	if err := logstash.gatherJsonData(url, &stats); err != nil {
		return err
	}
	pipeline := &stats.Pipeline

	tags := map[string]string{
		"node_id":      stats.ID,
		"node_name":    stats.Name,
		"node_version": stats.Version,
		"source":       stats.Host,
	}

	var flattener jsonParser.JSONFlattener
	if err := flattener.FlattenJSON("", pipeline.Events); err != nil {
		return err
	}
	accumulator.AddFields("logstash_events", flattener.Fields, tags)

	groups := []struct {
		kind    string
		plugins []Plugin
	}{
		{"input", pipeline.Plugins.Inputs},
		{"filter", pipeline.Plugins.Filters},
		{"output", pipeline.Plugins.Outputs},
	}
	for _, group := range groups {
		if err := logstash.gatherPluginsStats(group.plugins, group.kind, tags, accumulator); err != nil {
			return err
		}
	}

	return logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator)
}

// gatherPipelinesStats fetches the multi-pipeline stats document from url
// (for Logstash >= 6). For every pipeline it reports the events as
// "logstash_events", followed by the input, filter and output plugins and the
// queue. Every point is tagged with the node's id, name, version and host and
// with the pipeline name. The first error aborts the gathering.
func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.Accumulator) error {
	var stats PipelinesStats
	if err := logstash.gatherJsonData(url, &stats); err != nil {
		return err
	}

	for name, pipeline := range stats.Pipelines {
		tags := map[string]string{
			"node_id":      stats.ID,
			"node_name":    stats.Name,
			"node_version": stats.Version,
			"pipeline":     name,
			"source":       stats.Host,
		}

		var flattener jsonParser.JSONFlattener
		if err := flattener.FlattenJSON("", pipeline.Events); err != nil {
			return err
		}
		accumulator.AddFields("logstash_events", flattener.Fields, tags)

		groups := []struct {
			kind    string
			plugins []Plugin
		}{
			{"input", pipeline.Plugins.Inputs},
			{"filter", pipeline.Plugins.Filters},
			{"output", pipeline.Plugins.Outputs},
		}
		for _, group := range groups {
			if err := logstash.gatherPluginsStats(group.plugins, group.kind, tags, accumulator); err != nil {
				return err
			}
		}

		if err := logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator); err != nil {
			return err
		}
	}

	return nil
}

// Gather collects the metrics of every component listed in Collect and adds
// them to accumulator. The HTTP client is created on the first call and
// reused afterwards. Components are queried in the order jvm, process,
// pipelines; the first error aborts the run and is returned.
func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
	if logstash.client == nil {
		client, err := logstash.createHttpClient()
		if err != nil {
			return err
		}
		logstash.client = client
	}

	// endpoint resolves an API path against the configured base URL.
	endpoint := func(path string) (string, error) {
		parsed, err := url.Parse(logstash.URL + path)
		if err != nil {
			return "", err
		}
		return parsed.String(), nil
	}

	if choice.Contains("jvm", logstash.Collect) {
		target, err := endpoint(jvmStats)
		if err != nil {
			return err
		}
		if err := logstash.gatherJVMStats(target, accumulator); err != nil {
			return err
		}
	}

	if choice.Contains("process", logstash.Collect) {
		target, err := endpoint(processStats)
		if err != nil {
			return err
		}
		if err := logstash.gatherProcessStats(target, accumulator); err != nil {
			return err
		}
	}

	if choice.Contains("pipelines", logstash.Collect) {
		// Logstash 5 only exposes the single-pipeline endpoint.
		path, gather := pipelinesStats, logstash.gatherPipelinesStats
		if logstash.SinglePipeline {
			path, gather = pipelineStats, logstash.gatherPipelineStats
		}

		target, err := endpoint(path)
		if err != nil {
			return err
		}
		if err := gather(target, accumulator); err != nil {
			return err
		}
	}

	return nil
}

// init registers this plugin instance
func init() {
	inputs.Add("logstash", func() telegraf.Input {
		return NewLogstash()
	})
}