telegraf/plugins/inputs/zipkin/convert.go

250 lines
6.9 KiB
Go

package zipkin
import (
"encoding/binary"
"fmt"
"net"
"strconv"
"time"
"github.com/influxdata/telegraf"
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
)
// DefaultServiceName when the span does not have any serviceName
const DefaultServiceName = "unknown"
//now is a moackable time for now
var now = time.Now
// LineProtocolConverter implements the Recorder interface; it is a
// type meant to encapsulate the storage of zipkin tracing data in
// telegraf as line protocol.
type LineProtocolConverter struct {
acc telegraf.Accumulator
}
// NewLineProtocolConverter returns an instance of LineProtocolConverter that
// will add to the given telegraf.Accumulator
func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter {
return &LineProtocolConverter{
acc: acc,
}
}
// Record is LineProtocolConverter's implementation of the Record method of
// the Recorder interface; it takes a trace as input, and adds it to an internal
// telegraf.Accumulator.
func (l *LineProtocolConverter) Record(t Trace) error {
for _, s := range t {
fields := map[string]interface{}{
"duration_ns": s.Duration.Nanoseconds(),
}
tags := map[string]string{
"id": s.ID,
"parent_id": s.ParentID,
"trace_id": s.TraceID,
"name": s.Name,
"service_name": s.ServiceName,
}
l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
for _, a := range s.Annotations {
tags := map[string]string{
"id": s.ID,
"parent_id": s.ParentID,
"trace_id": s.TraceID,
"name": s.Name,
"service_name": a.ServiceName,
"annotation": a.Value,
"endpoint_host": a.Host,
}
l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
}
for _, b := range s.BinaryAnnotations {
tags := map[string]string{
"id": s.ID,
"parent_id": s.ParentID,
"trace_id": s.TraceID,
"name": s.Name,
"service_name": b.ServiceName,
"annotation": b.Value,
"endpoint_host": b.Host,
"annotation_key": b.Key,
}
l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
}
}
return nil
}
func (l *LineProtocolConverter) Error(err error) {
l.acc.AddError(err)
}
// NewTrace converts a slice of []*zipkincore.Spans into a new Trace
func NewTrace(spans []*zipkincore.Span) Trace {
trace := make(Trace, len(spans))
for i, span := range spans {
endpoint := serviceEndpoint(span.GetAnnotations(), span.GetBinaryAnnotations())
trace[i] = Span{
ID: formatID(span.GetID()),
TraceID: formatTraceID(span.GetTraceIDHigh(), span.GetTraceID()),
Name: span.GetName(),
Timestamp: guessTimestamp(span),
Duration: convertDuration(span),
ParentID: parentID(span),
ServiceName: serviceName(endpoint),
Annotations: NewAnnotations(span.GetAnnotations(), endpoint),
BinaryAnnotations: NewBinaryAnnotations(span.GetBinaryAnnotations(), endpoint),
}
}
return trace
}
// NewAnnotations converts a slice of *zipkincore.Annotation into a slice
// of new Annotations
func NewAnnotations(annotations []*zipkincore.Annotation, endpoint *zipkincore.Endpoint) []Annotation {
formatted := make([]Annotation, len(annotations))
for i, annotation := range annotations {
formatted[i] = Annotation{
Host: host(endpoint),
ServiceName: serviceName(endpoint),
Timestamp: microToTime(annotation.GetTimestamp()),
Value: annotation.GetValue(),
}
}
return formatted
}
// NewBinaryAnnotations is very similar to NewAnnotations, but it
// converts zipkincore.BinaryAnnotations instead of the normal zipkincore.Annotation
func NewBinaryAnnotations(annotations []*zipkincore.BinaryAnnotation, endpoint *zipkincore.Endpoint) []BinaryAnnotation {
formatted := make([]BinaryAnnotation, len(annotations))
for i, annotation := range annotations {
formatted[i] = BinaryAnnotation{
Host: host(endpoint),
ServiceName: serviceName(endpoint),
Key: annotation.GetKey(),
Value: string(annotation.GetValue()),
Type: annotation.GetAnnotationType().String(),
}
}
return formatted
}
func microToTime(micro int64) time.Time {
return time.Unix(0, micro*int64(time.Microsecond)).UTC()
}
func formatID(id int64) string {
return strconv.FormatInt(id, 10)
}
func formatTraceID(high, low int64) string {
if high == 0 {
return fmt.Sprintf("%x", low)
}
return fmt.Sprintf("%x%016x", high, low)
}
func minMax(span *zipkincore.Span) (time.Time, time.Time) {
min := now().UTC()
max := time.Time{}.UTC()
for _, annotation := range span.Annotations {
ts := microToTime(annotation.GetTimestamp())
if !ts.IsZero() && ts.Before(min) {
min = ts
}
if !ts.IsZero() && ts.After(max) {
max = ts
}
}
if max.IsZero() {
max = min
}
return min, max
}
func guessTimestamp(span *zipkincore.Span) time.Time {
if span.GetTimestamp() != 0 {
return microToTime(span.GetTimestamp())
}
min, _ := minMax(span)
return min
}
func convertDuration(span *zipkincore.Span) time.Duration {
duration := time.Duration(span.GetDuration()) * time.Microsecond
if duration != 0 {
return duration
}
min, max := minMax(span)
return max.Sub(min)
}
func parentID(span *zipkincore.Span) string {
// A parent ID of 0 means that this is a parent span. In this case,
// we set the parent ID of the span to be its own id, so it points to
// itself.
id := span.GetParentID()
if id != 0 {
return formatID(id)
}
return formatID(span.ID)
}
func ipv4(addr int32) string {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(addr))
return net.IP(buf).String()
}
func host(h *zipkincore.Endpoint) string {
if h == nil {
return ipv4(int32(0))
}
if h.GetPort() == 0 {
return ipv4(h.GetIpv4())
}
// Zipkin uses a signed int16 for the port, but, warns us that they actually treat it
// as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff
// to convert from signed to unsigned
// https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44
return ipv4(h.GetIpv4()) + ":" + strconv.FormatInt(int64(int(h.GetPort())&0xffff), 10)
}
func serviceName(h *zipkincore.Endpoint) string {
if h == nil {
return DefaultServiceName
}
return h.GetServiceName()
}
func serviceEndpoint(ann []*zipkincore.Annotation, bann []*zipkincore.BinaryAnnotation) *zipkincore.Endpoint {
for _, a := range ann {
switch a.Value {
case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
if a.Host != nil && a.Host.ServiceName != "" {
return a.Host
}
}
}
for _, a := range bann {
if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" {
return a.Host
}
}
// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
for _, a := range ann {
if a.Host != nil && a.Host.ServiceName != "" {
return a.Host
}
}
return nil
}