package influxdb

import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/internal/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
)

const (
	maxErrorResponseBodyLength = 1024
)

type APIError struct {
	StatusCode  int
	Reason      string
	Description string `json:"error"`
}

func (e *APIError) Error() string {
	if e.Description != "" {
		return e.Reason + ": " + e.Description
	}
	return e.Reason
}

type InfluxDB struct {
	URLs     []string          `toml:"urls"`
	Username string            `toml:"username"`
	Password string            `toml:"password"`
	Timeout  internal.Duration `toml:"timeout"`
	tls.ClientConfig

	client *http.Client
}

func (*InfluxDB) Description() string {
	return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints"
}

func (*InfluxDB) SampleConfig() string {
	return `
  ## Works with InfluxDB debug endpoints out of the box,
  ## but other services can use this format too.
  ## See the influxdb plugin's README for more details.

  ## Multiple URLs from which to read InfluxDB-formatted JSON
  ## Default is "http://localhost:8086/debug/vars".
  urls = [
    "http://localhost:8086/debug/vars"
  ]

  ## Username and password to send using HTTP Basic Authentication.
  # username = ""
  # password = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## http request & header timeout
  timeout = "5s"
`
}

func (i *InfluxDB) Gather(acc telegraf.Accumulator) error {
	if len(i.URLs) == 0 {
		i.URLs = []string{"http://localhost:8086/debug/vars"}
	}

	if i.client == nil {
		tlsCfg, err := i.ClientConfig.TLSConfig()
		if err != nil {
			return err
		}
		i.client = &http.Client{
			Transport: &http.Transport{
				ResponseHeaderTimeout: i.Timeout.Duration,
				TLSClientConfig:       tlsCfg,
			},
			Timeout: i.Timeout.Duration,
		}
	}

	var wg sync.WaitGroup
	for _, u := range i.URLs {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			if err := i.gatherURL(acc, url); err != nil {
				acc.AddError(err)
			}
		}(u)
	}

	wg.Wait()

	return nil
}

type point struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Values map[string]interface{} `json:"values"`
}

type memstats struct {
	Alloc         int64      `json:"Alloc"`
	TotalAlloc    int64      `json:"TotalAlloc"`
	Sys           int64      `json:"Sys"`
	Lookups       int64      `json:"Lookups"`
	Mallocs       int64      `json:"Mallocs"`
	Frees         int64      `json:"Frees"`
	HeapAlloc     int64      `json:"HeapAlloc"`
	HeapSys       int64      `json:"HeapSys"`
	HeapIdle      int64      `json:"HeapIdle"`
	HeapInuse     int64      `json:"HeapInuse"`
	HeapReleased  int64      `json:"HeapReleased"`
	HeapObjects   int64      `json:"HeapObjects"`
	StackInuse    int64      `json:"StackInuse"`
	StackSys      int64      `json:"StackSys"`
	MSpanInuse    int64      `json:"MSpanInuse"`
	MSpanSys      int64      `json:"MSpanSys"`
	MCacheInuse   int64      `json:"MCacheInuse"`
	MCacheSys     int64      `json:"MCacheSys"`
	BuckHashSys   int64      `json:"BuckHashSys"`
	GCSys         int64      `json:"GCSys"`
	OtherSys      int64      `json:"OtherSys"`
	NextGC        int64      `json:"NextGC"`
	LastGC        int64      `json:"LastGC"`
	PauseTotalNs  int64      `json:"PauseTotalNs"`
	PauseNs       [256]int64 `json:"PauseNs"`
	NumGC         int64      `json:"NumGC"`
	GCCPUFraction float64    `json:"GCCPUFraction"`
}

// Gathers data from a particular URL
// Parameters:
//     acc    : The telegraf Accumulator to use
//     url    : endpoint to send request to
//
// Returns:
//     error: Any error that may have occurred
func (i *InfluxDB) gatherURL(
	acc telegraf.Accumulator,
	url string,
) error {
	shardCounter := 0
	now := time.Now()

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}

	if i.Username != "" || i.Password != "" {
		req.SetBasicAuth(i.Username, i.Password)
	}

	req.Header.Set("User-Agent", "Telegraf/"+internal.Version())

	resp, err := i.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return readResponseError(resp)
	}

	// It would be nice to be able to decode into a map[string]point, but
	// we'll get a decoder error like:
	// `json: cannot unmarshal array into Go value of type influxdb.point`
	// if any of the values aren't objects.
	// To avoid that error, we decode by hand.
	dec := json.NewDecoder(resp.Body)

	// Parse beginning of object
	if t, err := dec.Token(); err != nil {
		return err
	} else if t != json.Delim('{') {
		return errors.New("document root must be a JSON object")
	}

	// Loop through rest of object
	for {
		// Nothing left in this object, we're done
		if !dec.More() {
			break
		}

		// Read in a string key. We don't do anything with the top-level keys,
		// so it's discarded.
		key, err := dec.Token()
		if err != nil {
			return err
		}

		if keyStr, ok := key.(string); ok {
			if keyStr == "memstats" {
				var m memstats
				if err := dec.Decode(&m); err != nil {
					continue
				}
				acc.AddFields("influxdb_memstats",
					map[string]interface{}{
						"alloc":           m.Alloc,
						"total_alloc":     m.TotalAlloc,
						"sys":             m.Sys,
						"lookups":         m.Lookups,
						"mallocs":         m.Mallocs,
						"frees":           m.Frees,
						"heap_alloc":      m.HeapAlloc,
						"heap_sys":        m.HeapSys,
						"heap_idle":       m.HeapIdle,
						"heap_inuse":      m.HeapInuse,
						"heap_released":   m.HeapReleased,
						"heap_objects":    m.HeapObjects,
						"stack_inuse":     m.StackInuse,
						"stack_sys":       m.StackSys,
						"mspan_inuse":     m.MSpanInuse,
						"mspan_sys":       m.MSpanSys,
						"mcache_inuse":    m.MCacheInuse,
						"mcache_sys":      m.MCacheSys,
						"buck_hash_sys":   m.BuckHashSys,
						"gc_sys":          m.GCSys,
						"other_sys":       m.OtherSys,
						"next_gc":         m.NextGC,
						"last_gc":         m.LastGC,
						"pause_total_ns":  m.PauseTotalNs,
						"pause_ns":        m.PauseNs[(m.NumGC+255)%256],
						"num_gc":          m.NumGC,
						"gcc_pu_fraction": m.GCCPUFraction,
					},
					map[string]string{
						"url": url,
					})
			}
		}

		// Attempt to parse a whole object into a point.
		// It might be a non-object, like a string or array.
		// If we fail to decode it into a point, ignore it and move on.
		var p point
		if err := dec.Decode(&p); err != nil {
			continue
		}

		if p.Tags == nil {
			p.Tags = make(map[string]string)
		}

		// If the object was a point, but was not fully initialized,
		// ignore it and move on.
		if p.Name == "" || p.Values == nil || len(p.Values) == 0 {
			continue
		}

		if p.Name == "shard" {
			shardCounter++
		}

		// Add a tag to indicate the source of the data.
		p.Tags["url"] = url

		acc.AddFields(
			"influxdb_"+p.Name,
			p.Values,
			p.Tags,
			now,
		)
	}

	acc.AddFields("influxdb",
		map[string]interface{}{
			"n_shards": shardCounter,
		},
		nil,
		now,
	)

	return nil
}

func readResponseError(resp *http.Response) error {
	apiError := &APIError{
		StatusCode: resp.StatusCode,
		Reason:     resp.Status,
	}

	var buf bytes.Buffer
	r := io.LimitReader(resp.Body, maxErrorResponseBodyLength)
	_, err := buf.ReadFrom(r)
	if err != nil {
		return apiError
	}

	err = json.Unmarshal(buf.Bytes(), apiError)
	if err != nil {
		return apiError
	}

	return apiError
}

func init() {
	inputs.Add("influxdb", func() telegraf.Input {
		return &InfluxDB{
			Timeout: internal.Duration{Duration: time.Second * 5},
		}
	})
}