package zipkin

import (
	"encoding/binary"
	"fmt"
	"net"
	"strconv"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
)

// DefaultServiceName when the span does not have any serviceName
const DefaultServiceName = "unknown"

//now is a moackable time for now
var now = time.Now

// LineProtocolConverter implements the Recorder interface; it is a
// type meant to encapsulate the storage of zipkin tracing data in
// telegraf as line protocol.
type LineProtocolConverter struct {
	acc telegraf.Accumulator
}

// NewLineProtocolConverter returns an instance of LineProtocolConverter that
// will add to the given telegraf.Accumulator
func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter {
	return &LineProtocolConverter{
		acc: acc,
	}
}

// Record is LineProtocolConverter's implementation of the Record method of
// the Recorder interface; it takes a trace as input, and adds it to an internal
// telegraf.Accumulator.
func (l *LineProtocolConverter) Record(t Trace) error {
	for _, s := range t {
		fields := map[string]interface{}{
			"duration_ns": s.Duration.Nanoseconds(),
		}
		tags := map[string]string{
			"id":           s.ID,
			"parent_id":    s.ParentID,
			"trace_id":     s.TraceID,
			"name":         s.Name,
			"service_name": s.ServiceName,
		}
		l.acc.AddFields("zipkin", fields, tags, s.Timestamp)

		for _, a := range s.Annotations {
			tags := map[string]string{
				"id":            s.ID,
				"parent_id":     s.ParentID,
				"trace_id":      s.TraceID,
				"name":          s.Name,
				"service_name":  a.ServiceName,
				"annotation":    a.Value,
				"endpoint_host": a.Host,
			}
			l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
		}

		for _, b := range s.BinaryAnnotations {
			tags := map[string]string{
				"id":             s.ID,
				"parent_id":      s.ParentID,
				"trace_id":       s.TraceID,
				"name":           s.Name,
				"service_name":   b.ServiceName,
				"annotation":     b.Value,
				"endpoint_host":  b.Host,
				"annotation_key": b.Key,
			}
			l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
		}
	}

	return nil
}

func (l *LineProtocolConverter) Error(err error) {
	l.acc.AddError(err)
}

// NewTrace converts a slice of []*zipkincore.Spans into a new Trace
func NewTrace(spans []*zipkincore.Span) Trace {
	trace := make(Trace, len(spans))
	for i, span := range spans {
		endpoint := serviceEndpoint(span.GetAnnotations(), span.GetBinaryAnnotations())
		trace[i] = Span{
			ID:                formatID(span.GetID()),
			TraceID:           formatTraceID(span.GetTraceIDHigh(), span.GetTraceID()),
			Name:              span.GetName(),
			Timestamp:         guessTimestamp(span),
			Duration:          convertDuration(span),
			ParentID:          parentID(span),
			ServiceName:       serviceName(endpoint),
			Annotations:       NewAnnotations(span.GetAnnotations(), endpoint),
			BinaryAnnotations: NewBinaryAnnotations(span.GetBinaryAnnotations(), endpoint),
		}
	}
	return trace
}

// NewAnnotations converts a slice of *zipkincore.Annotation into a slice
// of new Annotations
func NewAnnotations(annotations []*zipkincore.Annotation, endpoint *zipkincore.Endpoint) []Annotation {
	formatted := make([]Annotation, len(annotations))
	for i, annotation := range annotations {
		formatted[i] = Annotation{
			Host:        host(endpoint),
			ServiceName: serviceName(endpoint),
			Timestamp:   microToTime(annotation.GetTimestamp()),
			Value:       annotation.GetValue(),
		}
	}

	return formatted
}

// NewBinaryAnnotations is very similar to NewAnnotations, but it
// converts zipkincore.BinaryAnnotations instead of the normal zipkincore.Annotation
func NewBinaryAnnotations(annotations []*zipkincore.BinaryAnnotation, endpoint *zipkincore.Endpoint) []BinaryAnnotation {
	formatted := make([]BinaryAnnotation, len(annotations))
	for i, annotation := range annotations {
		formatted[i] = BinaryAnnotation{
			Host:        host(endpoint),
			ServiceName: serviceName(endpoint),
			Key:         annotation.GetKey(),
			Value:       string(annotation.GetValue()),
			Type:        annotation.GetAnnotationType().String(),
		}
	}
	return formatted
}

func microToTime(micro int64) time.Time {
	return time.Unix(0, micro*int64(time.Microsecond)).UTC()
}

func formatID(id int64) string {
	return strconv.FormatInt(id, 10)
}

func formatTraceID(high, low int64) string {
	if high == 0 {
		return fmt.Sprintf("%x", low)
	}
	return fmt.Sprintf("%x%016x", high, low)
}

func minMax(span *zipkincore.Span) (time.Time, time.Time) {
	min := now().UTC()
	max := time.Time{}.UTC()
	for _, annotation := range span.Annotations {
		ts := microToTime(annotation.GetTimestamp())
		if !ts.IsZero() && ts.Before(min) {
			min = ts
		}
		if !ts.IsZero() && ts.After(max) {
			max = ts
		}
	}
	if max.IsZero() {
		max = min
	}
	return min, max
}

func guessTimestamp(span *zipkincore.Span) time.Time {
	if span.GetTimestamp() != 0 {
		return microToTime(span.GetTimestamp())
	}
	min, _ := minMax(span)
	return min
}

func convertDuration(span *zipkincore.Span) time.Duration {
	duration := time.Duration(span.GetDuration()) * time.Microsecond
	if duration != 0 {
		return duration
	}
	min, max := minMax(span)
	return max.Sub(min)
}

func parentID(span *zipkincore.Span) string {
	// A parent ID of 0 means that this is a parent span. In this case,
	// we set the parent ID of the span to be its own id, so it points to
	// itself.
	id := span.GetParentID()
	if id != 0 {
		return formatID(id)
	}
	return formatID(span.ID)
}

func ipv4(addr int32) string {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(addr))
	return net.IP(buf).String()
}

func host(h *zipkincore.Endpoint) string {
	if h == nil {
		return ipv4(int32(0))
	}
	if h.GetPort() == 0 {
		return ipv4(h.GetIpv4())
	}
	// Zipkin uses a signed int16 for the port, but, warns us that they actually treat it
	// as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff
	// to convert from signed to unsigned
	// https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44
	return ipv4(h.GetIpv4()) + ":" + strconv.FormatInt(int64(int(h.GetPort())&0xffff), 10)
}

func serviceName(h *zipkincore.Endpoint) string {
	if h == nil {
		return DefaultServiceName
	}
	return h.GetServiceName()
}

func serviceEndpoint(ann []*zipkincore.Annotation, bann []*zipkincore.BinaryAnnotation) *zipkincore.Endpoint {
	for _, a := range ann {
		switch a.Value {
		case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
			if a.Host != nil && a.Host.ServiceName != "" {
				return a.Host
			}
		}
	}

	for _, a := range bann {
		if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" {
			return a.Host
		}
	}
	// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
	for _, a := range ann {
		if a.Host != nil && a.Host.ServiceName != "" {
			return a.Host
		}
	}
	return nil
}