package models

import (
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)

type RunningAggregator struct {
	a      telegraf.Aggregator
	Config *AggregatorConfig

	metrics chan telegraf.Metric

	periodStart time.Time
	periodEnd   time.Time
}

func NewRunningAggregator(
	a telegraf.Aggregator,
	conf *AggregatorConfig,
) *RunningAggregator {
	return &RunningAggregator{
		a:       a,
		Config:  conf,
		metrics: make(chan telegraf.Metric, 100),
	}
}

// AggregatorConfig containing configuration parameters for the running
// aggregator plugin.
type AggregatorConfig struct {
	Name string

	DropOriginal      bool
	NameOverride      string
	MeasurementPrefix string
	MeasurementSuffix string
	Tags              map[string]string
	Filter            Filter

	Period time.Duration
	Delay  time.Duration
}

func (r *RunningAggregator) Name() string {
	return "aggregators." + r.Config.Name
}

func (r *RunningAggregator) MakeMetric(
	measurement string,
	fields map[string]interface{},
	tags map[string]string,
	mType telegraf.ValueType,
	t time.Time,
) telegraf.Metric {
	m := makemetric(
		measurement,
		fields,
		tags,
		r.Config.NameOverride,
		r.Config.MeasurementPrefix,
		r.Config.MeasurementSuffix,
		r.Config.Tags,
		nil,
		r.Config.Filter,
		false,
		mType,
		t,
	)

	if m != nil {
		m.SetAggregate(true)
	}

	return m
}

// Add applies the given metric to the aggregator.
// Before applying to the plugin, it will run any defined filters on the metric.
// Apply returns true if the original metric should be dropped.
func (r *RunningAggregator) Add(in telegraf.Metric) bool {
	if r.Config.Filter.IsActive() {
		// check if the aggregator should apply this metric
		name := in.Name()
		fields := in.Fields()
		tags := in.Tags()
		t := in.Time()
		if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
			// aggregator should not apply this metric
			return false
		}

		in, _ = metric.New(name, tags, fields, t)
	}

	r.metrics <- in
	return r.Config.DropOriginal
}
func (r *RunningAggregator) add(in telegraf.Metric) {
	r.a.Add(in)
}

func (r *RunningAggregator) push(acc telegraf.Accumulator) {
	r.a.Push(acc)
}

func (r *RunningAggregator) reset() {
	r.a.Reset()
}

// Run runs the running aggregator, listens for incoming metrics, and waits
// for period ticks to tell it when to push and reset the aggregator.
func (r *RunningAggregator) Run(
	acc telegraf.Accumulator,
	shutdown chan struct{},
) {
	// The start of the period is truncated to the nearest second.
	//
	// Every metric then gets it's timestamp checked and is dropped if it
	// is not within:
	//
	//   start < t < end + truncation + delay
	//
	// So if we start at now = 00:00.2 with a 10s period and 0.3s delay:
	//   now = 00:00.2
	//   start = 00:00
	//   truncation = 00:00.2
	//   end = 00:10
	// 1st interval: 00:00 - 00:10.5
	// 2nd interval: 00:10 - 00:20.5
	// etc.
	//
	now := time.Now()
	r.periodStart = now.Truncate(time.Second)
	truncation := now.Sub(r.periodStart)
	r.periodEnd = r.periodStart.Add(r.Config.Period)
	time.Sleep(r.Config.Delay)
	periodT := time.NewTicker(r.Config.Period)
	defer periodT.Stop()

	for {
		select {
		case <-shutdown:
			if len(r.metrics) > 0 {
				// wait until metrics are flushed before exiting
				continue
			}
			return
		case m := <-r.metrics:
			if m.Time().Before(r.periodStart) ||
				m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
				// the metric is outside the current aggregation period, so
				// skip it.
				continue
			}
			r.add(m)
		case <-periodT.C:
			r.periodStart = r.periodEnd
			r.periodEnd = r.periodStart.Add(r.Config.Period)
			r.push(acc)
			r.reset()
		}
	}
}