diff --git a/plugins/inputs/zipkin/README.md b/plugins/inputs/zipkin/README.md index af5ea6117..af95542b3 100644 --- a/plugins/inputs/zipkin/README.md +++ b/plugins/inputs/zipkin/README.md @@ -12,6 +12,9 @@ based on its main usage cases and the evolution of the OpenTracing standard.* port = 9411 # Port on which Telegraf listens ``` +The plugin accepts spans in `JSON` or `thrift` if the `Content-Type` is `application/json` or `application/x-thrift`, respectively. +If `Content-Type` is not set, then the plugin assumes it is `JSON` format. + ## Tracing: This plugin uses Annotations tags and fields to track data from spans diff --git a/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go b/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go index 5a65384d4..0d792f926 100644 --- a/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go +++ b/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go @@ -62,13 +62,17 @@ func main() { if err != nil { log.Fatalf("%v\n", err) } - ioutil.WriteFile(outFileName, raw, 0644) + if err := ioutil.WriteFile(outFileName, raw, 0644); err != nil { + log.Fatalf("%v", err) + } case "thrift": raw, err := thriftToJSONSpans(contents) if err != nil { log.Fatalf("%v\n", err) } - ioutil.WriteFile(outFileName, raw, 0644) + if err := ioutil.WriteFile(outFileName, raw, 0644); err != nil { + log.Fatalf("%v", err) + } default: log.Fatalf("Unsupported input type") } diff --git a/plugins/inputs/zipkin/codec/codec.go b/plugins/inputs/zipkin/codec/codec.go new file mode 100644 index 000000000..781aad72a --- /dev/null +++ b/plugins/inputs/zipkin/codec/codec.go @@ -0,0 +1,210 @@ +package codec + +import ( + "time" + + "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +//now is a mockable time for now +var now = time.Now + +// DefaultServiceName when the span does not have any serviceName +const DefaultServiceName = "unknown" + +// Decoder decodes the bytes and returns a trace +type Decoder interface { + Decode(octets []byte) ([]Span, error) +} + +// Span are created by instrumentation in RPC clients or servers +type Span interface { + Trace() (string, error) + SpanID() (string, error) + Parent() (string, error) + Name() string + Annotations() []Annotation + BinaryAnnotations() ([]BinaryAnnotation, error) + Timestamp() time.Time + Duration() time.Duration +} + +// Annotation represents an event that explains latency with a timestamp. +type Annotation interface { + Timestamp() time.Time + Value() string + Host() Endpoint +} + +// BinaryAnnotation represent tags applied to a Span to give it context +type BinaryAnnotation interface { + Key() string + Value() string + Host() Endpoint +} + +// Endpoint represents the network context of a service recording an annotation +type Endpoint interface { + Host() string + Name() string +} + +// DefaultEndpoint is used if the annotations have no endpoints +type DefaultEndpoint struct{} + +// Host returns 0.0.0.0; used when the host is unknown +func (d *DefaultEndpoint) Host() string { return "0.0.0.0" } + +// Name returns "unknown" when an endpoint doesn't exist +func (d *DefaultEndpoint) Name() string { return DefaultServiceName } + +// MicroToTime converts zipkin's native time of microseconds into time.Time +func MicroToTime(micro int64) time.Time { + return time.Unix(0, micro*int64(time.Microsecond)).UTC() +} + +// NewTrace converts a slice of []Span into a new Trace +func NewTrace(spans []Span) (trace.Trace, error) { + tr := make(trace.Trace, len(spans)) + for i, span := range spans { + bin, err := span.BinaryAnnotations() + if err != nil { + return nil, err + } + endpoint := serviceEndpoint(span.Annotations(), bin) + id, err := span.SpanID() + if err != nil { + return nil, err + } + + tid, err := span.Trace() + if err != nil { + return nil, err + } + + pid, err := parentID(span) + if err != nil { + return nil, err + } + + tr[i] = trace.Span{ + ID: id, + TraceID: tid, + Name: span.Name(), + Timestamp: guessTimestamp(span), + Duration: convertDuration(span), + ParentID: pid, + ServiceName: endpoint.Name(), + Annotations: NewAnnotations(span.Annotations(), endpoint), + BinaryAnnotations: NewBinaryAnnotations(bin, endpoint), + } + } + return tr, nil +} + +// NewAnnotations converts a slice of Annotation into a slice of new Annotations +func NewAnnotations(annotations []Annotation, endpoint Endpoint) []trace.Annotation { + formatted := make([]trace.Annotation, len(annotations)) + for i, annotation := range annotations { + formatted[i] = trace.Annotation{ + Host: endpoint.Host(), + ServiceName: endpoint.Name(), + Timestamp: annotation.Timestamp(), + Value: annotation.Value(), + } + } + + return formatted +} + +// NewBinaryAnnotations is very similar to NewAnnotations, but it +// converts BinaryAnnotations instead of the normal Annotation +func NewBinaryAnnotations(annotations []BinaryAnnotation, endpoint Endpoint) []trace.BinaryAnnotation { + formatted := make([]trace.BinaryAnnotation, len(annotations)) + for i, annotation := range annotations { + formatted[i] = trace.BinaryAnnotation{ + Host: endpoint.Host(), + ServiceName: endpoint.Name(), + Key: annotation.Key(), + Value: annotation.Value(), + } + } + return formatted +} + +func minMax(span Span) (time.Time, time.Time) { + min := now().UTC() + max := time.Time{}.UTC() + for _, annotation := range span.Annotations() { + ts := annotation.Timestamp() + if !ts.IsZero() && ts.Before(min) { + min = ts + } + if !ts.IsZero() && ts.After(max) { + max = ts + } + } + if max.IsZero() { + max = min + } + return min, max +} + +func guessTimestamp(span Span) time.Time { + ts := span.Timestamp() + if !ts.IsZero() { + return ts + } + + min, _ := minMax(span) + return min +} + +func convertDuration(span Span) time.Duration { + duration := span.Duration() + if duration != 0 { + return duration + } + min, max := minMax(span) + return max.Sub(min) +} + +func parentID(span Span) (string, error) { + // A parent ID of "" means that this is a parent span. In this case, + // we set the parent ID of the span to be its own id, so it points to + // itself. + id, err := span.Parent() + if err != nil { + return "", err + } + + if id != "" { + return id, nil + } + return span.SpanID() +} + +func serviceEndpoint(ann []Annotation, bann []BinaryAnnotation) Endpoint { + for _, a := range ann { + switch a.Value() { + case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND: + if a.Host() != nil && a.Host().Name() != "" { + return a.Host() + } + } + } + + for _, a := range bann { + if a.Key() == zipkincore.LOCAL_COMPONENT && a.Host() != nil && a.Host().Name() != "" { + return a.Host() + } + } + // Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations + for _, a := range ann { + if a.Host() != nil && a.Host().Name() != "" { + return a.Host() + } + } + return &DefaultEndpoint{} +} diff --git a/plugins/inputs/zipkin/codec/codec_test.go b/plugins/inputs/zipkin/codec/codec_test.go new file mode 100644 index 000000000..c3a9fbd73 --- /dev/null +++ b/plugins/inputs/zipkin/codec/codec_test.go @@ -0,0 +1,636 @@ +package codec + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" +) + +func Test_MicroToTime(t *testing.T) { + type args struct { + micro int64 + } + tests := []struct { + name string + micro int64 + want time.Time + }{ + { + name: "given zero micro seconds expected unix time zero", + micro: 0, + want: time.Unix(0, 0).UTC(), + }, + { + name: "given a million micro seconds expected unix time one", + micro: 1000000, + want: time.Unix(1, 0).UTC(), + }, + { + name: "given a million micro seconds expected unix time one", + micro: 1503031538791000, + want: time.Unix(0, 1503031538791000000).UTC(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := MicroToTime(tt.micro); !reflect.DeepEqual(got, tt.want) { + t.Errorf("microToTime() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_minMax(t *testing.T) { + tests := []struct { + name string + span *MockSpan + now func() time.Time + wantMin time.Time + wantMax time.Time + }{ + { + name: "Single annotation", + span: &MockSpan{ + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(time.Second), + }, + }, + }, + wantMin: time.Unix(1, 0).UTC(), + wantMax: time.Unix(1, 0).UTC(), + }, + { + name: "Three annotations", + span: &MockSpan{ + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(1 * time.Second), + }, + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(2 * time.Second), + }, + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(3 * time.Second), + }, + }, + }, + wantMin: time.Unix(1, 0).UTC(), + wantMax: time.Unix(3, 0).UTC(), + }, + { + name: "Annotations are in the future", + span: &MockSpan{ + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(3 * time.Second), + }, + }, + }, + wantMin: time.Unix(2, 0).UTC(), + wantMax: time.Unix(3, 0).UTC(), + now: func() time.Time { + return time.Unix(2, 0).UTC() + }, + }, + { + name: "No Annotations", + span: &MockSpan{ + Anno: []Annotation{}, + }, + wantMin: time.Unix(2, 0).UTC(), + wantMax: time.Unix(2, 0).UTC(), + now: func() time.Time { + return time.Unix(2, 0).UTC() + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.now != nil { + now = tt.now + } + got, got1 := minMax(tt.span) + if !reflect.DeepEqual(got, tt.wantMin) { + t.Errorf("minMax() got = %v, want %v", got, tt.wantMin) + } + if !reflect.DeepEqual(got1, tt.wantMax) { + t.Errorf("minMax() got1 = %v, want %v", got1, tt.wantMax) + } + now = time.Now + }) + } +} + +func Test_guessTimestamp(t *testing.T) { + tests := []struct { + name string + span Span + now func() time.Time + want time.Time + }{ + { + name: "simple timestamp", + span: &MockSpan{ + Time: time.Unix(2, 0).UTC(), + }, + want: time.Unix(2, 0).UTC(), + }, + { + name: "zero timestamp", + span: &MockSpan{ + Time: time.Time{}, + }, + now: func() time.Time { + return time.Unix(2, 0).UTC() + }, + want: time.Unix(2, 0).UTC(), + }, + { + name: "zero timestamp with single annotation", + span: &MockSpan{ + Time: time.Time{}, + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC(), + }, + }, + }, + want: time.Unix(0, 0).UTC(), + }, + { + name: "zero timestamp with two annotations", + span: &MockSpan{ + Time: time.Time{}, + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC(), + }, + &MockAnnotation{ + Time: time.Unix(2, 0).UTC(), + }, + }, + }, + want: time.Unix(0, 0).UTC(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.now != nil { + now = tt.now + } + if got := guessTimestamp(tt.span); !reflect.DeepEqual(got, tt.want) { + t.Errorf("guessTimestamp() = %v, want %v", got, tt.want) + } + now = time.Now + }) + } +} + +func Test_convertDuration(t *testing.T) { + tests := []struct { + name string + span Span + want time.Duration + }{ + { + name: "simple duration", + span: &MockSpan{ + Dur: time.Hour, + }, + want: time.Hour, + }, + { + name: "no timestamp, but, 2 seconds between annotations", + span: &MockSpan{ + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(1 * time.Second), + }, + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(2 * time.Second), + }, + &MockAnnotation{ + Time: time.Unix(0, 0).UTC().Add(3 * time.Second), + }, + }, + }, + want: 2 * time.Second, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := convertDuration(tt.span); got != tt.want { + t.Errorf("convertDuration() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_parentID(t *testing.T) { + tests := []struct { + name string + span Span + want string + wantErr bool + }{ + { + name: "has parent id", + span: &MockSpan{ + ParentID: "6b221d5bc9e6496c", + }, + want: "6b221d5bc9e6496c", + }, + { + name: "no parent, so use id", + span: &MockSpan{ + ID: "abceasyas123", + }, + want: "abceasyas123", + }, + { + name: "bad parent value", + span: &MockSpan{ + Error: fmt.Errorf("Mommie Dearest"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parentID(tt.span) + if (err != nil) != tt.wantErr { + t.Errorf("parentID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("parentID() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_serviceEndpoint(t *testing.T) { + tests := []struct { + name string + ann []Annotation + bann []BinaryAnnotation + want Endpoint + }{ + { + name: "Annotation with server receive", + ann: []Annotation{ + &MockAnnotation{ + Val: "battery", + H: &MockEndpoint{ + name: "aa", + }, + }, + &MockAnnotation{ + Val: "sr", + H: &MockEndpoint{ + name: "me", + }, + }, + }, + want: &MockEndpoint{ + name: "me", + }, + }, + { + name: "Annotation with no standard values", + ann: []Annotation{ + &MockAnnotation{ + Val: "noop", + }, + &MockAnnotation{ + Val: "aa", + H: &MockEndpoint{ + name: "battery", + }, + }, + }, + want: &MockEndpoint{ + name: "battery", + }, + }, + { + name: "Annotation with no endpoints", + ann: []Annotation{ + &MockAnnotation{ + Val: "noop", + }, + }, + want: &DefaultEndpoint{}, + }, + { + name: "Binary annotation with local component", + bann: []BinaryAnnotation{ + &MockBinaryAnnotation{ + K: "noop", + H: &MockEndpoint{ + name: "aa", + }, + }, + &MockBinaryAnnotation{ + K: "lc", + H: &MockEndpoint{ + name: "me", + }, + }, + }, + want: &MockEndpoint{ + name: "me", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := serviceEndpoint(tt.ann, tt.bann); !reflect.DeepEqual(got, tt.want) { + t.Errorf("serviceEndpoint() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewBinaryAnnotations(t *testing.T) { + tests := []struct { + name string + annotations []BinaryAnnotation + endpoint Endpoint + want []trace.BinaryAnnotation + }{ + { + name: "Should override annotation with endpoint", + annotations: []BinaryAnnotation{ + &MockBinaryAnnotation{ + K: "mykey", + V: "myvalue", + H: &MockEndpoint{ + host: "noop", + name: "noop", + }, + }, + }, + endpoint: &MockEndpoint{ + host: "myhost", + name: "myservice", + }, + want: []trace.BinaryAnnotation{ + trace.BinaryAnnotation{ + Host: "myhost", + ServiceName: "myservice", + Key: "mykey", + Value: "myvalue", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewBinaryAnnotations(tt.annotations, tt.endpoint); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewBinaryAnnotations() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewAnnotations(t *testing.T) { + tests := []struct { + name string + annotations []Annotation + endpoint Endpoint + want []trace.Annotation + }{ + { + name: "Should override annotation with endpoint", + annotations: []Annotation{ + &MockAnnotation{ + Time: time.Unix(0, 0).UTC(), + Val: "myvalue", + H: &MockEndpoint{ + host: "noop", + name: "noop", + }, + }, + }, + endpoint: &MockEndpoint{ + host: "myhost", + name: "myservice", + }, + want: []trace.Annotation{ + trace.Annotation{ + Host: "myhost", + ServiceName: "myservice", + Timestamp: time.Unix(0, 0).UTC(), + Value: "myvalue", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewAnnotations(tt.annotations, tt.endpoint); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewAnnotations() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewTrace(t *testing.T) { + tests := []struct { + name string + spans []Span + now func() time.Time + want trace.Trace + wantErr bool + }{ + { + name: "empty span", + spans: []Span{ + &MockSpan{}, + }, + now: func() time.Time { + return time.Unix(0, 0).UTC() + }, + want: trace.Trace{ + trace.Span{ + ServiceName: "unknown", + Timestamp: time.Unix(0, 0).UTC(), + Annotations: []trace.Annotation{}, + BinaryAnnotations: []trace.BinaryAnnotation{}, + }, + }, + }, + { + name: "span has no id", + spans: []Span{ + &MockSpan{ + Error: fmt.Errorf("Span has no id"), + }, + }, + wantErr: true, + }, + { + name: "complete span", + spans: []Span{ + &MockSpan{ + TraceID: "tid", + ID: "id", + ParentID: "", + ServiceName: "me", + Anno: []Annotation{ + &MockAnnotation{ + Time: time.Unix(1, 0).UTC(), + Val: "myval", + H: &MockEndpoint{ + host: "myhost", + name: "myname", + }, + }, + }, + Time: time.Unix(0, 0).UTC(), + Dur: 2 * time.Second, + }, + }, + now: func() time.Time { + return time.Unix(0, 0).UTC() + }, + want: trace.Trace{ + trace.Span{ + ID: "id", + ParentID: "id", + TraceID: "tid", + Name: "me", + ServiceName: "myname", + Timestamp: time.Unix(0, 0).UTC(), + Duration: 2 * time.Second, + Annotations: []trace.Annotation{ + { + Timestamp: time.Unix(1, 0).UTC(), + Value: "myval", + Host: "myhost", + ServiceName: "myname", + }, + }, + BinaryAnnotations: []trace.BinaryAnnotation{}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.now != nil { + now = tt.now + } + got, err := NewTrace(tt.spans) + if (err != nil) != tt.wantErr { + t.Errorf("NewTrace() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(tt.want, got) { + t.Errorf("NewTrace() = %s", cmp.Diff(tt.want, got)) + } + now = time.Now + }) + } +} + +type MockSpan struct { + TraceID string + ID string + ParentID string + ServiceName string + Anno []Annotation + BinAnno []BinaryAnnotation + Time time.Time + Dur time.Duration + Error error +} + +func (m *MockSpan) Trace() (string, error) { + return m.TraceID, m.Error +} + +func (m *MockSpan) SpanID() (string, error) { + return m.ID, m.Error +} + +func (m *MockSpan) Parent() (string, error) { + return m.ParentID, m.Error +} + +func (m *MockSpan) Name() string { + return m.ServiceName +} + +func (m *MockSpan) Annotations() []Annotation { + return m.Anno +} + +func (m *MockSpan) BinaryAnnotations() ([]BinaryAnnotation, error) { + return m.BinAnno, m.Error +} + +func (m *MockSpan) Timestamp() time.Time { + return m.Time +} + +func (m *MockSpan) Duration() time.Duration { + return m.Dur +} + +type MockAnnotation struct { + Time time.Time + Val string + H Endpoint +} + +func (m *MockAnnotation) Timestamp() time.Time { + return m.Time +} + +func (m *MockAnnotation) Value() string { + return m.Val +} + +func (m *MockAnnotation) Host() Endpoint { + return m.H +} + +type MockEndpoint struct { + host string + name string +} + +func (e *MockEndpoint) Host() string { + return e.host +} + +func (e *MockEndpoint) Name() string { + return e.name +} + +type MockBinaryAnnotation struct { + Time time.Time + K string + V string + H Endpoint +} + +func (b *MockBinaryAnnotation) Key() string { + return b.K +} + +func (b *MockBinaryAnnotation) Value() string { + return b.V +} + +func (b *MockBinaryAnnotation) Host() Endpoint { + return b.H +} diff --git a/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go b/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go new file mode 100644 index 000000000..53670dd29 --- /dev/null +++ b/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go @@ -0,0 +1,252 @@ +package jsonV1 + +import ( + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +// JSON decodes spans from bodies `POST`ed to the spans endpoint +type JSON struct{} + +// Decode unmarshals and validates the JSON body +func (j *JSON) Decode(octets []byte) ([]codec.Span, error) { + var spans []span + err := json.Unmarshal(octets, &spans) + if err != nil { + return nil, err + } + + res := make([]codec.Span, len(spans)) + for i := range spans { + if err := spans[i].Validate(); err != nil { + return nil, err + } + res[i] = &spans[i] + } + return res, nil +} + +type span struct { + TraceID string `json:"traceId"` + SpanName string `json:"name"` + ParentID string `json:"parentId,omitempty"` + ID string `json:"id"` + Time *int64 `json:"timestamp,omitempty"` + Dur *int64 `json:"duration,omitempty"` + Debug bool `json:"debug,omitempty"` + Anno []annotation `json:"annotations"` + BAnno []binaryAnnotation `json:"binaryAnnotations"` +} + +func (s *span) Validate() error { + var err error + check := func(f func() (string, error)) { + if err != nil { + return + } + _, err = f() + } + + check(s.Trace) + check(s.SpanID) + check(s.Parent) + if err != nil { + return err + } + + _, err = s.BinaryAnnotations() + return err +} + +func (s *span) Trace() (string, error) { + if s.TraceID == "" { + return "", fmt.Errorf("Trace ID cannot be null") + } + return TraceIDFromString(s.TraceID) +} + +func (s *span) SpanID() (string, error) { + if s.ID == "" { + return "", fmt.Errorf("Span ID cannot be null") + } + return IDFromString(s.ID) +} + +func (s *span) Parent() (string, error) { + if s.ParentID == "" { + return "", nil + } + return IDFromString(s.ParentID) +} + +func (s *span) Name() string { + return s.SpanName +} + +func (s *span) Annotations() []codec.Annotation { + res := make([]codec.Annotation, len(s.Anno)) + for i := range s.Anno { + res[i] = &s.Anno[i] + } + return res +} + +func (s *span) BinaryAnnotations() ([]codec.BinaryAnnotation, error) { + res := make([]codec.BinaryAnnotation, len(s.BAnno)) + for i, a := range s.BAnno { + if a.Key() != "" && a.Value() == "" { + return nil, fmt.Errorf("No value for key %s at binaryAnnotations[%d]", a.K, i) + } + if a.Value() != "" && a.Key() == "" { + return nil, fmt.Errorf("No key at binaryAnnotations[%d]", i) + } + res[i] = &s.BAnno[i] + } + return res, nil +} + +func (s *span) Timestamp() time.Time { + if s.Time == nil { + return time.Time{} + } + return codec.MicroToTime(*s.Time) +} + +func (s *span) Duration() time.Duration { + if s.Dur == nil { + return 0 + } + return time.Duration(*s.Dur) * time.Microsecond +} + +type annotation struct { + Endpoint *endpoint `json:"endpoint,omitempty"` + Time int64 `json:"timestamp"` + Val string `json:"value,omitempty"` +} + +func (a *annotation) Timestamp() time.Time { + return codec.MicroToTime(a.Time) +} + +func (a *annotation) Value() string { + return a.Val +} + +func (a *annotation) Host() codec.Endpoint { + return a.Endpoint +} + +type binaryAnnotation struct { + K string `json:"key"` + V json.RawMessage `json:"value"` + Type string `json:"type"` + Endpoint *endpoint `json:"endpoint,omitempty"` +} + +func (b *binaryAnnotation) Key() string { + return b.K +} + +func (b *binaryAnnotation) Value() string { + t, err := zipkincore.AnnotationTypeFromString(b.Type) + // Assume this is a string if we cannot tell the type + if err != nil { + t = zipkincore.AnnotationType_STRING + } + + switch t { + case zipkincore.AnnotationType_BOOL: + var v bool + err := json.Unmarshal(b.V, &v) + if err == nil { + return strconv.FormatBool(v) + } + case zipkincore.AnnotationType_BYTES: + return string(b.V) + case zipkincore.AnnotationType_I16, zipkincore.AnnotationType_I32, zipkincore.AnnotationType_I64: + var v int64 + err := json.Unmarshal(b.V, &v) + if err == nil { + return strconv.FormatInt(v, 10) + } + case zipkincore.AnnotationType_DOUBLE: + var v float64 + err := json.Unmarshal(b.V, &v) + if err == nil { + return strconv.FormatFloat(v, 'f', -1, 64) + } + case zipkincore.AnnotationType_STRING: + var v string + err := json.Unmarshal(b.V, &v) + if err == nil { + return v + } + } + + return "" +} + +func (b *binaryAnnotation) Host() codec.Endpoint { + return b.Endpoint +} + +type endpoint struct { + ServiceName string `json:"serviceName"` + Ipv4 string `json:"ipv4"` + Ipv6 string `json:"ipv6,omitempty"` + Port int `json:"port"` +} + +func (e *endpoint) Host() string { + if e.Port != 0 { + return fmt.Sprintf("%s:%d", e.Ipv4, e.Port) + } + return e.Ipv4 +} + +func (e *endpoint) Name() string { + return e.ServiceName +} + +// TraceIDFromString creates a TraceID from a hexadecimal string +func TraceIDFromString(s string) (string, error) { + var hi, lo uint64 + var err error + if len(s) > 32 { + return "", fmt.Errorf("TraceID cannot be longer than 32 hex characters: %s", s) + } else if len(s) > 16 { + hiLen := len(s) - 16 + if hi, err = strconv.ParseUint(s[0:hiLen], 16, 64); err != nil { + return "", err + } + if lo, err = strconv.ParseUint(s[hiLen:], 16, 64); err != nil { + return "", err + } + } else { + if lo, err = strconv.ParseUint(s, 16, 64); err != nil { + return "", err + } + } + if hi == 0 { + return fmt.Sprintf("%x", lo), nil + } + return fmt.Sprintf("%x%016x", hi, lo), nil +} + +// IDFromString creates a decimal id from a hexadecimal string +func IDFromString(s string) (string, error) { + if len(s) > 16 { + return "", fmt.Errorf("ID cannot be longer than 16 hex characters: %s", s) + } + id, err := strconv.ParseUint(s, 16, 64) + if err != nil { + return "", err + } + return strconv.FormatUint(id, 10), nil +} diff --git a/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go b/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go new file mode 100644 index 000000000..fa0d7c2ad --- /dev/null +++ b/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go @@ -0,0 +1,920 @@ +package jsonV1 + +import ( + "encoding/json" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec" +) + +func TestJSON_Decode(t *testing.T) { + addr := func(i int64) *int64 { return &i } + tests := []struct { + name string + octets []byte + want []codec.Span + wantErr bool + }{ + { + name: "bad json is error", + octets: []byte(` + [ + { + ]`), + wantErr: true, + }, + { + name: "Decodes simple trace", + octets: []byte(` + [ + { + "traceId": "6b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c" + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "6b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + }, + }, + }, + { + name: "Decodes two spans", + octets: []byte(` + [ + { + "traceId": "6b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c" + }, + { + "traceId": "6b221d5bc9e6496c", + "name": "get-traces", + "id": "c6946e9cb5d122b6", + "parentId": "6b221d5bc9e6496c", + "duration": 10000 + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "6b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + }, + &span{ + TraceID: "6b221d5bc9e6496c", + SpanName: "get-traces", + ID: "c6946e9cb5d122b6", + ParentID: "6b221d5bc9e6496c", + Dur: addr(10000), + }, + }, + }, + { + name: "Decodes trace with timestamp", + octets: []byte(` + [ + { + "traceId": "6b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "timestamp": 1503031538791000 + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "6b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + Time: addr(1503031538791000), + }, + }, + }, + { + name: "Decodes simple trace with high and low trace id", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c" + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + }, + }, + }, + { + name: "Error when trace id is null", + octets: []byte(` + [ + { + "traceId": null, + "name": "get-traces", + "id": "6b221d5bc9e6496c" + } + ]`), + wantErr: true, + }, + { + name: "ignore null parentId", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "parentId": null + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + }, + }, + }, + { + name: "ignore null timestamp", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "timestamp": null + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + }, + }, + }, + { + name: "ignore null duration", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "duration": null + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + }, + }, + }, + { + name: "ignore null annotation endpoint", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "annotations": [ + { + "timestamp": 1461750491274000, + "value": "cs", + "endpoint": null + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + Anno: []annotation{ + { + Time: 1461750491274000, + Val: "cs", + }, + }, + }, + }, + }, + { + name: "ignore null binary annotation endpoint", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "lc", + "value": "JDBCSpanStore", + "endpoint": null + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + BAnno: []binaryAnnotation{ + { + K: "lc", + V: json.RawMessage(`"JDBCSpanStore"`), + }, + }, + }, + }, + }, + { + name: "Error when binary annotation has no key", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "value": "JDBCSpanStore", + "endpoint": null + } + ] + } + ]`), + wantErr: true, + }, + { + name: "Error when binary annotation has no value", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "lc", + "endpoint": null + } + ] + } + ]`), + wantErr: true, + }, + { + name: "binary annotation with endpoint", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "lc", + "value": "JDBCSpanStore", + "endpoint": { + "serviceName": "service", + "port": 65535 + } + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + BAnno: []binaryAnnotation{ + { + K: "lc", + V: json.RawMessage(`"JDBCSpanStore"`), + Endpoint: &endpoint{ + ServiceName: "service", + Port: 65535, + }, + }, + }, + }, + }, + }, + { + name: "binary annotation with double value", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "num", + "value": 1.23456789, + "type": "DOUBLE" + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + BAnno: []binaryAnnotation{ + { + K: "num", + V: json.RawMessage{0x31, 0x2e, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39}, + Type: "DOUBLE", + }, + }, + }, + }, + }, + { + name: "binary annotation with integer value", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "num", + "value": 1, + "type": "I16" + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + BAnno: []binaryAnnotation{ + { + K: "num", + V: json.RawMessage{0x31}, + Type: "I16", + }, + }, + }, + }, + }, + { + name: "binary annotation with bool value", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "num", + "value": true, + "type": "BOOL" + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + BAnno: []binaryAnnotation{ + { + K: "num", + V: json.RawMessage(`true`), + Type: "BOOL", + }, + }, + }, + }, + }, + { + name: "binary annotation with bytes value", + octets: []byte(` + [ + { + "traceId": "48485a3953bb61246b221d5bc9e6496c", + "name": "get-traces", + "id": "6b221d5bc9e6496c", + "binaryAnnotations": [ + { + "key": "num", + "value": "1", + "type": "BYTES" + } + ] + } + ]`), + want: []codec.Span{ + &span{ + TraceID: "48485a3953bb61246b221d5bc9e6496c", + SpanName: "get-traces", + ID: "6b221d5bc9e6496c", + BAnno: []binaryAnnotation{ + { + K: "num", + V: json.RawMessage(`"1"`), + Type: "BYTES", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JSON{} + got, err := j.Decode(tt.octets) + if (err != nil) != tt.wantErr { + t.Errorf("JSON.Decode() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(tt.want, got) { + t.Errorf("JSON.Decode() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + } + }) + } +} + +func Test_span_Trace(t *testing.T) { + tests := []struct { + name string + TraceID string + want string + wantErr bool + }{ + { + name: "Trace IDs cannot be null", + TraceID: "", + wantErr: true, + }, + { + name: "converts hex string correctly", + TraceID: "deadbeef", + want: "deadbeef", + }, + { + name: "converts high and low trace id correctly", + TraceID: "48485a3953bb61246b221d5bc9e6496c", + want: "48485a3953bb61246b221d5bc9e6496c", + }, + { + name: "errors when string isn't hex", + TraceID: "oxdeadbeef", + wantErr: true, + }, + { + name: "errors when id is too long", + TraceID: "1234567890abcdef1234567890abcdef1", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &span{ + TraceID: tt.TraceID, + } + got, err := s.Trace() + if (err != nil) != tt.wantErr { + t.Errorf("span.Trace() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(tt.want, got) { + t.Errorf("span.Trace() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + } + }) + } +} + +func Test_span_SpanID(t *testing.T) { + tests := []struct { + name string + ID string + want string + wantErr bool + }{ + { + name: "Span IDs cannot be null", + ID: "", + wantErr: true, + }, + { + name: "converts known id correctly", + ID: "b26412d1ac16767d", + want: "12854419928166856317", + }, + { + name: "converts hex string correctly", + ID: "deadbeef", + want: "3735928559", + }, + { + name: "errors when string isn't hex", + ID: "oxdeadbeef", + wantErr: true, + }, + { + name: "errors when id is too long", + ID: "1234567890abcdef1", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &span{ + ID: tt.ID, + } + got, err := s.SpanID() + if (err != nil) != tt.wantErr { + t.Errorf("span.SpanID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(tt.want, got) { + t.Errorf("span.SpanID() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + } + }) + } +} + +func Test_span_Parent(t *testing.T) { + tests := []struct { + name string + ParentID string + want string + wantErr bool + }{ + { + name: "when there is no parent return empty string", + ParentID: "", + want: "", + }, + { + name: "converts hex string correctly", + ParentID: "deadbeef", + want: "3735928559", + }, + { + name: "errors when string isn't hex", + ParentID: "oxdeadbeef", + wantErr: true, + }, + { + name: "errors when parent id is too long", + ParentID: "1234567890abcdef1", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &span{ + ParentID: tt.ParentID, + } + got, err := s.Parent() + if (err != nil) != tt.wantErr { + t.Errorf("span.Parent() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(tt.want, got) { + t.Errorf("span.Parent() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + } + }) + } +} + +func Test_span_Timestamp(t *testing.T) { + tests := []struct { + name string + Time *int64 + want time.Time + }{ + { + name: "converts to microseconds", + Time: func(i int64) *int64 { return &i }(3000000), + want: time.Unix(3, 0).UTC(), + }, + { + name: "nil time should be zero time", + Time: nil, + want: time.Time{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &span{ + Time: tt.Time, + } + if got := s.Timestamp(); !cmp.Equal(tt.want, got) { + t.Errorf("span.Timestamp() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + } + }) + } +} + +func Test_span_Duration(t *testing.T) { + tests := []struct { + name string + dur *int64 + want time.Duration + }{ + { + name: "converts from 3 microseconds", + dur: func(i int64) *int64 { return &i }(3000000), + want: 3 * time.Second, + }, + { + name: "nil time should be zero duration", + dur: nil, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &span{ + Dur: tt.dur, + } + if got := s.Duration(); got != tt.want { + t.Errorf("span.Duration() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_annotation(t *testing.T) { + type fields struct { + Endpoint *endpoint + Time int64 + Val string + } + tests := []struct { + name string + fields fields + tm time.Time + val string + endpoint *endpoint + }{ + { + name: "returns all fields", + fields: fields{ + Time: 3000000, + Val: "myvalue", + Endpoint: &endpoint{ + ServiceName: "myservice", + Ipv4: "127.0.0.1", + Port: 443, + }, + }, + tm: time.Unix(3, 0).UTC(), + val: "myvalue", + endpoint: &endpoint{ + ServiceName: "myservice", + Ipv4: "127.0.0.1", + Port: 443, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + an := annotation(tt.fields) + a := &an + if got := a.Timestamp(); got != tt.tm { + t.Errorf("annotation.Timestamp() = %v, want %v", got, tt.tm) + } + if got := a.Value(); got != tt.val { + t.Errorf("annotation.Value() = %v, want %v", got, tt.val) + } + if got := a.Host(); !cmp.Equal(tt.endpoint, got) { + t.Errorf("annotation.Endpoint() = %v, want %v", got, tt.endpoint) + } + }) + } +} + +func Test_binaryAnnotation(t *testing.T) { + type fields struct { + K string + V json.RawMessage + Type string + Endpoint *endpoint + } + tests := []struct { + name string + fields fields + key string + value string + endpoint *endpoint + }{ + { + name: "returns all fields", + fields: fields{ + K: "key", + V: json.RawMessage(`"value"`), + Endpoint: &endpoint{ + ServiceName: "myservice", + Ipv4: "127.0.0.1", + Port: 443, + }, + }, + key: "key", + value: "value", + endpoint: &endpoint{ + ServiceName: "myservice", + Ipv4: "127.0.0.1", + Port: 443, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bin := binaryAnnotation(tt.fields) + b := &bin + if got := b.Key(); got != tt.key { + t.Errorf("binaryAnnotation.Key() = %v, want %v", got, tt.key) + } + if got := b.Value(); got != tt.value { + t.Errorf("binaryAnnotation.Value() = %v, want %v", got, tt.value) + } + if got := b.Host(); !cmp.Equal(tt.endpoint, got) { + t.Errorf("binaryAnnotation.Endpoint() = %v, want %v", got, tt.endpoint) + } + }) + } +} + +func Test_endpoint_Host(t *testing.T) { + type fields struct { + Ipv4 string + Port int + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "with port", + fields: fields{ + Ipv4: "127.0.0.1", + Port: 443, + }, + want: "127.0.0.1:443", + }, + { + name: "no port", + fields: fields{ + Ipv4: "127.0.0.1", + }, + want: "127.0.0.1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &endpoint{ + Ipv4: tt.fields.Ipv4, + Port: tt.fields.Port, + } + if got := e.Host(); got != tt.want { + t.Errorf("endpoint.Host() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_endpoint_Name(t *testing.T) { + tests := []struct { + name string + ServiceName string + want string + }{ + { + name: "has service name", + ServiceName: "myservicename", + want: "myservicename", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &endpoint{ + ServiceName: tt.ServiceName, + } + if got := e.Name(); got != tt.want { + t.Errorf("endpoint.Name() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTraceIDFromString(t *testing.T) { + tests := []struct { + name string + s string + want string + wantErr bool + }{ + { + name: "Convert hex string id", + s: "6b221d5bc9e6496c", + want: "6b221d5bc9e6496c", + }, + { + name: "error : id too long", + s: "1234567890abcdef1234567890abcdef1", + wantErr: true, + }, + { + name: "error : not parsable", + s: "howdyhowdyhowdy", + wantErr: true, + }, + { + name: "Convert hex string with high/low", + s: "48485a3953bb61246b221d5bc9e6496c", + want: "48485a3953bb61246b221d5bc9e6496c", + }, + { + name: "errors in high", + s: "ERR85a3953bb61246b221d5bc9e6496c", + wantErr: true, + }, + { + name: "errors in low", + s: "48485a3953bb61246b221d5bc9e64ERR", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := TraceIDFromString(tt.s) + if (err != nil) != tt.wantErr { + t.Errorf("TraceIDFromString() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("TraceIDFromString() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestIDFromString(t *testing.T) { + tests := []struct { + name string + s string + want string + wantErr bool + }{ + { + name: "Convert hex string id", + s: "6b221d5bc9e6496c", + want: "7719764991332993388", + }, + { + name: "error : id too long", + s: "1234567890abcdef1", + wantErr: true, + }, + { + name: "error : not parsable", + s: "howdyhowdyhowdy", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := IDFromString(tt.s) + if (err != nil) != tt.wantErr { + t.Errorf("IDFromString() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("IDFromString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugins/inputs/zipkin/codec/thrift/thrift.go b/plugins/inputs/zipkin/codec/thrift/thrift.go new file mode 100644 index 000000000..c2170e87a --- /dev/null +++ b/plugins/inputs/zipkin/codec/thrift/thrift.go @@ -0,0 +1,203 @@ +package thrift + +import ( + "encoding/binary" + "fmt" + "net" + "strconv" + "time" + + "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +// UnmarshalThrift converts raw bytes in thrift format to a slice of spans +func UnmarshalThrift(body []byte) ([]*zipkincore.Span, error) { + buffer := thrift.NewTMemoryBuffer() + if _, err := buffer.Write(body); err != nil { + return nil, err + } + + transport := thrift.NewTBinaryProtocolTransport(buffer) + _, size, err := transport.ReadListBegin() + if err != nil { + return nil, err + } + + spans := make([]*zipkincore.Span, size) + for i := 0; i < size; i++ { + zs := &zipkincore.Span{} + if err = zs.Read(transport); err != nil { + return nil, err + } + spans[i] = zs + } + + if err = transport.ReadListEnd(); err != nil { + return nil, err + } + return spans, nil +} + +// Thrift decodes binary data to create a Trace +type Thrift struct{} + +// Decode unmarshals and validates bytes in thrift format +func (t *Thrift) Decode(octets []byte) ([]codec.Span, error) { + spans, err := UnmarshalThrift(octets) + if err != nil { + return nil, err + } + + res := make([]codec.Span, len(spans)) + for i, s := range spans { + res[i] = &span{s} + } + return res, nil +} + +var _ codec.Endpoint = &endpoint{} + +type endpoint struct { + *zipkincore.Endpoint +} + +func (e *endpoint) Host() string { + ipv4 := func(addr int32) string { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, uint32(addr)) + return net.IP(buf).String() + } + + if e.Endpoint == nil { + return ipv4(int32(0)) + } + if e.Endpoint.GetPort() == 0 { + return ipv4(e.Endpoint.GetIpv4()) + } + // Zipkin uses a signed int16 for the port, but, warns us that they actually treat it + // as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff + // to convert from signed to unsigned + // https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44 + return ipv4(e.Endpoint.GetIpv4()) + ":" + strconv.FormatInt(int64(int(e.Endpoint.GetPort())&0xffff), 10) +} + +func (e *endpoint) Name() string { + if e.Endpoint == nil { + return codec.DefaultServiceName + } + return e.Endpoint.GetServiceName() +} + +var _ codec.BinaryAnnotation = &binaryAnnotation{} + +type binaryAnnotation struct { + *zipkincore.BinaryAnnotation +} + +func (b *binaryAnnotation) Key() string { + return b.BinaryAnnotation.GetKey() +} + +func (b *binaryAnnotation) Value() string { + return string(b.BinaryAnnotation.GetValue()) +} + +func (b *binaryAnnotation) Host() codec.Endpoint { + if b.BinaryAnnotation.Host == nil { + return nil + } + return &endpoint{b.BinaryAnnotation.Host} +} + +var _ codec.Annotation = &annotation{} + +type annotation struct { + *zipkincore.Annotation +} + +func (a *annotation) Timestamp() time.Time { + ts := a.Annotation.GetTimestamp() + if ts == 0 { + return time.Time{} + } + return codec.MicroToTime(ts) +} + +func (a *annotation) Value() string { + return a.Annotation.GetValue() +} + +func (a *annotation) Host() codec.Endpoint { + if a.Annotation.Host == nil { + return nil + } + return &endpoint{a.Annotation.Host} +} + +var _ codec.Span = &span{} + +type span struct { + *zipkincore.Span +} + +func (s *span) Trace() (string, error) { + if s.Span.GetTraceIDHigh() == 0 && s.Span.GetTraceID() == 0 { + return "", fmt.Errorf("Span does not have a trace ID") + } + + if s.Span.GetTraceIDHigh() == 0 { + return fmt.Sprintf("%x", s.Span.GetTraceID()), nil + } + return fmt.Sprintf("%x%016x", s.Span.GetTraceIDHigh(), s.Span.GetTraceID()), nil +} + +func (s *span) SpanID() (string, error) { + return formatID(s.Span.GetID()), nil +} + +func (s *span) Parent() (string, error) { + id := s.Span.GetParentID() + if id != 0 { + return formatID(id), nil + } + return "", nil +} + +func (s *span) Name() string { + return s.Span.GetName() +} + +func (s *span) Annotations() []codec.Annotation { + res := make([]codec.Annotation, len(s.Span.Annotations)) + for i := range s.Span.Annotations { + res[i] = &annotation{s.Span.Annotations[i]} + } + return res +} + +func (s *span) BinaryAnnotations() ([]codec.BinaryAnnotation, error) { + res := make([]codec.BinaryAnnotation, len(s.Span.BinaryAnnotations)) + for i := range s.Span.BinaryAnnotations { + res[i] = &binaryAnnotation{s.Span.BinaryAnnotations[i]} + } + return res, nil +} + +func (s *span) Timestamp() time.Time { + ts := s.Span.GetTimestamp() + if ts == 0 { + return time.Time{} + } + return codec.MicroToTime(ts) +} + +func (s *span) Duration() time.Duration { + return time.Duration(s.Span.GetDuration()) * time.Microsecond +} + +func formatID(id int64) string { + return strconv.FormatInt(id, 10) +} diff --git a/plugins/inputs/zipkin/codec/thrift/thrift_test.go b/plugins/inputs/zipkin/codec/thrift/thrift_test.go new file mode 100644 index 000000000..4b239fdb3 --- /dev/null +++ b/plugins/inputs/zipkin/codec/thrift/thrift_test.go @@ -0,0 +1,211 @@ +package thrift + +import ( + "io/ioutil" + "testing" + + "github.com/google/go-cmp/cmp" + + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +func Test_endpointHost(t *testing.T) { + type args struct { + h *zipkincore.Endpoint + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Host Found", + args: args{ + h: &zipkincore.Endpoint{ + Ipv4: 1234, + Port: 8888, + }, + }, + want: "0.0.4.210:8888", + }, + { + name: "No Host", + args: args{ + h: nil, + }, + want: "0.0.0.0", + }, + { + name: "int overflow zipkin uses an int16 type as an unsigned int 16.", + args: args{ + h: &zipkincore.Endpoint{ + Ipv4: 1234, + Port: -1, + }, + }, + want: "0.0.4.210:65535", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := endpoint{tt.args.h} + if got := e.Host(); got != tt.want { + t.Errorf("host() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_endpointName(t *testing.T) { + type args struct { + h *zipkincore.Endpoint + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Found ServiceName", + args: args{ + h: &zipkincore.Endpoint{ + ServiceName: "zipkin", + }, + }, + want: "zipkin", + }, + { + name: "No ServiceName", + args: args{ + h: nil, + }, + want: "unknown", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := endpoint{tt.args.h} + if got := e.Name(); got != tt.want { + t.Errorf("serviceName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestUnmarshalThrift(t *testing.T) { + addr := func(i int64) *int64 { return &i } + tests := []struct { + name string + filename string + want []*zipkincore.Span + wantErr bool + }{ + { + name: "threespans", + filename: "../../testdata/threespans.dat", + want: []*zipkincore.Span{ + { + TraceID: 2505404965370368069, + Name: "Child", + ID: 8090652509916334619, + ParentID: addr(22964302721410078), + Timestamp: addr(1498688360851331), + Duration: addr(53106), + Annotations: []*zipkincore.Annotation{}, + BinaryAnnotations: []*zipkincore.BinaryAnnotation{ + &zipkincore.BinaryAnnotation{ + Key: "lc", + AnnotationType: zipkincore.AnnotationType_STRING, + Value: []byte("trivial"), + Host: &zipkincore.Endpoint{ + Ipv4: 2130706433, + ServiceName: "trivial", + }, + }, + }, + }, + { + TraceID: 2505404965370368069, + Name: "Child", + ID: 103618986556047333, + ParentID: addr(22964302721410078), + Timestamp: addr(1498688360904552), + Duration: addr(50410), + Annotations: []*zipkincore.Annotation{}, + BinaryAnnotations: []*zipkincore.BinaryAnnotation{ + &zipkincore.BinaryAnnotation{ + Key: "lc", + AnnotationType: zipkincore.AnnotationType_STRING, + Value: []byte("trivial"), + Host: &zipkincore.Endpoint{ + Ipv4: 2130706433, + ServiceName: "trivial", + }, + }, + }, + }, + { + TraceID: 2505404965370368069, + Name: "Parent", + ID: 22964302721410078, + Timestamp: addr(1498688360851318), + Duration: addr(103680), + Annotations: []*zipkincore.Annotation{ + &zipkincore.Annotation{ + Timestamp: 1498688360851325, + Value: "Starting child #0", + Host: &zipkincore.Endpoint{ + Ipv4: 2130706433, + ServiceName: "trivial", + }, + }, + &zipkincore.Annotation{ + Timestamp: 1498688360904545, + Value: "Starting child #1", + Host: &zipkincore.Endpoint{ + Ipv4: 2130706433, + ServiceName: "trivial", + }, + }, + &zipkincore.Annotation{ + Timestamp: 1498688360954992, + Value: "A Log", + Host: &zipkincore.Endpoint{ + Ipv4: 2130706433, + ServiceName: "trivial", + }, + }, + }, + BinaryAnnotations: []*zipkincore.BinaryAnnotation{ + &zipkincore.BinaryAnnotation{ + Key: "lc", + AnnotationType: zipkincore.AnnotationType_STRING, + Value: []byte("trivial"), + Host: &zipkincore.Endpoint{ + Ipv4: 2130706433, + ServiceName: "trivial", + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dat, err := ioutil.ReadFile(tt.filename) + if err != nil { + t.Fatalf("Could not find file %s\n", tt.filename) + } + + got, err := UnmarshalThrift(dat) + if (err != nil) != tt.wantErr { + t.Errorf("UnmarshalThrift() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(tt.want, got) { + t.Errorf("UnmarshalThrift() got(-)/want(+): %s", cmp.Diff(tt.want, got)) + } + }) + } +} diff --git a/plugins/inputs/zipkin/convert.go b/plugins/inputs/zipkin/convert.go index dad099771..940c16a38 100644 --- a/plugins/inputs/zipkin/convert.go +++ b/plugins/inputs/zipkin/convert.go @@ -1,22 +1,10 @@ package zipkin import ( - "encoding/binary" - "fmt" - "net" - "strconv" - "time" - "github.com/influxdata/telegraf" - "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" ) -// DefaultServiceName when the span does not have any serviceName -const DefaultServiceName = "unknown" - -//now is a moackable time for now -var now = time.Now - // LineProtocolConverter implements the Recorder interface; it is a // type meant to encapsulate the storage of zipkin tracing data in // telegraf as line protocol. @@ -35,7 +23,7 @@ func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter { // Record is LineProtocolConverter's implementation of the Record method of // the Recorder interface; it takes a trace as input, and adds it to an internal // telegraf.Accumulator. -func (l *LineProtocolConverter) Record(t Trace) error { +func (l *LineProtocolConverter) Record(t trace.Trace) error { for _, s := range t { fields := map[string]interface{}{ "duration_ns": s.Duration.Nanoseconds(), @@ -83,167 +71,3 @@ func (l *LineProtocolConverter) Record(t Trace) error { func (l *LineProtocolConverter) Error(err error) { l.acc.AddError(err) } - -// NewTrace converts a slice of []*zipkincore.Spans into a new Trace -func NewTrace(spans []*zipkincore.Span) Trace { - trace := make(Trace, len(spans)) - for i, span := range spans { - endpoint := serviceEndpoint(span.GetAnnotations(), span.GetBinaryAnnotations()) - trace[i] = Span{ - ID: formatID(span.GetID()), - TraceID: formatTraceID(span.GetTraceIDHigh(), span.GetTraceID()), - Name: span.GetName(), - Timestamp: guessTimestamp(span), - Duration: convertDuration(span), - ParentID: parentID(span), - ServiceName: serviceName(endpoint), - Annotations: NewAnnotations(span.GetAnnotations(), endpoint), - BinaryAnnotations: NewBinaryAnnotations(span.GetBinaryAnnotations(), endpoint), - } - } - return trace -} - -// NewAnnotations converts a slice of *zipkincore.Annotation into a slice -// of new Annotations -func NewAnnotations(annotations []*zipkincore.Annotation, endpoint *zipkincore.Endpoint) []Annotation { - formatted := make([]Annotation, len(annotations)) - for i, annotation := range annotations { - formatted[i] = Annotation{ - Host: host(endpoint), - ServiceName: serviceName(endpoint), - Timestamp: microToTime(annotation.GetTimestamp()), - Value: annotation.GetValue(), - } - } - - return formatted -} - -// NewBinaryAnnotations is very similar to NewAnnotations, but it -// converts zipkincore.BinaryAnnotations instead of the normal zipkincore.Annotation -func NewBinaryAnnotations(annotations []*zipkincore.BinaryAnnotation, endpoint *zipkincore.Endpoint) []BinaryAnnotation { - formatted := make([]BinaryAnnotation, len(annotations)) - for i, annotation := range annotations { - formatted[i] = BinaryAnnotation{ - Host: host(endpoint), - ServiceName: serviceName(endpoint), - Key: annotation.GetKey(), - Value: string(annotation.GetValue()), - Type: annotation.GetAnnotationType().String(), - } - } - return formatted -} - -func microToTime(micro int64) time.Time { - return time.Unix(0, micro*int64(time.Microsecond)).UTC() -} - -func formatID(id int64) string { - return strconv.FormatInt(id, 10) -} - -func formatTraceID(high, low int64) string { - if high == 0 { - return fmt.Sprintf("%x", low) - } - return fmt.Sprintf("%x%016x", high, low) -} - -func minMax(span *zipkincore.Span) (time.Time, time.Time) { - min := now().UTC() - max := time.Time{}.UTC() - for _, annotation := range span.Annotations { - ts := microToTime(annotation.GetTimestamp()) - if !ts.IsZero() && ts.Before(min) { - min = ts - } - if !ts.IsZero() && ts.After(max) { - max = ts - } - } - if max.IsZero() { - max = min - } - return min, max -} - -func guessTimestamp(span *zipkincore.Span) time.Time { - if span.GetTimestamp() != 0 { - return microToTime(span.GetTimestamp()) - } - min, _ := minMax(span) - return min -} - -func convertDuration(span *zipkincore.Span) time.Duration { - duration := time.Duration(span.GetDuration()) * time.Microsecond - if duration != 0 { - return duration - } - min, max := minMax(span) - return max.Sub(min) -} - -func parentID(span *zipkincore.Span) string { - // A parent ID of 0 means that this is a parent span. In this case, - // we set the parent ID of the span to be its own id, so it points to - // itself. - id := span.GetParentID() - if id != 0 { - return formatID(id) - } - return formatID(span.ID) -} - -func ipv4(addr int32) string { - buf := make([]byte, 4) - binary.BigEndian.PutUint32(buf, uint32(addr)) - return net.IP(buf).String() -} - -func host(h *zipkincore.Endpoint) string { - if h == nil { - return ipv4(int32(0)) - } - if h.GetPort() == 0 { - return ipv4(h.GetIpv4()) - } - // Zipkin uses a signed int16 for the port, but, warns us that they actually treat it - // as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff - // to convert from signed to unsigned - // https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44 - return ipv4(h.GetIpv4()) + ":" + strconv.FormatInt(int64(int(h.GetPort())&0xffff), 10) -} - -func serviceName(h *zipkincore.Endpoint) string { - if h == nil { - return DefaultServiceName - } - return h.GetServiceName() -} - -func serviceEndpoint(ann []*zipkincore.Annotation, bann []*zipkincore.BinaryAnnotation) *zipkincore.Endpoint { - for _, a := range ann { - switch a.Value { - case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND: - if a.Host != nil && a.Host.ServiceName != "" { - return a.Host - } - } - } - - for _, a := range bann { - if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" { - return a.Host - } - } - // Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations - for _, a := range ann { - if a.Host != nil && a.Host.ServiceName != "" { - return a.Host - } - } - return nil -} diff --git a/plugins/inputs/zipkin/convert_test.go b/plugins/inputs/zipkin/convert_test.go index b5c543073..959012589 100644 --- a/plugins/inputs/zipkin/convert_test.go +++ b/plugins/inputs/zipkin/convert_test.go @@ -1,14 +1,13 @@ package zipkin import ( - "reflect" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" "github.com/influxdata/telegraf/testutil" - "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" ) func TestLineProtocolConverter_Record(t *testing.T) { @@ -17,7 +16,7 @@ func TestLineProtocolConverter_Record(t *testing.T) { acc telegraf.Accumulator } type args struct { - t Trace + t trace.Trace } tests := []struct { name string @@ -32,8 +31,8 @@ func TestLineProtocolConverter_Record(t *testing.T) { acc: &mockAcc, }, args: args{ - t: Trace{ - Span{ + t: trace.Trace{ + { ID: "8090652509916334619", TraceID: "2505404965370368069", Name: "Child", @@ -41,18 +40,17 @@ func TestLineProtocolConverter_Record(t *testing.T) { Timestamp: time.Unix(0, 1498688360851331000).UTC(), Duration: time.Duration(53106) * time.Microsecond, ServiceName: "trivial", - Annotations: []Annotation{}, - BinaryAnnotations: []BinaryAnnotation{ - BinaryAnnotation{ + Annotations: []trace.Annotation{}, + BinaryAnnotations: []trace.BinaryAnnotation{ + { Key: "lc", Value: "dHJpdmlhbA==", Host: "2130706433:0", ServiceName: "trivial", - Type: "STRING", }, }, }, - Span{ + { ID: "103618986556047333", TraceID: "2505404965370368069", Name: "Child", @@ -60,18 +58,17 @@ func TestLineProtocolConverter_Record(t *testing.T) { Timestamp: time.Unix(0, 1498688360904552000).UTC(), Duration: time.Duration(50410) * time.Microsecond, ServiceName: "trivial", - Annotations: []Annotation{}, - BinaryAnnotations: []BinaryAnnotation{ - BinaryAnnotation{ + Annotations: []trace.Annotation{}, + BinaryAnnotations: []trace.BinaryAnnotation{ + { Key: "lc", Value: "dHJpdmlhbA==", Host: "2130706433:0", ServiceName: "trivial", - Type: "STRING", }, }, }, - Span{ + { ID: "22964302721410078", TraceID: "2505404965370368069", Name: "Parent", @@ -79,33 +76,32 @@ func TestLineProtocolConverter_Record(t *testing.T) { Timestamp: time.Unix(0, 1498688360851318000).UTC(), Duration: time.Duration(103680) * time.Microsecond, ServiceName: "trivial", - Annotations: []Annotation{ - Annotation{ + Annotations: []trace.Annotation{ + { Timestamp: time.Unix(0, 1498688360851325000).UTC(), Value: "Starting child #0", Host: "2130706433:0", ServiceName: "trivial", }, - Annotation{ + { Timestamp: time.Unix(0, 1498688360904545000).UTC(), Value: "Starting child #1", Host: "2130706433:0", ServiceName: "trivial", }, - Annotation{ + { Timestamp: time.Unix(0, 1498688360954992000).UTC(), Value: "A Log", Host: "2130706433:0", ServiceName: "trivial", }, }, - BinaryAnnotations: []BinaryAnnotation{ - BinaryAnnotation{ + BinaryAnnotations: []trace.BinaryAnnotation{ + { Key: "lc", Value: "dHJpdmlhbA==", Host: "2130706433:0", ServiceName: "trivial", - Type: "STRING", }, }, }, @@ -265,8 +261,8 @@ func TestLineProtocolConverter_Record(t *testing.T) { acc: &mockAcc, }, args: args{ - t: Trace{ - Span{ + t: trace.Trace{ + { ID: "6802735349851856000", TraceID: "0:6802735349851856000", Name: "main.dud", @@ -274,15 +270,15 @@ func TestLineProtocolConverter_Record(t *testing.T) { Timestamp: time.Unix(1, 0).UTC(), Duration: 1, ServiceName: "trivial", - Annotations: []Annotation{ - Annotation{ + Annotations: []trace.Annotation{ + { Timestamp: time.Unix(0, 1433330263415871000).UTC(), Value: "cs", Host: "0:9410", ServiceName: "go-zipkin-testclient", }, }, - BinaryAnnotations: []BinaryAnnotation{}, + BinaryAnnotations: []trace.BinaryAnnotation{}, }, }, }, @@ -339,206 +335,3 @@ func TestLineProtocolConverter_Record(t *testing.T) { }) } } - -func Test_microToTime(t *testing.T) { - type args struct { - micro int64 - } - tests := []struct { - name string - args args - want time.Time - }{ - { - name: "given zero micro seconds expected unix time zero", - args: args{ - micro: 0, - }, - want: time.Unix(0, 0).UTC(), - }, - { - name: "given a million micro seconds expected unix time one", - args: args{ - micro: 1000000, - }, - want: time.Unix(1, 0).UTC(), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := microToTime(tt.args.micro); !reflect.DeepEqual(got, tt.want) { - t.Errorf("microToTime() = %v, want %v", got, tt.want) - } - }) - } -} - -func newAnnotation(micro int64) *zipkincore.Annotation { - return &zipkincore.Annotation{ - Timestamp: micro, - } -} - -func Test_minMax(t *testing.T) { - type args struct { - span *zipkincore.Span - } - tests := []struct { - name string - args args - now func() time.Time - wantMin time.Time - wantMax time.Time - }{ - { - name: "Single annotation", - args: args{ - span: &zipkincore.Span{ - Annotations: []*zipkincore.Annotation{ - newAnnotation(1000000), - }, - }, - }, - wantMin: time.Unix(1, 0).UTC(), - wantMax: time.Unix(1, 0).UTC(), - }, - { - name: "Three annotations", - args: args{ - span: &zipkincore.Span{ - Annotations: []*zipkincore.Annotation{ - newAnnotation(1000000), - newAnnotation(2000000), - newAnnotation(3000000), - }, - }, - }, - wantMin: time.Unix(1, 0).UTC(), - wantMax: time.Unix(3, 0).UTC(), - }, - { - name: "Annotations are in the future", - args: args{ - span: &zipkincore.Span{ - Annotations: []*zipkincore.Annotation{ - newAnnotation(3000000), - }, - }, - }, - wantMin: time.Unix(2, 0).UTC(), - wantMax: time.Unix(3, 0).UTC(), - now: func() time.Time { - return time.Unix(2, 0).UTC() - }, - }, - { - name: "No Annotations", - args: args{ - span: &zipkincore.Span{ - Annotations: []*zipkincore.Annotation{}, - }, - }, - wantMin: time.Unix(2, 0).UTC(), - wantMax: time.Unix(2, 0).UTC(), - now: func() time.Time { - return time.Unix(2, 0).UTC() - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.now != nil { - now = tt.now - } - got, got1 := minMax(tt.args.span) - if !reflect.DeepEqual(got, tt.wantMin) { - t.Errorf("minMax() got = %v, want %v", got, tt.wantMin) - } - if !reflect.DeepEqual(got1, tt.wantMax) { - t.Errorf("minMax() got1 = %v, want %v", got1, tt.wantMax) - } - now = time.Now - }) - } -} - -func Test_host(t *testing.T) { - type args struct { - h *zipkincore.Endpoint - } - tests := []struct { - name string - args args - want string - }{ - { - name: "Host Found", - args: args{ - h: &zipkincore.Endpoint{ - Ipv4: 1234, - Port: 8888, - }, - }, - want: "0.0.4.210:8888", - }, - { - name: "No Host", - args: args{ - h: nil, - }, - want: "0.0.0.0", - }, - { - name: "int overflow zipkin uses an int16 type as an unsigned int 16.", - args: args{ - h: &zipkincore.Endpoint{ - Ipv4: 1234, - Port: -1, - }, - }, - want: "0.0.4.210:65535", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := host(tt.args.h); got != tt.want { - t.Errorf("host() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_serviceName(t *testing.T) { - type args struct { - h *zipkincore.Endpoint - } - tests := []struct { - name string - args args - want string - }{ - { - name: "Found ServiceName", - args: args{ - h: &zipkincore.Endpoint{ - ServiceName: "zipkin", - }, - }, - want: "zipkin", - }, - { - name: "No ServiceName", - args: args{ - h: nil, - }, - want: "unknown", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := serviceName(tt.args.h); got != tt.want { - t.Errorf("serviceName() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/plugins/inputs/zipkin/handler.go b/plugins/inputs/zipkin/handler.go index d9bd07828..24e7ac12f 100644 --- a/plugins/inputs/zipkin/handler.go +++ b/plugins/inputs/zipkin/handler.go @@ -2,22 +2,23 @@ package zipkin import ( "compress/gzip" + "fmt" "io/ioutil" + "mime" "net/http" "strings" - "sync" - "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" - "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/jsonV1" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift" ) // SpanHandler is an implementation of a Handler which accepts zipkin thrift // span data and sends it to the recorder type SpanHandler struct { - Path string - recorder Recorder - waitGroup *sync.WaitGroup + Path string + recorder Recorder } // NewSpanHandler returns a new server instance given path to handle @@ -81,6 +82,12 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { defer body.Close() } + decoder, err := ContentDecoder(r) + if err != nil { + s.recorder.Error(err) + w.WriteHeader(http.StatusUnsupportedMediaType) + } + octets, err := ioutil.ReadAll(body) if err != nil { s.recorder.Error(err) @@ -88,14 +95,19 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { return } - spans, err := unmarshalThrift(octets) + spans, err := decoder.Decode(octets) if err != nil { s.recorder.Error(err) - w.WriteHeader(http.StatusInternalServerError) + w.WriteHeader(http.StatusBadRequest) return } - trace := NewTrace(spans) + trace, err := codec.NewTrace(spans) + if err != nil { + s.recorder.Error(err) + w.WriteHeader(http.StatusBadRequest) + return + } if err = s.recorder.Record(trace); err != nil { s.recorder.Error(err) @@ -106,30 +118,25 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) { - buffer := thrift.NewTMemoryBuffer() - if _, err := buffer.Write(body); err != nil { - return nil, err +// ContentDecoer returns a Decoder that is able to produce Traces from bytes. +// Failure should yield an HTTP 415 (`http.StatusUnsupportedMediaType`) +// If a Content-Type is not set, zipkin assumes application/json +func ContentDecoder(r *http.Request) (codec.Decoder, error) { + contentType := r.Header.Get("Content-Type") + if contentType == "" { + return &jsonV1.JSON{}, nil } - transport := thrift.NewTBinaryProtocolTransport(buffer) - _, size, err := transport.ReadListBegin() - if err != nil { - return nil, err - } - - spans := make([]*zipkincore.Span, size) - for i := 0; i < size; i++ { - zs := &zipkincore.Span{} - if err = zs.Read(transport); err != nil { - return nil, err + for _, v := range strings.Split(contentType, ",") { + t, _, err := mime.ParseMediaType(v) + if err != nil { + break + } + if t == "application/json" { + return &jsonV1.JSON{}, nil + } else if t == "application/x-thrift" { + return &thrift.Thrift{}, nil } - spans[i] = zs } - - if err = transport.ReadListEnd(); err != nil { - return nil, err - } - - return spans, nil + return nil, fmt.Errorf("Unknown Content-Type: %s", contentType) } diff --git a/plugins/inputs/zipkin/handler_test.go b/plugins/inputs/zipkin/handler_test.go index 0946d982f..606a8da97 100644 --- a/plugins/inputs/zipkin/handler_test.go +++ b/plugins/inputs/zipkin/handler_test.go @@ -10,14 +10,15 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" ) type MockRecorder struct { - Data Trace + Data trace.Trace Err error } -func (m *MockRecorder) Record(t Trace) error { +func (m *MockRecorder) Record(t trace.Trace) error { m.Data = t return nil } @@ -39,6 +40,7 @@ func TestSpanHandler(t *testing.T) { ioutil.NopCloser( bytes.NewReader(dat))) + r.Header.Set("Content-Type", "application/x-thrift") handler := NewSpanHandler("/api/v1/spans") mockRecorder := &MockRecorder{} handler.recorder = mockRecorder @@ -51,8 +53,8 @@ func TestSpanHandler(t *testing.T) { got := mockRecorder.Data parentID := strconv.FormatInt(22964302721410078, 10) - want := Trace{ - Span{ + want := trace.Trace{ + { Name: "Child", ID: "8090652509916334619", TraceID: "22c4fc8ab3669045", @@ -60,18 +62,17 @@ func TestSpanHandler(t *testing.T) { Timestamp: time.Unix(0, 1498688360851331*int64(time.Microsecond)).UTC(), Duration: time.Duration(53106) * time.Microsecond, ServiceName: "trivial", - Annotations: []Annotation{}, - BinaryAnnotations: []BinaryAnnotation{ - BinaryAnnotation{ + Annotations: []trace.Annotation{}, + BinaryAnnotations: []trace.BinaryAnnotation{ + { Key: "lc", Value: "trivial", Host: "127.0.0.1", ServiceName: "trivial", - Type: "STRING", }, }, }, - Span{ + { Name: "Child", ID: "103618986556047333", TraceID: "22c4fc8ab3669045", @@ -79,18 +80,17 @@ func TestSpanHandler(t *testing.T) { Timestamp: time.Unix(0, 1498688360904552*int64(time.Microsecond)).UTC(), Duration: time.Duration(50410) * time.Microsecond, ServiceName: "trivial", - Annotations: []Annotation{}, - BinaryAnnotations: []BinaryAnnotation{ - BinaryAnnotation{ + Annotations: []trace.Annotation{}, + BinaryAnnotations: []trace.BinaryAnnotation{ + { Key: "lc", Value: "trivial", Host: "127.0.0.1", ServiceName: "trivial", - Type: "STRING", }, }, }, - Span{ + { Name: "Parent", ID: "22964302721410078", TraceID: "22c4fc8ab3669045", @@ -98,33 +98,32 @@ func TestSpanHandler(t *testing.T) { Timestamp: time.Unix(0, 1498688360851318*int64(time.Microsecond)).UTC(), Duration: time.Duration(103680) * time.Microsecond, ServiceName: "trivial", - Annotations: []Annotation{ - Annotation{ + Annotations: []trace.Annotation{ + { Timestamp: time.Unix(0, 1498688360851325*int64(time.Microsecond)).UTC(), Value: "Starting child #0", Host: "127.0.0.1", ServiceName: "trivial", }, - Annotation{ + { Timestamp: time.Unix(0, 1498688360904545*int64(time.Microsecond)).UTC(), Value: "Starting child #1", Host: "127.0.0.1", ServiceName: "trivial", }, - Annotation{ + { Timestamp: time.Unix(0, 1498688360954992*int64(time.Microsecond)).UTC(), Value: "A Log", Host: "127.0.0.1", ServiceName: "trivial", }, }, - BinaryAnnotations: []BinaryAnnotation{ - BinaryAnnotation{ + BinaryAnnotations: []trace.BinaryAnnotation{ + { Key: "lc", Value: "trivial", Host: "127.0.0.1", ServiceName: "trivial", - Type: "STRING", }, }, }, diff --git a/plugins/inputs/zipkin/testdata/json/brave-tracer-example.json b/plugins/inputs/zipkin/testdata/json/brave-tracer-example.json new file mode 100644 index 000000000..58eb10c17 --- /dev/null +++ b/plugins/inputs/zipkin/testdata/json/brave-tracer-example.json @@ -0,0 +1,188 @@ +[ + { + "traceId": "7312f822d43d0fd8", + "id": "b26412d1ac16767d", + "name": "http:/hi2", + "parentId": "7312f822d43d0fd8", + "annotations": [ + { + "timestamp": 1503031538791000, + "value": "sr", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "timestamp": 1503031538794000, + "value": "ss", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + } + ], + "binaryAnnotations": [ + { + "key": "mvc.controller.class", + "value": "Demo2Application", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "mvc.controller.method", + "value": "hi2", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "spring.instance_id", + "value": "192.168.0.8:test:8010", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + } + ] + }, + { + "traceId": "7312f822d43d0fd8", + "id": "b26412d1ac16767d", + "name": "http:/hi2", + "parentId": "7312f822d43d0fd8", + "timestamp": 1503031538786000, + "duration": 10000, + "annotations": [ + { + "timestamp": 1503031538786000, + "value": "cs", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "timestamp": 1503031538796000, + "value": "cr", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + } + ], + "binaryAnnotations": [ + { + "key": "http.host", + "value": "localhost", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "http.method", + "value": "GET", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "http.path", + "value": "/hi2", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "http.url", + "value": "http://localhost:8010/hi2", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "spring.instance_id", + "value": "192.168.0.8:test:8010", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + } + ] + }, + { + "traceId": "7312f822d43d0fd8", + "id": "7312f822d43d0fd8", + "name": "http:/hi", + "timestamp": 1503031538778000, + "duration": 23393, + "annotations": [ + { + "timestamp": 1503031538778000, + "value": "sr", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "timestamp": 1503031538801000, + "value": "ss", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + } + ], + "binaryAnnotations": [ + { + "key": "mvc.controller.class", + "value": "Demo2Application", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "mvc.controller.method", + "value": "hi", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + }, + { + "key": "spring.instance_id", + "value": "192.168.0.8:test:8010", + "endpoint": { + "serviceName": "test", + "ipv4": "192.168.0.8", + "port": 8010 + } + } + ] + } +] diff --git a/plugins/inputs/zipkin/trace/trace.go b/plugins/inputs/zipkin/trace/trace.go new file mode 100644 index 000000000..2a3ec2eb1 --- /dev/null +++ b/plugins/inputs/zipkin/trace/trace.go @@ -0,0 +1,41 @@ +package trace + +import ( + "time" +) + +// Trace is an array (or a series) of spans +type Trace []Span + +//Span represents a specific zipkin span. It holds the majority of the same +// data as a zipkin span sent via the thrift protocol, but is presented in a +// format which is more straightforward for storage purposes. +type Span struct { + ID string + TraceID string // zipkin traceid high concat with traceid + Name string + ParentID string + ServiceName string + Timestamp time.Time // If zipkin input is nil then time.Now() + Duration time.Duration + Annotations []Annotation + BinaryAnnotations []BinaryAnnotation +} + +// BinaryAnnotation represents a zipkin binary annotation. It contains +// all of the same fields as might be found in its zipkin counterpart. +type BinaryAnnotation struct { + Key string + Value string + Host string // annotation.endpoint.ipv4 + ":" + annotation.endpoint.port + ServiceName string +} + +// Annotation represents an ordinary zipkin annotation. It contains the data fields +// which will become fields/tags in influxdb +type Annotation struct { + Timestamp time.Time + Value string + Host string // annotation.endpoint.ipv4 + ":" + annotation.endpoint.port + ServiceName string +} diff --git a/plugins/inputs/zipkin/zipkin.go b/plugins/inputs/zipkin/zipkin.go index 597ae0b27..02d82fac0 100644 --- a/plugins/inputs/zipkin/zipkin.go +++ b/plugins/inputs/zipkin/zipkin.go @@ -8,11 +8,11 @@ import ( "net/http" "strconv" "sync" - "time" "github.com/gorilla/mux" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" ) const ( @@ -32,7 +32,7 @@ const ( // Recorder represents a type which can record zipkin trace data as well as // any accompanying errors, and process that data. type Recorder interface { - Record(Trace) error + Record(trace.Trace) error Error(error) } @@ -42,43 +42,6 @@ type Handler interface { Register(router *mux.Router, recorder Recorder) error } -// BinaryAnnotation represents a zipkin binary annotation. It contains -// all of the same fields as might be found in its zipkin counterpart. -type BinaryAnnotation struct { - Key string - Value string - Host string // annotation.endpoint.ipv4 + ":" + annotation.endpoint.port - ServiceName string - Type string -} - -// Annotation represents an ordinary zipkin annotation. It contains the data fields -// which will become fields/tags in influxdb -type Annotation struct { - Timestamp time.Time - Value string - Host string // annotation.endpoint.ipv4 + ":" + annotation.endpoint.port - ServiceName string -} - -//Span represents a specific zipkin span. It holds the majority of the same -// data as a zipkin span sent via the thrift protocol, but is presented in a -// format which is more straightforward for storage purposes. -type Span struct { - ID string - TraceID string // zipkin traceid high concat with traceid - Name string - ParentID string - ServiceName string - Timestamp time.Time // If zipkin input is nil then time.Now() - Duration time.Duration - Annotations []Annotation - BinaryAnnotations []BinaryAnnotation -} - -// Trace is an array (or a series) of spans -type Trace []Span - const sampleConfig = ` # path = "/api/v1/spans" # URL path for span data # port = 9411 # Port on which Telegraf listens @@ -122,7 +85,9 @@ func (z *Zipkin) Start(acc telegraf.Accumulator) error { router := mux.NewRouter() converter := NewLineProtocolConverter(acc) - z.handler.Register(router, converter) + if err := z.handler.Register(router, converter); err != nil { + return err + } z.server = &http.Server{ Handler: router, diff --git a/plugins/inputs/zipkin/zipkin_test.go b/plugins/inputs/zipkin/zipkin_test.go index 9447e67d4..e07eca0cf 100644 --- a/plugins/inputs/zipkin/zipkin_test.go +++ b/plugins/inputs/zipkin/zipkin_test.go @@ -16,14 +16,16 @@ func TestZipkinPlugin(t *testing.T) { mockAcc := testutil.Accumulator{} tests := []struct { - name string - thriftDataFile string //path name to a binary thrift data file which contains test data - wantErr bool - want []testutil.Metric + name string + datafile string // data file which contains test data + contentType string + wantErr bool + want []testutil.Metric }{ { - name: "threespan", - thriftDataFile: "testdata/threespans.dat", + name: "threespan", + datafile: "testdata/threespans.dat", + contentType: "application/x-thrift", want: []testutil.Metric{ testutil.Metric{ Measurement: "zipkin", @@ -170,8 +172,9 @@ func TestZipkinPlugin(t *testing.T) { wantErr: false, }, { - name: "distributed_trace_sample", - thriftDataFile: "testdata/distributed_trace_sample.dat", + name: "distributed_trace_sample", + datafile: "testdata/distributed_trace_sample.dat", + contentType: "application/x-thrift", want: []testutil.Metric{ testutil.Metric{ Measurement: "zipkin", @@ -185,7 +188,6 @@ func TestZipkinPlugin(t *testing.T) { Fields: map[string]interface{}{ "duration_ns": (time.Duration(1) * time.Microsecond).Nanoseconds(), }, - //Time: time.Unix(1, 0).UTC(), Time: time.Unix(0, 1433330263415871*int64(time.Microsecond)).UTC(), }, testutil.Metric{ @@ -202,7 +204,6 @@ func TestZipkinPlugin(t *testing.T) { Fields: map[string]interface{}{ "duration_ns": (time.Duration(1) * time.Microsecond).Nanoseconds(), }, - //Time: time.Unix(1, 0).UTC(), Time: time.Unix(0, 1433330263415871*int64(time.Microsecond)).UTC(), }, testutil.Metric{ @@ -223,6 +224,337 @@ func TestZipkinPlugin(t *testing.T) { }, }, }, + { + name: "JSON rather than thrift", + datafile: "testdata/json/brave-tracer-example.json", + contentType: "application/json", + want: []testutil.Metric{ + { + Measurement: "zipkin", + Tags: map[string]string{ + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(3000000), + }, Time: time.Unix(0, 1503031538791000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "sr", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(3000000), + }, + Time: time.Unix(0, 1503031538791000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "ss", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(3000000), + }, + Time: time.Unix(0, 1503031538791000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "Demo2Application", + "annotation_key": "mvc.controller.class", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(3000000), + }, + Time: time.Unix(0, 1503031538791000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "hi2", + "annotation_key": "mvc.controller.method", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(3000000), + }, + Time: time.Unix(0, 1503031538791000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "192.168.0.8:test:8010", + "annotation_key": "spring.instance_id", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(3000000), + }, + Time: time.Unix(0, 1503031538791000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "cs", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "cr", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "localhost", + "annotation_key": "http.host", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "GET", + "annotation_key": "http.method", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "/hi2", + "annotation_key": "http.path", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "http://localhost:8010/hi2", + "annotation_key": "http.url", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "192.168.0.8:test:8010", + "annotation_key": "spring.instance_id", + "endpoint_host": "192.168.0.8:8010", + "id": "12854419928166856317", + "name": "http:/hi2", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(10000000), + }, + Time: time.Unix(0, 1503031538786000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "id": "8291962692415852504", + "name": "http:/hi", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(23393000), + }, + Time: time.Unix(0, 1503031538778000*int64(time.Microsecond)).UTC(), + }, + { + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "sr", + "endpoint_host": "192.168.0.8:8010", + "id": "8291962692415852504", + "name": "http:/hi", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(23393000), + }, + Time: time.Unix(0, 1503031538778000*int64(time.Microsecond)).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "ss", + "endpoint_host": "192.168.0.8:8010", + "id": "8291962692415852504", + "name": "http:/hi", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(23393000), + }, + Time: time.Unix(0, 1503031538778000*int64(time.Microsecond)).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "Demo2Application", + "annotation_key": "mvc.controller.class", + "endpoint_host": "192.168.0.8:8010", + "id": "8291962692415852504", + "name": "http:/hi", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(23393000), + }, + Time: time.Unix(0, 1503031538778000*int64(time.Microsecond)).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "hi", + "annotation_key": "mvc.controller.method", + "endpoint_host": "192.168.0.8:8010", + "id": "8291962692415852504", + "name": "http:/hi", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(23393000), + }, + Time: time.Unix(0, 1503031538778000*int64(time.Microsecond)).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "192.168.0.8:test:8010", + "annotation_key": "spring.instance_id", + "endpoint_host": "192.168.0.8:8010", + "id": "8291962692415852504", + "name": "http:/hi", + "parent_id": "8291962692415852504", + "service_name": "test", + "trace_id": "7312f822d43d0fd8", + }, + Fields: map[string]interface{}{ + "duration_ns": int64(23393000), + }, + Time: time.Unix(0, 1503031538778000*int64(time.Microsecond)).UTC(), + }, + }, + }, } z := &Zipkin{ @@ -240,7 +572,7 @@ func TestZipkinPlugin(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockAcc.ClearMetrics() - if err := postThriftData(tt.thriftDataFile, z.address); err != nil { + if err := postThriftData(tt.datafile, z.address, tt.contentType); err != nil { t.Fatalf("Posting data to http endpoint /api/v1/spans failed. Error: %s\n", err) } mockAcc.Wait(len(tt.want)) //Since the server is running concurrently, we need to wait for the number of data points we want to test to be added to the Accumulator. @@ -252,7 +584,6 @@ func TestZipkinPlugin(t *testing.T) { for _, m := range mockAcc.Metrics { got = append(got, *m) } - if !cmp.Equal(tt.want, got) { t.Fatalf("Got != Want\n %s", cmp.Diff(tt.want, got)) } @@ -266,19 +597,18 @@ func TestZipkinPlugin(t *testing.T) { } } -func postThriftData(datafile, address string) error { +func postThriftData(datafile, address, contentType string) error { dat, err := ioutil.ReadFile(datafile) if err != nil { return fmt.Errorf("could not read from data file %s", datafile) } req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/spans", address), bytes.NewReader(dat)) - if err != nil { return fmt.Errorf("HTTP request creation failed") } - req.Header.Set("Content-Type", "application/x-thrift") + req.Header.Set("Content-Type", contentType) client := &http.Client{} _, err = client.Do(req) if err != nil {