211 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			211 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
package codec
 | 
						|
 | 
						|
import (
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
 | 
						|
	"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
 | 
						|
)
 | 
						|
 | 
						|
//now is a mockable time for now
 | 
						|
var now = time.Now
 | 
						|
 | 
						|
// DefaultServiceName when the span does not have any serviceName
 | 
						|
const DefaultServiceName = "unknown"
 | 
						|
 | 
						|
// Decoder decodes the bytes and returns a trace
 | 
						|
type Decoder interface {
 | 
						|
	Decode(octets []byte) ([]Span, error)
 | 
						|
}
 | 
						|
 | 
						|
// Span are created by instrumentation in RPC clients or servers
 | 
						|
type Span interface {
 | 
						|
	Trace() (string, error)
 | 
						|
	SpanID() (string, error)
 | 
						|
	Parent() (string, error)
 | 
						|
	Name() string
 | 
						|
	Annotations() []Annotation
 | 
						|
	BinaryAnnotations() ([]BinaryAnnotation, error)
 | 
						|
	Timestamp() time.Time
 | 
						|
	Duration() time.Duration
 | 
						|
}
 | 
						|
 | 
						|
// Annotation represents an event that explains latency with a timestamp.
 | 
						|
type Annotation interface {
 | 
						|
	Timestamp() time.Time
 | 
						|
	Value() string
 | 
						|
	Host() Endpoint
 | 
						|
}
 | 
						|
 | 
						|
// BinaryAnnotation represent tags applied to a Span to give it context
 | 
						|
type BinaryAnnotation interface {
 | 
						|
	Key() string
 | 
						|
	Value() string
 | 
						|
	Host() Endpoint
 | 
						|
}
 | 
						|
 | 
						|
// Endpoint represents the network context of a service recording an annotation
 | 
						|
type Endpoint interface {
 | 
						|
	Host() string
 | 
						|
	Name() string
 | 
						|
}
 | 
						|
 | 
						|
// DefaultEndpoint is used if the annotations have no endpoints
 | 
						|
type DefaultEndpoint struct{}
 | 
						|
 | 
						|
// Host returns 0.0.0.0; used when the host is unknown
 | 
						|
func (d *DefaultEndpoint) Host() string { return "0.0.0.0" }
 | 
						|
 | 
						|
// Name returns "unknown" when an endpoint doesn't exist
 | 
						|
func (d *DefaultEndpoint) Name() string { return DefaultServiceName }
 | 
						|
 | 
						|
// MicroToTime converts zipkin's native time of microseconds into time.Time
 | 
						|
func MicroToTime(micro int64) time.Time {
 | 
						|
	return time.Unix(0, micro*int64(time.Microsecond)).UTC()
 | 
						|
}
 | 
						|
 | 
						|
// NewTrace converts a slice of []Span into a new Trace
 | 
						|
func NewTrace(spans []Span) (trace.Trace, error) {
 | 
						|
	tr := make(trace.Trace, len(spans))
 | 
						|
	for i, span := range spans {
 | 
						|
		bin, err := span.BinaryAnnotations()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		endpoint := serviceEndpoint(span.Annotations(), bin)
 | 
						|
		id, err := span.SpanID()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		tid, err := span.Trace()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		pid, err := parentID(span)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		tr[i] = trace.Span{
 | 
						|
			ID:                id,
 | 
						|
			TraceID:           tid,
 | 
						|
			Name:              span.Name(),
 | 
						|
			Timestamp:         guessTimestamp(span),
 | 
						|
			Duration:          convertDuration(span),
 | 
						|
			ParentID:          pid,
 | 
						|
			ServiceName:       endpoint.Name(),
 | 
						|
			Annotations:       NewAnnotations(span.Annotations(), endpoint),
 | 
						|
			BinaryAnnotations: NewBinaryAnnotations(bin, endpoint),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return tr, nil
 | 
						|
}
 | 
						|
 | 
						|
// NewAnnotations converts a slice of Annotation into a slice of new Annotations
 | 
						|
func NewAnnotations(annotations []Annotation, endpoint Endpoint) []trace.Annotation {
 | 
						|
	formatted := make([]trace.Annotation, len(annotations))
 | 
						|
	for i, annotation := range annotations {
 | 
						|
		formatted[i] = trace.Annotation{
 | 
						|
			Host:        endpoint.Host(),
 | 
						|
			ServiceName: endpoint.Name(),
 | 
						|
			Timestamp:   annotation.Timestamp(),
 | 
						|
			Value:       annotation.Value(),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return formatted
 | 
						|
}
 | 
						|
 | 
						|
// NewBinaryAnnotations is very similar to NewAnnotations, but it
 | 
						|
// converts BinaryAnnotations instead of the normal Annotation
 | 
						|
func NewBinaryAnnotations(annotations []BinaryAnnotation, endpoint Endpoint) []trace.BinaryAnnotation {
 | 
						|
	formatted := make([]trace.BinaryAnnotation, len(annotations))
 | 
						|
	for i, annotation := range annotations {
 | 
						|
		formatted[i] = trace.BinaryAnnotation{
 | 
						|
			Host:        endpoint.Host(),
 | 
						|
			ServiceName: endpoint.Name(),
 | 
						|
			Key:         annotation.Key(),
 | 
						|
			Value:       annotation.Value(),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return formatted
 | 
						|
}
 | 
						|
 | 
						|
func minMax(span Span) (time.Time, time.Time) {
 | 
						|
	min := now().UTC()
 | 
						|
	max := time.Time{}.UTC()
 | 
						|
	for _, annotation := range span.Annotations() {
 | 
						|
		ts := annotation.Timestamp()
 | 
						|
		if !ts.IsZero() && ts.Before(min) {
 | 
						|
			min = ts
 | 
						|
		}
 | 
						|
		if !ts.IsZero() && ts.After(max) {
 | 
						|
			max = ts
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if max.IsZero() {
 | 
						|
		max = min
 | 
						|
	}
 | 
						|
	return min, max
 | 
						|
}
 | 
						|
 | 
						|
func guessTimestamp(span Span) time.Time {
 | 
						|
	ts := span.Timestamp()
 | 
						|
	if !ts.IsZero() {
 | 
						|
		return ts
 | 
						|
	}
 | 
						|
 | 
						|
	min, _ := minMax(span)
 | 
						|
	return min
 | 
						|
}
 | 
						|
 | 
						|
func convertDuration(span Span) time.Duration {
 | 
						|
	duration := span.Duration()
 | 
						|
	if duration != 0 {
 | 
						|
		return duration
 | 
						|
	}
 | 
						|
	min, max := minMax(span)
 | 
						|
	return max.Sub(min)
 | 
						|
}
 | 
						|
 | 
						|
func parentID(span Span) (string, error) {
 | 
						|
	// A parent ID of "" means that this is a parent span. In this case,
 | 
						|
	// we set the parent ID of the span to be its own id, so it points to
 | 
						|
	// itself.
 | 
						|
	id, err := span.Parent()
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	if id != "" {
 | 
						|
		return id, nil
 | 
						|
	}
 | 
						|
	return span.SpanID()
 | 
						|
}
 | 
						|
 | 
						|
func serviceEndpoint(ann []Annotation, bann []BinaryAnnotation) Endpoint {
 | 
						|
	for _, a := range ann {
 | 
						|
		switch a.Value() {
 | 
						|
		case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
 | 
						|
			if a.Host() != nil && a.Host().Name() != "" {
 | 
						|
				return a.Host()
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for _, a := range bann {
 | 
						|
		if a.Key() == zipkincore.LOCAL_COMPONENT && a.Host() != nil && a.Host().Name() != "" {
 | 
						|
			return a.Host()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
 | 
						|
	for _, a := range ann {
 | 
						|
		if a.Host() != nil && a.Host().Name() != "" {
 | 
						|
			return a.Host()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return &DefaultEndpoint{}
 | 
						|
}
 |