package minmax

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/aggregators"
)

// MinMax is an aggregator that tracks the minimum and maximum value seen for
// each numeric field of every metric added to it.
type MinMax struct {
	// cache maps a metric's HashID() to the running aggregate for that
	// metric. It is replaced wholesale by Reset.
	cache map[uint64]aggregate
}

// NewMinMax returns a MinMax aggregator whose cache has already been
// initialised via Reset, so it is ready to accept metrics.
func NewMinMax() telegraf.Aggregator {
	agg := new(MinMax)
	agg.Reset()
	return agg
}

// aggregate is the state cached for a single metric between resets.
type aggregate struct {
	// fields holds the running min/max, keyed by field name.
	fields map[string]minmax
	// name is the metric name captured when the metric was first seen.
	name   string
	// tags are the metric tags captured when the metric was first seen.
	tags   map[string]string
}

// minmax is the smallest and largest value observed so far for one field.
type minmax struct {
	min float64
	max float64
}

// sampleConfig is the configuration snippet returned by SampleConfig. It
// shows the period and drop_original settings.
var sampleConfig = `
  ## General Aggregator Arguments:
  ## The period on which to flush & clear the aggregator.
  period = "30s"
  ## If true, the original metric will be dropped by the
  ## aggregator and will not get sent to the output plugins.
  drop_original = false
`

// SampleConfig returns the sample configuration text for this aggregator.
func (m *MinMax) SampleConfig() string {
	return sampleConfig
}

// Description returns a one-line summary of what this aggregator does.
func (m *MinMax) Description() string {
	return "Keep the aggregate min/max of each metric passing through."
}

// Add folds one metric into the cache. The metric is identified by its
// HashID(); the first time an ID is seen, its name and tags are recorded.
// Every field that convert accepts then either starts a new min/max pair or
// widens the existing one; fields that convert rejects are ignored.
func (m *MinMax) Add(in telegraf.Metric) {
	key := in.HashID()

	entry, seen := m.cache[key]
	if !seen {
		// First sighting of this metric: register an empty aggregate. The
		// fields map is shared with the cached copy, so writes below land
		// in the cache.
		entry = aggregate{
			name:   in.Name(),
			tags:   in.Tags(),
			fields: make(map[string]minmax),
		}
		m.cache[key] = entry
	}

	for field, raw := range in.Fields() {
		val, numeric := convert(raw)
		if !numeric {
			continue
		}

		bounds, tracked := entry.fields[field]
		switch {
		case !tracked:
			// First numeric value for this field seeds both bounds.
			bounds = minmax{min: val, max: val}
		case val < bounds.min:
			bounds.min = val
		case val > bounds.max:
			bounds.max = val
		default:
			// Value lies within the current bounds; nothing to store.
			continue
		}
		entry.fields[field] = bounds
	}
}

// Push reports every cached metric to acc. For each tracked field k it emits
// two output fields, k+"_min" and k+"_max", under the metric's original name
// and tags. The cache itself is left untouched.
func (m *MinMax) Push(acc telegraf.Accumulator) {
	for _, series := range m.cache {
		out := make(map[string]interface{}, 2*len(series.fields))
		for field, bounds := range series.fields {
			out[field+"_min"] = bounds.min
			out[field+"_max"] = bounds.max
		}
		acc.AddFields(series.name, out, series.tags)
	}
}

// Reset discards all accumulated state by replacing the cache with a new,
// empty map.
func (m *MinMax) Reset() {
	m.cache = make(map[uint64]aggregate)
}

// convert turns a field value into a float64 suitable for min/max tracking.
//
// It accepts float64, int64 and uint64 values. The boolean result is false,
// and the float result 0, for every other type (strings, booleans, nil, ...),
// which lets callers skip non-numeric fields.
//
// Integer inputs whose magnitude exceeds 2^53 cannot be represented exactly
// as float64 and are rounded to the nearest representable value.
func convert(in interface{}) (float64, bool) {
	switch v := in.(type) {
	case float64:
		return v, true
	case int64:
		return float64(v), true
	case uint64:
		// Unsigned fields are numeric too; without this case they would be
		// silently excluded from aggregation.
		return float64(v), true
	default:
		return 0, false
	}
}

// init registers this aggregator under the name "minmax", using NewMinMax
// as its constructor.
func init() {
	aggregators.Add("minmax", NewMinMax)
}