Add JSON input support to zipkin plugin (#3150)
(cherry picked from commit 13a6b917c3)
This commit is contained in:
committed by
Daniel Nelson
parent
7254111d37
commit
d4cd1b7eb4
@@ -1,22 +1,10 @@
|
||||
package zipkin
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
|
||||
)
|
||||
|
||||
// DefaultServiceName when the span does not have any serviceName
|
||||
const DefaultServiceName = "unknown"
|
||||
|
||||
//now is a moackable time for now
|
||||
var now = time.Now
|
||||
|
||||
// LineProtocolConverter implements the Recorder interface; it is a
|
||||
// type meant to encapsulate the storage of zipkin tracing data in
|
||||
// telegraf as line protocol.
|
||||
@@ -35,7 +23,7 @@ func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter {
|
||||
// Record is LineProtocolConverter's implementation of the Record method of
|
||||
// the Recorder interface; it takes a trace as input, and adds it to an internal
|
||||
// telegraf.Accumulator.
|
||||
func (l *LineProtocolConverter) Record(t Trace) error {
|
||||
func (l *LineProtocolConverter) Record(t trace.Trace) error {
|
||||
for _, s := range t {
|
||||
fields := map[string]interface{}{
|
||||
"duration_ns": s.Duration.Nanoseconds(),
|
||||
@@ -83,167 +71,3 @@ func (l *LineProtocolConverter) Record(t Trace) error {
|
||||
func (l *LineProtocolConverter) Error(err error) {
|
||||
l.acc.AddError(err)
|
||||
}
|
||||
|
||||
// NewTrace converts a slice of []*zipkincore.Spans into a new Trace
|
||||
func NewTrace(spans []*zipkincore.Span) Trace {
|
||||
trace := make(Trace, len(spans))
|
||||
for i, span := range spans {
|
||||
endpoint := serviceEndpoint(span.GetAnnotations(), span.GetBinaryAnnotations())
|
||||
trace[i] = Span{
|
||||
ID: formatID(span.GetID()),
|
||||
TraceID: formatTraceID(span.GetTraceIDHigh(), span.GetTraceID()),
|
||||
Name: span.GetName(),
|
||||
Timestamp: guessTimestamp(span),
|
||||
Duration: convertDuration(span),
|
||||
ParentID: parentID(span),
|
||||
ServiceName: serviceName(endpoint),
|
||||
Annotations: NewAnnotations(span.GetAnnotations(), endpoint),
|
||||
BinaryAnnotations: NewBinaryAnnotations(span.GetBinaryAnnotations(), endpoint),
|
||||
}
|
||||
}
|
||||
return trace
|
||||
}
|
||||
|
||||
// NewAnnotations converts a slice of *zipkincore.Annotation into a slice
|
||||
// of new Annotations
|
||||
func NewAnnotations(annotations []*zipkincore.Annotation, endpoint *zipkincore.Endpoint) []Annotation {
|
||||
formatted := make([]Annotation, len(annotations))
|
||||
for i, annotation := range annotations {
|
||||
formatted[i] = Annotation{
|
||||
Host: host(endpoint),
|
||||
ServiceName: serviceName(endpoint),
|
||||
Timestamp: microToTime(annotation.GetTimestamp()),
|
||||
Value: annotation.GetValue(),
|
||||
}
|
||||
}
|
||||
|
||||
return formatted
|
||||
}
|
||||
|
||||
// NewBinaryAnnotations is very similar to NewAnnotations, but it
|
||||
// converts zipkincore.BinaryAnnotations instead of the normal zipkincore.Annotation
|
||||
func NewBinaryAnnotations(annotations []*zipkincore.BinaryAnnotation, endpoint *zipkincore.Endpoint) []BinaryAnnotation {
|
||||
formatted := make([]BinaryAnnotation, len(annotations))
|
||||
for i, annotation := range annotations {
|
||||
formatted[i] = BinaryAnnotation{
|
||||
Host: host(endpoint),
|
||||
ServiceName: serviceName(endpoint),
|
||||
Key: annotation.GetKey(),
|
||||
Value: string(annotation.GetValue()),
|
||||
Type: annotation.GetAnnotationType().String(),
|
||||
}
|
||||
}
|
||||
return formatted
|
||||
}
|
||||
|
||||
func microToTime(micro int64) time.Time {
|
||||
return time.Unix(0, micro*int64(time.Microsecond)).UTC()
|
||||
}
|
||||
|
||||
func formatID(id int64) string {
|
||||
return strconv.FormatInt(id, 10)
|
||||
}
|
||||
|
||||
func formatTraceID(high, low int64) string {
|
||||
if high == 0 {
|
||||
return fmt.Sprintf("%x", low)
|
||||
}
|
||||
return fmt.Sprintf("%x%016x", high, low)
|
||||
}
|
||||
|
||||
func minMax(span *zipkincore.Span) (time.Time, time.Time) {
|
||||
min := now().UTC()
|
||||
max := time.Time{}.UTC()
|
||||
for _, annotation := range span.Annotations {
|
||||
ts := microToTime(annotation.GetTimestamp())
|
||||
if !ts.IsZero() && ts.Before(min) {
|
||||
min = ts
|
||||
}
|
||||
if !ts.IsZero() && ts.After(max) {
|
||||
max = ts
|
||||
}
|
||||
}
|
||||
if max.IsZero() {
|
||||
max = min
|
||||
}
|
||||
return min, max
|
||||
}
|
||||
|
||||
func guessTimestamp(span *zipkincore.Span) time.Time {
|
||||
if span.GetTimestamp() != 0 {
|
||||
return microToTime(span.GetTimestamp())
|
||||
}
|
||||
min, _ := minMax(span)
|
||||
return min
|
||||
}
|
||||
|
||||
func convertDuration(span *zipkincore.Span) time.Duration {
|
||||
duration := time.Duration(span.GetDuration()) * time.Microsecond
|
||||
if duration != 0 {
|
||||
return duration
|
||||
}
|
||||
min, max := minMax(span)
|
||||
return max.Sub(min)
|
||||
}
|
||||
|
||||
func parentID(span *zipkincore.Span) string {
|
||||
// A parent ID of 0 means that this is a parent span. In this case,
|
||||
// we set the parent ID of the span to be its own id, so it points to
|
||||
// itself.
|
||||
id := span.GetParentID()
|
||||
if id != 0 {
|
||||
return formatID(id)
|
||||
}
|
||||
return formatID(span.ID)
|
||||
}
|
||||
|
||||
func ipv4(addr int32) string {
|
||||
buf := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(buf, uint32(addr))
|
||||
return net.IP(buf).String()
|
||||
}
|
||||
|
||||
func host(h *zipkincore.Endpoint) string {
|
||||
if h == nil {
|
||||
return ipv4(int32(0))
|
||||
}
|
||||
if h.GetPort() == 0 {
|
||||
return ipv4(h.GetIpv4())
|
||||
}
|
||||
// Zipkin uses a signed int16 for the port, but, warns us that they actually treat it
|
||||
// as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff
|
||||
// to convert from signed to unsigned
|
||||
// https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44
|
||||
return ipv4(h.GetIpv4()) + ":" + strconv.FormatInt(int64(int(h.GetPort())&0xffff), 10)
|
||||
}
|
||||
|
||||
func serviceName(h *zipkincore.Endpoint) string {
|
||||
if h == nil {
|
||||
return DefaultServiceName
|
||||
}
|
||||
return h.GetServiceName()
|
||||
}
|
||||
|
||||
func serviceEndpoint(ann []*zipkincore.Annotation, bann []*zipkincore.BinaryAnnotation) *zipkincore.Endpoint {
|
||||
for _, a := range ann {
|
||||
switch a.Value {
|
||||
case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
|
||||
if a.Host != nil && a.Host.ServiceName != "" {
|
||||
return a.Host
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, a := range bann {
|
||||
if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" {
|
||||
return a.Host
|
||||
}
|
||||
}
|
||||
// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
|
||||
for _, a := range ann {
|
||||
if a.Host != nil && a.Host.ServiceName != "" {
|
||||
return a.Host
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user