250 lines
6.9 KiB
Go
250 lines
6.9 KiB
Go
|
package zipkin
|
||
|
|
||
|
import (
|
||
|
"encoding/binary"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
|
||
|
)
|
||
|
|
||
|
// DefaultServiceName when the span does not have any serviceName
|
||
|
const DefaultServiceName = "unknown"
|
||
|
|
||
|
//now is a moackable time for now
|
||
|
var now = time.Now
|
||
|
|
||
|
// LineProtocolConverter implements the Recorder interface; it is a
|
||
|
// type meant to encapsulate the storage of zipkin tracing data in
|
||
|
// telegraf as line protocol.
|
||
|
type LineProtocolConverter struct {
|
||
|
acc telegraf.Accumulator
|
||
|
}
|
||
|
|
||
|
// NewLineProtocolConverter returns an instance of LineProtocolConverter that
|
||
|
// will add to the given telegraf.Accumulator
|
||
|
func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter {
|
||
|
return &LineProtocolConverter{
|
||
|
acc: acc,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Record is LineProtocolConverter's implementation of the Record method of
|
||
|
// the Recorder interface; it takes a trace as input, and adds it to an internal
|
||
|
// telegraf.Accumulator.
|
||
|
func (l *LineProtocolConverter) Record(t Trace) error {
|
||
|
for _, s := range t {
|
||
|
fields := map[string]interface{}{
|
||
|
"duration_ns": s.Duration.Nanoseconds(),
|
||
|
}
|
||
|
tags := map[string]string{
|
||
|
"id": s.ID,
|
||
|
"parent_id": s.ParentID,
|
||
|
"trace_id": s.TraceID,
|
||
|
"name": s.Name,
|
||
|
"service_name": s.ServiceName,
|
||
|
}
|
||
|
l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
|
||
|
|
||
|
for _, a := range s.Annotations {
|
||
|
tags := map[string]string{
|
||
|
"id": s.ID,
|
||
|
"parent_id": s.ParentID,
|
||
|
"trace_id": s.TraceID,
|
||
|
"name": s.Name,
|
||
|
"service_name": a.ServiceName,
|
||
|
"annotation": a.Value,
|
||
|
"endpoint_host": a.Host,
|
||
|
}
|
||
|
l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
|
||
|
}
|
||
|
|
||
|
for _, b := range s.BinaryAnnotations {
|
||
|
tags := map[string]string{
|
||
|
"id": s.ID,
|
||
|
"parent_id": s.ParentID,
|
||
|
"trace_id": s.TraceID,
|
||
|
"name": s.Name,
|
||
|
"service_name": b.ServiceName,
|
||
|
"annotation": b.Value,
|
||
|
"endpoint_host": b.Host,
|
||
|
"annotation_key": b.Key,
|
||
|
}
|
||
|
l.acc.AddFields("zipkin", fields, tags, s.Timestamp)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (l *LineProtocolConverter) Error(err error) {
|
||
|
l.acc.AddError(err)
|
||
|
}
|
||
|
|
||
|
// NewTrace converts a slice of []*zipkincore.Spans into a new Trace
|
||
|
func NewTrace(spans []*zipkincore.Span) Trace {
|
||
|
trace := make(Trace, len(spans))
|
||
|
for i, span := range spans {
|
||
|
endpoint := serviceEndpoint(span.GetAnnotations(), span.GetBinaryAnnotations())
|
||
|
trace[i] = Span{
|
||
|
ID: formatID(span.GetID()),
|
||
|
TraceID: formatTraceID(span.GetTraceIDHigh(), span.GetTraceID()),
|
||
|
Name: span.GetName(),
|
||
|
Timestamp: guessTimestamp(span),
|
||
|
Duration: convertDuration(span),
|
||
|
ParentID: parentID(span),
|
||
|
ServiceName: serviceName(endpoint),
|
||
|
Annotations: NewAnnotations(span.GetAnnotations(), endpoint),
|
||
|
BinaryAnnotations: NewBinaryAnnotations(span.GetBinaryAnnotations(), endpoint),
|
||
|
}
|
||
|
}
|
||
|
return trace
|
||
|
}
|
||
|
|
||
|
// NewAnnotations converts a slice of *zipkincore.Annotation into a slice
|
||
|
// of new Annotations
|
||
|
func NewAnnotations(annotations []*zipkincore.Annotation, endpoint *zipkincore.Endpoint) []Annotation {
|
||
|
formatted := make([]Annotation, len(annotations))
|
||
|
for i, annotation := range annotations {
|
||
|
formatted[i] = Annotation{
|
||
|
Host: host(endpoint),
|
||
|
ServiceName: serviceName(endpoint),
|
||
|
Timestamp: microToTime(annotation.GetTimestamp()),
|
||
|
Value: annotation.GetValue(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return formatted
|
||
|
}
|
||
|
|
||
|
// NewBinaryAnnotations is very similar to NewAnnotations, but it
|
||
|
// converts zipkincore.BinaryAnnotations instead of the normal zipkincore.Annotation
|
||
|
func NewBinaryAnnotations(annotations []*zipkincore.BinaryAnnotation, endpoint *zipkincore.Endpoint) []BinaryAnnotation {
|
||
|
formatted := make([]BinaryAnnotation, len(annotations))
|
||
|
for i, annotation := range annotations {
|
||
|
formatted[i] = BinaryAnnotation{
|
||
|
Host: host(endpoint),
|
||
|
ServiceName: serviceName(endpoint),
|
||
|
Key: annotation.GetKey(),
|
||
|
Value: string(annotation.GetValue()),
|
||
|
Type: annotation.GetAnnotationType().String(),
|
||
|
}
|
||
|
}
|
||
|
return formatted
|
||
|
}
|
||
|
|
||
|
func microToTime(micro int64) time.Time {
|
||
|
return time.Unix(0, micro*int64(time.Microsecond)).UTC()
|
||
|
}
|
||
|
|
||
|
func formatID(id int64) string {
|
||
|
return strconv.FormatInt(id, 10)
|
||
|
}
|
||
|
|
||
|
func formatTraceID(high, low int64) string {
|
||
|
if high == 0 {
|
||
|
return fmt.Sprintf("%x", low)
|
||
|
}
|
||
|
return fmt.Sprintf("%x%016x", high, low)
|
||
|
}
|
||
|
|
||
|
func minMax(span *zipkincore.Span) (time.Time, time.Time) {
|
||
|
min := now().UTC()
|
||
|
max := time.Time{}.UTC()
|
||
|
for _, annotation := range span.Annotations {
|
||
|
ts := microToTime(annotation.GetTimestamp())
|
||
|
if !ts.IsZero() && ts.Before(min) {
|
||
|
min = ts
|
||
|
}
|
||
|
if !ts.IsZero() && ts.After(max) {
|
||
|
max = ts
|
||
|
}
|
||
|
}
|
||
|
if max.IsZero() {
|
||
|
max = min
|
||
|
}
|
||
|
return min, max
|
||
|
}
|
||
|
|
||
|
func guessTimestamp(span *zipkincore.Span) time.Time {
|
||
|
if span.GetTimestamp() != 0 {
|
||
|
return microToTime(span.GetTimestamp())
|
||
|
}
|
||
|
min, _ := minMax(span)
|
||
|
return min
|
||
|
}
|
||
|
|
||
|
func convertDuration(span *zipkincore.Span) time.Duration {
|
||
|
duration := time.Duration(span.GetDuration()) * time.Microsecond
|
||
|
if duration != 0 {
|
||
|
return duration
|
||
|
}
|
||
|
min, max := minMax(span)
|
||
|
return max.Sub(min)
|
||
|
}
|
||
|
|
||
|
func parentID(span *zipkincore.Span) string {
|
||
|
// A parent ID of 0 means that this is a parent span. In this case,
|
||
|
// we set the parent ID of the span to be its own id, so it points to
|
||
|
// itself.
|
||
|
id := span.GetParentID()
|
||
|
if id != 0 {
|
||
|
return formatID(id)
|
||
|
}
|
||
|
return formatID(span.ID)
|
||
|
}
|
||
|
|
||
|
func ipv4(addr int32) string {
|
||
|
buf := make([]byte, 4)
|
||
|
binary.BigEndian.PutUint32(buf, uint32(addr))
|
||
|
return net.IP(buf).String()
|
||
|
}
|
||
|
|
||
|
func host(h *zipkincore.Endpoint) string {
|
||
|
if h == nil {
|
||
|
return ipv4(int32(0))
|
||
|
}
|
||
|
if h.GetPort() == 0 {
|
||
|
return ipv4(h.GetIpv4())
|
||
|
}
|
||
|
// Zipkin uses a signed int16 for the port, but, warns us that they actually treat it
|
||
|
// as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff
|
||
|
// to convert from signed to unsigned
|
||
|
// https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44
|
||
|
return ipv4(h.GetIpv4()) + ":" + strconv.FormatInt(int64(int(h.GetPort())&0xffff), 10)
|
||
|
}
|
||
|
|
||
|
func serviceName(h *zipkincore.Endpoint) string {
|
||
|
if h == nil {
|
||
|
return DefaultServiceName
|
||
|
}
|
||
|
return h.GetServiceName()
|
||
|
}
|
||
|
|
||
|
func serviceEndpoint(ann []*zipkincore.Annotation, bann []*zipkincore.BinaryAnnotation) *zipkincore.Endpoint {
|
||
|
for _, a := range ann {
|
||
|
switch a.Value {
|
||
|
case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
|
||
|
if a.Host != nil && a.Host.ServiceName != "" {
|
||
|
return a.Host
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for _, a := range bann {
|
||
|
if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" {
|
||
|
return a.Host
|
||
|
}
|
||
|
}
|
||
|
// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
|
||
|
for _, a := range ann {
|
||
|
if a.Host != nil && a.Host.ServiceName != "" {
|
||
|
return a.Host
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|