diff --git a/Godeps b/Godeps index b021911e8..45da5486d 100644 --- a/Godeps +++ b/Godeps @@ -3,6 +3,7 @@ github.com/Shopify/sarama c01858abb625b73a3af51d0798e4ad42c8147093 github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 +github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 github.com/aws/aws-sdk-go c861d27d0304a79f727e9a8a4e2ac1e74602fdc0 github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 github.com/bsm/sarama-cluster ccdc0803695fbce22f1706d04ded46cd518fd832 @@ -17,10 +18,13 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang d4f545eb108a2d19f9b1a735689dbfb719bc21fb +github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a +github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 +github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f @@ -39,6 +43,9 @@ github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f github.com/nsqio/go-nsq a53d495e81424aaf7a7665a9d32a97715c40e953 github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 +github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c +github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827 +github.com/openzipkin/zipkin-go-opentracing 1cafbdfde94fbf2b373534764e0863aa3bd0bf7b github.com/pierrec/lz4 5c9560bfa9ace2bf86080bf40d46b34ae44604df github.com/pierrec/xxHash 5a004441f897722c627870a981d02b29924215fa github.com/pkg/errors 645ef00459ed84a119197bfb8d8205042c6df63d diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index aeaca0ded..44fce6cda 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -26,9 +26,11 @@ works: - github.com/eclipse/paho.mqtt.golang [ECLIPSE](https://github.com/eclipse/paho.mqtt.golang/blob/master/LICENSE) - github.com/fsouza/go-dockerclient [BSD](https://github.com/fsouza/go-dockerclient/blob/master/LICENSE) - github.com/gobwas/glob [MIT](https://github.com/gobwas/glob/blob/master/LICENSE) +- github.com/google/go-cmp [BSD](https://github.com/google/go-cmp/blob/master/LICENSE) - github.com/gogo/protobuf [BSD](https://github.com/gogo/protobuf/blob/master/LICENSE) - github.com/golang/protobuf [BSD](https://github.com/golang/protobuf/blob/master/LICENSE) - github.com/golang/snappy [BSD](https://github.com/golang/snappy/blob/master/LICENSE) +- github.com/go-logfmt/logfmt [MIT](https://github.com/go-logfmt/logfmt/blob/master/LICENSE) - github.com/gorilla/mux [BSD](https://github.com/gorilla/mux/blob/master/LICENSE) - github.com/go-sql-driver/mysql [MPL](https://github.com/go-sql-driver/mysql/blob/master/LICENSE) - github.com/hailocab/go-hostpool [MIT](https://github.com/hailocab/go-hostpool/blob/master/LICENSE) @@ -52,6 +54,9 @@ works: - github.com/nats-io/nats [MIT](https://github.com/nats-io/nats/blob/master/LICENSE) - github.com/nats-io/nuid [MIT](https://github.com/nats-io/nuid/blob/master/LICENSE) - github.com/nsqio/go-nsq [MIT](https://github.com/nsqio/go-nsq/blob/master/LICENSE) +- github.com/opentracing-contrib/go-observer [APACHE](https://github.com/opentracing-contrib/go-observer/blob/master/LICENSE) +- github.com/opentracing/opentracing-go [MIT](https://github.com/opentracing/opentracing-go/blob/master/LICENSE) +- github.com/openzipkin/zipkin-go-opentracing [MIT](https://github.com/openzipkin/zipkin-go-opentracing/blob/master/LICENSE) - github.com/pierrec/lz4 [BSD](https://github.com/pierrec/lz4/blob/master/LICENSE) - github.com/pierrec/xxHash [BSD](https://github.com/pierrec/xxHash/blob/master/LICENSE) - github.com/pkg/errors [BSD](https://github.com/pkg/errors/blob/master/LICENSE) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 0a68235b8..84c320fed 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -89,5 +89,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters" _ "github.com/influxdata/telegraf/plugins/inputs/zfs" + _ "github.com/influxdata/telegraf/plugins/inputs/zipkin" _ "github.com/influxdata/telegraf/plugins/inputs/zookeeper" ) diff --git a/plugins/inputs/zipkin/README.md b/plugins/inputs/zipkin/README.md new file mode 100644 index 000000000..af5ea6117 --- /dev/null +++ b/plugins/inputs/zipkin/README.md @@ -0,0 +1,164 @@ +# Zipkin Plugin + +This plugin implements the Zipkin http server to gather trace and timing data needed to troubleshoot latency problems in microservice architectures. + +*Please Note: This plugin is experimental; Its data schema may be subject to change +based on its main usage cases and the evolution of the OpenTracing standard.* + +## Configuration: +```toml +[[inputs.zipkin]] + path = "/api/v1/spans" # URL path for span data + port = 9411 # Port on which Telegraf listens +``` + +## Tracing: + +This plugin uses Annotations tags and fields to track data from spans + +- __TRACE:__ is a set of spans that share a single root span. +Traces are built by collecting all Spans that share a traceId. + +- __SPAN:__ is a set of Annotations and BinaryAnnotations that correspond to a particular RPC. + +- __Annotations:__ for each annotation & binary annotation of a span a metric is output. *Records an occurrence in time at the beginning and end of a request.* + + Annotations may have the following values: + + - __CS (client start):__ beginning of span, request is made. + - __SR (server receive):__ server receives request and will start processing it + network latency & clock jitters differ it from cs + - __SS (server send):__ server is done processing and sends request back to client + amount of time it took to process request will differ it from sr + - __CR (client receive):__ end of span, client receives response from server + RPC is considered complete with this annotation + +### Tags +* __"id":__ The 64 bit ID of the span. +* __"parent_id":__ An ID associated with a particular child span. If there is no child span, the parent ID is set to ID. +* __"trace_id":__ The 64 or 128-bit ID of a particular trace. Every span in a trace shares this ID. Concatenation of high and low and converted to hexadecimal. +* __"name":__ Defines a span + +##### Annotations have these additional tags: + +* __"service_name":__ Defines a service +* __"annotation":__ The value of an annotation +* __"endpoint_host":__ Listening port concat with IPV4, if port is not present it will not be concatenated + +##### Binary Annotations have these additional tag: + + * __"service_name":__ Defines a service + * __"annotation":__ The value of an annotation + * __"endpoint_host":__ Listening port concat with IPV4, if port is not present it will not be concatenated + * __"annotation_key":__ label describing the annotation + + +### Fields: + * __"duration_ns":__ The time in nanoseconds between the end and beginning of a span. + + + +### Sample Queries: + +__Get All Span Names for Service__ `my_web_server` +```sql +SHOW TAG VALUES FROM "zipkin" with key="name" WHERE "service_name" = 'my_web_server' +``` + - __Description:__ returns a list containing the names of the spans which have annotations with the given `service_name` of `my_web_server`. + +__Get All Service Names__ +```sql +SHOW TAG VALUES FROM "zipkin" WITH KEY = "service_name" +``` + - __Description:__ returns a list of all `distinct` endpoint service names. + +__Find spans with longest duration__ +```sql +SELECT max("duration_ns") FROM "zipkin" WHERE "service_name" = 'my_service' AND "name" = 'my_span_name' AND time > now() - 20m GROUP BY "trace_id",time(30s) LIMIT 5 +``` + - __Description:__ In the last 20 minutes find the top 5 longest span durations for service `my_server` and span name `my_span_name` + + +### Recommended InfluxDB setup + +This test will create high cardinality data so we reccomend using the [tsi influxDB engine](https://www.influxdata.com/path-1-billion-time-series-influxdb-high-cardinality-indexing-ready-testing/). +#### How To Set Up InfluxDB For Work With Zipkin + + ##### Steps + 1. ___Update___ InfluxDB to >= 1.3, in order to use the new tsi engine. + + 2. ___Generate___ a config file with the following command: +```sh +influxd config > /path/for/config/file +``` + 3. ___Add___ the following to your config file, under the `[data]` tab: +```toml +[data] + index-version = "tsi1" +``` + + 4. ___Start___ `influxd` with your new config file: +```sh +influxd -config=/path/to/your/config/file +``` + + 5. ___Update___ your retention policy: +```sql +ALTER RETENTION POLICY "autogen" ON "telegraf" DURATION 1d SHARD DURATION 30m +``` + +### Example Input Trace: + +- [Cli microservice with two services Test](https://github.com/openzipkin/zipkin-go-opentracing/tree/master/examples/cli_with_2_services) +- [Test data from distributed trace repo sample json](https://github.com/mattkanwisher/distributedtrace/blob/master/testclient/sample.json) +#### [Trace Example from Zipkin model](http://zipkin.io/pages/data_model.html) +```json +{ + "traceId": "bd7a977555f6b982", + "name": "query", + "id": "be2d01e33cc78d97", + "parentId": "ebf33e1a81dc6f71", + "timestamp": 1458702548786000, + "duration": 13000, + "annotations": [ + { + "endpoint": { + "serviceName": "zipkin-query", + "ipv4": "192.168.1.2", + "port": 9411 + }, + "timestamp": 1458702548786000, + "value": "cs" + }, + { + "endpoint": { + "serviceName": "zipkin-query", + "ipv4": "192.168.1.2", + "port": 9411 + }, + "timestamp": 1458702548799000, + "value": "cr" + } + ], + "binaryAnnotations": [ + { + "key": "jdbc.query", + "value": "select distinct `zipkin_spans`.`trace_id` from `zipkin_spans` join `zipkin_annotations` on (`zipkin_spans`.`trace_id` = `zipkin_annotations`.`trace_id` and `zipkin_spans`.`id` = `zipkin_annotations`.`span_id`) where (`zipkin_annotations`.`endpoint_service_name` = ? and `zipkin_spans`.`start_ts` between ? and ?) order by `zipkin_spans`.`start_ts` desc limit ?", + "endpoint": { + "serviceName": "zipkin-query", + "ipv4": "192.168.1.2", + "port": 9411 + } + }, + { + "key": "sa", + "value": true, + "endpoint": { + "serviceName": "spanstore-jdbc", + "ipv4": "127.0.0.1", + "port": 3306 + } + } + ] +} +``` diff --git a/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go b/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go new file mode 100644 index 000000000..f4bc134f9 --- /dev/null +++ b/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go @@ -0,0 +1,75 @@ +/* +This is a development testing cli tool meant to stress the zipkin telegraf plugin. +It writes a specified number of zipkin spans to the plugin endpoint, with other +parameters which dictate batch size and flush timeout. + +Usage as follows: + +`./stress_test_write -batch_size= -max_backlog= -batch_interval= -span_count -zipkin_host=` + +Or with a timer: + +`time ./stress_test_write -batch_size= -max_backlog= -batch_interval= -span_count -zipkin_host=` + +However, the flag defaults work just fine for a good write stress test (and are what +this tool has mainly been tested with), so there shouldn't be much need to +manually tweak the parameters. +*/ + +package main + +import ( + "flag" + "fmt" + "log" + "time" + + zipkin "github.com/openzipkin/zipkin-go-opentracing" +) + +var ( + BatchSize int + MaxBackLog int + BatchTimeInterval int + SpanCount int + ZipkinServerHost string +) + +const usage = `./stress_test_write -batch_size= -max_backlog= -batch_interval= -span_count -zipkin_host=` + +func init() { + flag.IntVar(&BatchSize, "batch_size", 10000, usage) + flag.IntVar(&MaxBackLog, "max_backlog", 100000, usage) + flag.IntVar(&BatchTimeInterval, "batch_interval", 1, usage) + flag.IntVar(&SpanCount, "span_count", 100000, usage) + flag.StringVar(&ZipkinServerHost, "zipkin_host", "localhost", usage) +} + +func main() { + flag.Parse() + var hostname = fmt.Sprintf("http://%s:9411/api/v1/spans", ZipkinServerHost) + collector, err := zipkin.NewHTTPCollector( + hostname, + zipkin.HTTPBatchSize(BatchSize), + zipkin.HTTPMaxBacklog(MaxBackLog), + zipkin.HTTPBatchInterval(time.Duration(BatchTimeInterval)*time.Second)) + defer collector.Close() + if err != nil { + log.Fatalf("Error intializing zipkin http collector: %v\n", err) + } + + tracer, err := zipkin.NewTracer( + zipkin.NewRecorder(collector, false, "127.0.0.1:0", "trivial")) + + if err != nil { + log.Fatalf("Error: %v\n", err) + } + + log.Printf("Writing %d spans to zipkin server at %s\n", SpanCount, hostname) + for i := 0; i < SpanCount; i++ { + parent := tracer.StartSpan("Parent") + parent.LogEvent(fmt.Sprintf("Trace%d", i)) + parent.Finish() + } + log.Println("Done. Flushing remaining spans...") +} diff --git a/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go b/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go new file mode 100644 index 000000000..5a65384d4 --- /dev/null +++ b/plugins/inputs/zipkin/cmd/thrift_serialize/thrift_serialize.go @@ -0,0 +1,153 @@ +/* +A small cli utility meant to convert json to zipkin thrift binary format, and +vice versa. + +To convert from json to thrift, +the json is unmarshalled, converted to zipkincore.Span structures, and +marshalled into thrift binary protocol. The json must be in an array format (even if it only has one object), +because the tool automatically tries to unmarshall the json into an array of structs. + +To convert from thrift to json, +the opposite process must happen. The thrift binary data must be read into an array of +zipkin span structures, and those spans must be marshalled into json. + +Usage: + +./thrift_serialize -input -output -deserialize + +If `deserialize` is set to true (false by default), the tool will interpret the input file as +thrift, and write it as json to the output file. +Otherwise, the input file will be interpreted as json, and the output will be encoded as thrift. + + +*/ +package main + +import ( + "encoding/json" + "errors" + "flag" + "fmt" + "io/ioutil" + "log" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +var ( + filename string + outFileName string + inputType string +) + +const usage = `./json_serialize -input -output output -input-type` + +func init() { + flag.StringVar(&filename, "input", "", usage) + flag.StringVar(&outFileName, "output", "", usage) + flag.StringVar(&inputType, "input-type", "thrift", usage) +} + +func main() { + flag.Parse() + contents, err := ioutil.ReadFile(filename) + if err != nil { + log.Fatalf("Error reading file: %v\n", err) + } + + switch inputType { + case "json": + raw, err := jsonToZipkinThrift(contents) + if err != nil { + log.Fatalf("%v\n", err) + } + ioutil.WriteFile(outFileName, raw, 0644) + case "thrift": + raw, err := thriftToJSONSpans(contents) + if err != nil { + log.Fatalf("%v\n", err) + } + ioutil.WriteFile(outFileName, raw, 0644) + default: + log.Fatalf("Unsupported input type") + } +} + +func jsonToZipkinThrift(jsonRaw []byte) ([]byte, error) { + if len(jsonRaw) == 0 { + return nil, errors.New("no data") + } + + if string(jsonRaw)[0] != '[' { + return nil, errors.New("cannot unmarshal non array type") + } + + var spans []*zipkincore.Span + err := json.Unmarshal(jsonRaw, &spans) + if err != nil { + return nil, fmt.Errorf("error unmarshalling: %v", err) + } + + var zspans []*zipkincore.Span + if err != nil { + return nil, err + } + zspans = append(zspans, spans...) + + fmt.Println(spans) + + buf := thrift.NewTMemoryBuffer() + transport := thrift.NewTBinaryProtocolTransport(buf) + + if err = transport.WriteListBegin(thrift.STRUCT, len(spans)); err != nil { + return nil, fmt.Errorf("error in beginning thrift write: %v", err) + } + + for _, span := range zspans { + err = span.Write(transport) + if err != nil { + return nil, fmt.Errorf("error converting zipkin struct to thrift: %v", err) + } + } + + if err = transport.WriteListEnd(); err != nil { + return nil, fmt.Errorf("error finishing thrift write: %v", err) + } + + return buf.Bytes(), nil +} + +func thriftToJSONSpans(thriftData []byte) ([]byte, error) { + buffer := thrift.NewTMemoryBuffer() + if _, err := buffer.Write(thriftData); err != nil { + err = fmt.Errorf("error in buffer write: %v", err) + return nil, err + } + + transport := thrift.NewTBinaryProtocolTransport(buffer) + _, size, err := transport.ReadListBegin() + if err != nil { + err = fmt.Errorf("error in ReadListBegin: %v", err) + return nil, err + } + + var spans []*zipkincore.Span + for i := 0; i < size; i++ { + zs := &zipkincore.Span{} + if err = zs.Read(transport); err != nil { + err = fmt.Errorf("Error reading into zipkin struct: %v", err) + return nil, err + } + spans = append(spans, zs) + } + + err = transport.ReadListEnd() + if err != nil { + err = fmt.Errorf("error ending thrift read: %v", err) + return nil, err + } + + out, _ := json.MarshalIndent(spans, "", " ") + return out, nil +} diff --git a/plugins/inputs/zipkin/convert.go b/plugins/inputs/zipkin/convert.go new file mode 100644 index 000000000..dad099771 --- /dev/null +++ b/plugins/inputs/zipkin/convert.go @@ -0,0 +1,249 @@ +package zipkin + +import ( + "encoding/binary" + "fmt" + "net" + "strconv" + "time" + + "github.com/influxdata/telegraf" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +// DefaultServiceName when the span does not have any serviceName +const DefaultServiceName = "unknown" + +//now is a moackable time for now +var now = time.Now + +// LineProtocolConverter implements the Recorder interface; it is a +// type meant to encapsulate the storage of zipkin tracing data in +// telegraf as line protocol. +type LineProtocolConverter struct { + acc telegraf.Accumulator +} + +// NewLineProtocolConverter returns an instance of LineProtocolConverter that +// will add to the given telegraf.Accumulator +func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter { + return &LineProtocolConverter{ + acc: acc, + } +} + +// Record is LineProtocolConverter's implementation of the Record method of +// the Recorder interface; it takes a trace as input, and adds it to an internal +// telegraf.Accumulator. +func (l *LineProtocolConverter) Record(t Trace) error { + for _, s := range t { + fields := map[string]interface{}{ + "duration_ns": s.Duration.Nanoseconds(), + } + tags := map[string]string{ + "id": s.ID, + "parent_id": s.ParentID, + "trace_id": s.TraceID, + "name": s.Name, + "service_name": s.ServiceName, + } + l.acc.AddFields("zipkin", fields, tags, s.Timestamp) + + for _, a := range s.Annotations { + tags := map[string]string{ + "id": s.ID, + "parent_id": s.ParentID, + "trace_id": s.TraceID, + "name": s.Name, + "service_name": a.ServiceName, + "annotation": a.Value, + "endpoint_host": a.Host, + } + l.acc.AddFields("zipkin", fields, tags, s.Timestamp) + } + + for _, b := range s.BinaryAnnotations { + tags := map[string]string{ + "id": s.ID, + "parent_id": s.ParentID, + "trace_id": s.TraceID, + "name": s.Name, + "service_name": b.ServiceName, + "annotation": b.Value, + "endpoint_host": b.Host, + "annotation_key": b.Key, + } + l.acc.AddFields("zipkin", fields, tags, s.Timestamp) + } + } + + return nil +} + +func (l *LineProtocolConverter) Error(err error) { + l.acc.AddError(err) +} + +// NewTrace converts a slice of []*zipkincore.Spans into a new Trace +func NewTrace(spans []*zipkincore.Span) Trace { + trace := make(Trace, len(spans)) + for i, span := range spans { + endpoint := serviceEndpoint(span.GetAnnotations(), span.GetBinaryAnnotations()) + trace[i] = Span{ + ID: formatID(span.GetID()), + TraceID: formatTraceID(span.GetTraceIDHigh(), span.GetTraceID()), + Name: span.GetName(), + Timestamp: guessTimestamp(span), + Duration: convertDuration(span), + ParentID: parentID(span), + ServiceName: serviceName(endpoint), + Annotations: NewAnnotations(span.GetAnnotations(), endpoint), + BinaryAnnotations: NewBinaryAnnotations(span.GetBinaryAnnotations(), endpoint), + } + } + return trace +} + +// NewAnnotations converts a slice of *zipkincore.Annotation into a slice +// of new Annotations +func NewAnnotations(annotations []*zipkincore.Annotation, endpoint *zipkincore.Endpoint) []Annotation { + formatted := make([]Annotation, len(annotations)) + for i, annotation := range annotations { + formatted[i] = Annotation{ + Host: host(endpoint), + ServiceName: serviceName(endpoint), + Timestamp: microToTime(annotation.GetTimestamp()), + Value: annotation.GetValue(), + } + } + + return formatted +} + +// NewBinaryAnnotations is very similar to NewAnnotations, but it +// converts zipkincore.BinaryAnnotations instead of the normal zipkincore.Annotation +func NewBinaryAnnotations(annotations []*zipkincore.BinaryAnnotation, endpoint *zipkincore.Endpoint) []BinaryAnnotation { + formatted := make([]BinaryAnnotation, len(annotations)) + for i, annotation := range annotations { + formatted[i] = BinaryAnnotation{ + Host: host(endpoint), + ServiceName: serviceName(endpoint), + Key: annotation.GetKey(), + Value: string(annotation.GetValue()), + Type: annotation.GetAnnotationType().String(), + } + } + return formatted +} + +func microToTime(micro int64) time.Time { + return time.Unix(0, micro*int64(time.Microsecond)).UTC() +} + +func formatID(id int64) string { + return strconv.FormatInt(id, 10) +} + +func formatTraceID(high, low int64) string { + if high == 0 { + return fmt.Sprintf("%x", low) + } + return fmt.Sprintf("%x%016x", high, low) +} + +func minMax(span *zipkincore.Span) (time.Time, time.Time) { + min := now().UTC() + max := time.Time{}.UTC() + for _, annotation := range span.Annotations { + ts := microToTime(annotation.GetTimestamp()) + if !ts.IsZero() && ts.Before(min) { + min = ts + } + if !ts.IsZero() && ts.After(max) { + max = ts + } + } + if max.IsZero() { + max = min + } + return min, max +} + +func guessTimestamp(span *zipkincore.Span) time.Time { + if span.GetTimestamp() != 0 { + return microToTime(span.GetTimestamp()) + } + min, _ := minMax(span) + return min +} + +func convertDuration(span *zipkincore.Span) time.Duration { + duration := time.Duration(span.GetDuration()) * time.Microsecond + if duration != 0 { + return duration + } + min, max := minMax(span) + return max.Sub(min) +} + +func parentID(span *zipkincore.Span) string { + // A parent ID of 0 means that this is a parent span. In this case, + // we set the parent ID of the span to be its own id, so it points to + // itself. + id := span.GetParentID() + if id != 0 { + return formatID(id) + } + return formatID(span.ID) +} + +func ipv4(addr int32) string { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, uint32(addr)) + return net.IP(buf).String() +} + +func host(h *zipkincore.Endpoint) string { + if h == nil { + return ipv4(int32(0)) + } + if h.GetPort() == 0 { + return ipv4(h.GetIpv4()) + } + // Zipkin uses a signed int16 for the port, but, warns us that they actually treat it + // as an unsigned int16. So, we convert from int16 to int32 followed by taking & 0xffff + // to convert from signed to unsigned + // https://github.com/openzipkin/zipkin/blob/57dc2ec9c65fe6144e401c0c933b4400463a69df/zipkin/src/main/java/zipkin/Endpoint.java#L44 + return ipv4(h.GetIpv4()) + ":" + strconv.FormatInt(int64(int(h.GetPort())&0xffff), 10) +} + +func serviceName(h *zipkincore.Endpoint) string { + if h == nil { + return DefaultServiceName + } + return h.GetServiceName() +} + +func serviceEndpoint(ann []*zipkincore.Annotation, bann []*zipkincore.BinaryAnnotation) *zipkincore.Endpoint { + for _, a := range ann { + switch a.Value { + case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND: + if a.Host != nil && a.Host.ServiceName != "" { + return a.Host + } + } + } + + for _, a := range bann { + if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" { + return a.Host + } + } + // Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations + for _, a := range ann { + if a.Host != nil && a.Host.ServiceName != "" { + return a.Host + } + } + return nil +} diff --git a/plugins/inputs/zipkin/convert_test.go b/plugins/inputs/zipkin/convert_test.go new file mode 100644 index 000000000..b5c543073 --- /dev/null +++ b/plugins/inputs/zipkin/convert_test.go @@ -0,0 +1,544 @@ +package zipkin + +import ( + "reflect" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +func TestLineProtocolConverter_Record(t *testing.T) { + mockAcc := testutil.Accumulator{} + type fields struct { + acc telegraf.Accumulator + } + type args struct { + t Trace + } + tests := []struct { + name string + fields fields + args args + wantErr bool + want []testutil.Metric + }{ + { + name: "threespan", + fields: fields{ + acc: &mockAcc, + }, + args: args{ + t: Trace{ + Span{ + ID: "8090652509916334619", + TraceID: "2505404965370368069", + Name: "Child", + ParentID: "22964302721410078", + Timestamp: time.Unix(0, 1498688360851331000).UTC(), + Duration: time.Duration(53106) * time.Microsecond, + ServiceName: "trivial", + Annotations: []Annotation{}, + BinaryAnnotations: []BinaryAnnotation{ + BinaryAnnotation{ + Key: "lc", + Value: "dHJpdmlhbA==", + Host: "2130706433:0", + ServiceName: "trivial", + Type: "STRING", + }, + }, + }, + Span{ + ID: "103618986556047333", + TraceID: "2505404965370368069", + Name: "Child", + ParentID: "22964302721410078", + Timestamp: time.Unix(0, 1498688360904552000).UTC(), + Duration: time.Duration(50410) * time.Microsecond, + ServiceName: "trivial", + Annotations: []Annotation{}, + BinaryAnnotations: []BinaryAnnotation{ + BinaryAnnotation{ + Key: "lc", + Value: "dHJpdmlhbA==", + Host: "2130706433:0", + ServiceName: "trivial", + Type: "STRING", + }, + }, + }, + Span{ + ID: "22964302721410078", + TraceID: "2505404965370368069", + Name: "Parent", + ParentID: "22964302721410078", + Timestamp: time.Unix(0, 1498688360851318000).UTC(), + Duration: time.Duration(103680) * time.Microsecond, + ServiceName: "trivial", + Annotations: []Annotation{ + Annotation{ + Timestamp: time.Unix(0, 1498688360851325000).UTC(), + Value: "Starting child #0", + Host: "2130706433:0", + ServiceName: "trivial", + }, + Annotation{ + Timestamp: time.Unix(0, 1498688360904545000).UTC(), + Value: "Starting child #1", + Host: "2130706433:0", + ServiceName: "trivial", + }, + Annotation{ + Timestamp: time.Unix(0, 1498688360954992000).UTC(), + Value: "A Log", + Host: "2130706433:0", + ServiceName: "trivial", + }, + }, + BinaryAnnotations: []BinaryAnnotation{ + BinaryAnnotation{ + Key: "lc", + Value: "dHJpdmlhbA==", + Host: "2130706433:0", + ServiceName: "trivial", + Type: "STRING", + }, + }, + }, + }, + }, + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "8090652509916334619", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "service_name": "trivial", + "name": "Child", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(53106) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851331000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "8090652509916334619", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "name": "Child", + "service_name": "trivial", + "annotation": "dHJpdmlhbA==", + "endpoint_host": "2130706433:0", + "annotation_key": "lc", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(53106) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851331000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "103618986556047333", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "service_name": "trivial", + "name": "Child", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(50410) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360904552000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "103618986556047333", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "name": "Child", + "service_name": "trivial", + "annotation": "dHJpdmlhbA==", + "endpoint_host": "2130706433:0", + "annotation_key": "lc", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(50410) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360904552000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "22964302721410078", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "service_name": "trivial", + "name": "Parent", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "service_name": "trivial", + "annotation": "Starting child #0", + "endpoint_host": "2130706433:0", + "id": "22964302721410078", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "name": "Parent", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "service_name": "trivial", + "annotation": "Starting child #1", + "endpoint_host": "2130706433:0", + "id": "22964302721410078", + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "name": "Parent", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "parent_id": "22964302721410078", + "trace_id": "2505404965370368069", + "name": "Parent", + "service_name": "trivial", + "annotation": "A Log", + "endpoint_host": "2130706433:0", + "id": "22964302721410078", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "trace_id": "2505404965370368069", + "service_name": "trivial", + "annotation": "dHJpdmlhbA==", + "annotation_key": "lc", + "id": "22964302721410078", + "parent_id": "22964302721410078", + "name": "Parent", + "endpoint_host": "2130706433:0", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + }, + wantErr: false, + }, + + //// Test data from distributed trace repo sample json + // https://github.com/mattkanwisher/distributedtrace/blob/master/testclient/sample.json + { + name: "distributed_trace_sample", + fields: fields{ + acc: &mockAcc, + }, + args: args{ + t: Trace{ + Span{ + ID: "6802735349851856000", + TraceID: "0:6802735349851856000", + Name: "main.dud", + ParentID: "6802735349851856000", + Timestamp: time.Unix(1, 0).UTC(), + Duration: 1, + ServiceName: "trivial", + Annotations: []Annotation{ + Annotation{ + Timestamp: time.Unix(0, 1433330263415871000).UTC(), + Value: "cs", + Host: "0:9410", + ServiceName: "go-zipkin-testclient", + }, + }, + BinaryAnnotations: []BinaryAnnotation{}, + }, + }, + }, + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "6802735349851856000", + "parent_id": "6802735349851856000", + "trace_id": "0:6802735349851856000", + "name": "main.dud", + "service_name": "trivial", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(1) * time.Nanosecond).Nanoseconds(), + }, + Time: time.Unix(1, 0).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "cs", + "endpoint_host": "0:9410", + "id": "6802735349851856000", + "parent_id": "6802735349851856000", + "trace_id": "0:6802735349851856000", + "name": "main.dud", + "service_name": "go-zipkin-testclient", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(1) * time.Nanosecond).Nanoseconds(), + }, + Time: time.Unix(1, 0).UTC(), + }, + }, + }, + } + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockAcc.ClearMetrics() + l := &LineProtocolConverter{ + acc: tt.fields.acc, + } + if err := l.Record(tt.args.t); (err != nil) != tt.wantErr { + t.Errorf("LineProtocolConverter.Record() error = %v, wantErr %v", err, tt.wantErr) + } + got := []testutil.Metric{} + for _, metric := range mockAcc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(got, tt.want) { + t.Errorf("LineProtocolConverter.Record()/%s/%d error = %s ", tt.name, i, cmp.Diff(got, tt.want)) + } + }) + } +} + +func Test_microToTime(t *testing.T) { + type args struct { + micro int64 + } + tests := []struct { + name string + args args + want time.Time + }{ + { + name: "given zero micro seconds expected unix time zero", + args: args{ + micro: 0, + }, + want: time.Unix(0, 0).UTC(), + }, + { + name: "given a million micro seconds expected unix time one", + args: args{ + micro: 1000000, + }, + want: time.Unix(1, 0).UTC(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := microToTime(tt.args.micro); !reflect.DeepEqual(got, tt.want) { + t.Errorf("microToTime() = %v, want %v", got, tt.want) + } + }) + } +} + +func newAnnotation(micro int64) *zipkincore.Annotation { + return &zipkincore.Annotation{ + Timestamp: micro, + } +} + +func Test_minMax(t *testing.T) { + type args struct { + span *zipkincore.Span + } + tests := []struct { + name string + args args + now func() time.Time + wantMin time.Time + wantMax time.Time + }{ + { + name: "Single annotation", + args: args{ + span: &zipkincore.Span{ + Annotations: []*zipkincore.Annotation{ + newAnnotation(1000000), + }, + }, + }, + wantMin: time.Unix(1, 0).UTC(), + wantMax: time.Unix(1, 0).UTC(), + }, + { + name: "Three annotations", + args: args{ + span: &zipkincore.Span{ + Annotations: []*zipkincore.Annotation{ + newAnnotation(1000000), + newAnnotation(2000000), + newAnnotation(3000000), + }, + }, + }, + wantMin: time.Unix(1, 0).UTC(), + wantMax: time.Unix(3, 0).UTC(), + }, + { + name: "Annotations are in the future", + args: args{ + span: &zipkincore.Span{ + Annotations: []*zipkincore.Annotation{ + newAnnotation(3000000), + }, + }, + }, + wantMin: time.Unix(2, 0).UTC(), + wantMax: time.Unix(3, 0).UTC(), + now: func() time.Time { + return time.Unix(2, 0).UTC() + }, + }, + { + name: "No Annotations", + args: args{ + span: &zipkincore.Span{ + Annotations: []*zipkincore.Annotation{}, + }, + }, + wantMin: time.Unix(2, 0).UTC(), + wantMax: time.Unix(2, 0).UTC(), + now: func() time.Time { + return time.Unix(2, 0).UTC() + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.now != nil { + now = tt.now + } + got, got1 := minMax(tt.args.span) + if !reflect.DeepEqual(got, tt.wantMin) { + t.Errorf("minMax() got = %v, want %v", got, tt.wantMin) + } + if !reflect.DeepEqual(got1, tt.wantMax) { + t.Errorf("minMax() got1 = %v, want %v", got1, tt.wantMax) + } + now = time.Now + }) + } +} + +func Test_host(t *testing.T) { + type args struct { + h *zipkincore.Endpoint + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Host Found", + args: args{ + h: &zipkincore.Endpoint{ + Ipv4: 1234, + Port: 8888, + }, + }, + want: "0.0.4.210:8888", + }, + { + name: "No Host", + args: args{ + h: nil, + }, + want: "0.0.0.0", + }, + { + name: "int overflow zipkin uses an int16 type as an unsigned int 16.", + args: args{ + h: &zipkincore.Endpoint{ + Ipv4: 1234, + Port: -1, + }, + }, + want: "0.0.4.210:65535", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := host(tt.args.h); got != tt.want { + t.Errorf("host() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_serviceName(t *testing.T) { + type args struct { + h *zipkincore.Endpoint + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Found ServiceName", + args: args{ + h: &zipkincore.Endpoint{ + ServiceName: "zipkin", + }, + }, + want: "zipkin", + }, + { + name: "No ServiceName", + args: args{ + h: nil, + }, + want: "unknown", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := serviceName(tt.args.h); got != tt.want { + t.Errorf("serviceName() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugins/inputs/zipkin/handler.go b/plugins/inputs/zipkin/handler.go new file mode 100644 index 000000000..d9bd07828 --- /dev/null +++ b/plugins/inputs/zipkin/handler.go @@ -0,0 +1,135 @@ +package zipkin + +import ( + "compress/gzip" + "io/ioutil" + "net/http" + "strings" + "sync" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/gorilla/mux" + "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" +) + +// SpanHandler is an implementation of a Handler which accepts zipkin thrift +// span data and sends it to the recorder +type SpanHandler struct { + Path string + recorder Recorder + waitGroup *sync.WaitGroup +} + +// NewSpanHandler returns a new server instance given path to handle +func NewSpanHandler(path string) *SpanHandler { + return &SpanHandler{ + Path: path, + } +} + +func cors(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if origin := r.Header.Get("Origin"); origin != "" { + w.Header().Set(`Access-Control-Allow-Origin`, origin) + w.Header().Set(`Access-Control-Allow-Methods`, strings.Join([]string{ + `OPTIONS`, + `POST`, + }, ", ")) + + w.Header().Set(`Access-Control-Allow-Headers`, strings.Join([]string{ + `Accept`, + `Accept-Encoding`, + `Content-Length`, + `Content-Type`, + }, ", ")) + + w.Header().Set(`Access-Control-Expose-Headers`, strings.Join([]string{ + `Date`, + }, ", ")) + } + + if r.Method == "OPTIONS" { + return + } + + next.ServeHTTP(w, r) + } +} + +// Register implements the Service interface. Register accepts zipkin thrift data +// POSTed to the path of the mux router +func (s *SpanHandler) Register(router *mux.Router, recorder Recorder) error { + handler := cors(http.HandlerFunc(s.Spans)) + router.Handle(s.Path, handler).Methods("POST", "OPTIONS") + s.recorder = recorder + return nil +} + +// Spans handles zipkin thrift spans +func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body := r.Body + var err error + // Handle gzip decoding of the body + if r.Header.Get("Content-Encoding") == "gzip" { + body, err = gzip.NewReader(r.Body) + if err != nil { + s.recorder.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer body.Close() + } + + octets, err := ioutil.ReadAll(body) + if err != nil { + s.recorder.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + spans, err := unmarshalThrift(octets) + if err != nil { + s.recorder.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + trace := NewTrace(spans) + + if err = s.recorder.Record(trace); err != nil { + s.recorder.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) { + buffer := thrift.NewTMemoryBuffer() + if _, err := buffer.Write(body); err != nil { + return nil, err + } + + transport := thrift.NewTBinaryProtocolTransport(buffer) + _, size, err := transport.ReadListBegin() + if err != nil { + return nil, err + } + + spans := make([]*zipkincore.Span, size) + for i := 0; i < size; i++ { + zs := &zipkincore.Span{} + if err = zs.Read(transport); err != nil { + return nil, err + } + spans[i] = zs + } + + if err = transport.ReadListEnd(); err != nil { + return nil, err + } + + return spans, nil +} diff --git a/plugins/inputs/zipkin/handler_test.go b/plugins/inputs/zipkin/handler_test.go new file mode 100644 index 000000000..0946d982f --- /dev/null +++ b/plugins/inputs/zipkin/handler_test.go @@ -0,0 +1,136 @@ +package zipkin + +import ( + "bytes" + "io/ioutil" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +type MockRecorder struct { + Data Trace + Err error +} + +func (m *MockRecorder) Record(t Trace) error { + m.Data = t + return nil +} + +func (m *MockRecorder) Error(err error) { + m.Err = err +} + +func TestSpanHandler(t *testing.T) { + dat, err := ioutil.ReadFile("testdata/threespans.dat") + if err != nil { + t.Fatalf("Could not find file %s\n", "testdata/threespans.dat") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest( + "POST", + "http://server.local/api/v1/spans", + ioutil.NopCloser( + bytes.NewReader(dat))) + + handler := NewSpanHandler("/api/v1/spans") + mockRecorder := &MockRecorder{} + handler.recorder = mockRecorder + + handler.Spans(w, r) + if w.Code != http.StatusNoContent { + t.Errorf("MainHandler did not return StatusNoContent %d", w.Code) + } + + got := mockRecorder.Data + + parentID := strconv.FormatInt(22964302721410078, 10) + want := Trace{ + Span{ + Name: "Child", + ID: "8090652509916334619", + TraceID: "22c4fc8ab3669045", + ParentID: parentID, + Timestamp: time.Unix(0, 1498688360851331*int64(time.Microsecond)).UTC(), + Duration: time.Duration(53106) * time.Microsecond, + ServiceName: "trivial", + Annotations: []Annotation{}, + BinaryAnnotations: []BinaryAnnotation{ + BinaryAnnotation{ + Key: "lc", + Value: "trivial", + Host: "127.0.0.1", + ServiceName: "trivial", + Type: "STRING", + }, + }, + }, + Span{ + Name: "Child", + ID: "103618986556047333", + TraceID: "22c4fc8ab3669045", + ParentID: parentID, + Timestamp: time.Unix(0, 1498688360904552*int64(time.Microsecond)).UTC(), + Duration: time.Duration(50410) * time.Microsecond, + ServiceName: "trivial", + Annotations: []Annotation{}, + BinaryAnnotations: []BinaryAnnotation{ + BinaryAnnotation{ + Key: "lc", + Value: "trivial", + Host: "127.0.0.1", + ServiceName: "trivial", + Type: "STRING", + }, + }, + }, + Span{ + Name: "Parent", + ID: "22964302721410078", + TraceID: "22c4fc8ab3669045", + ParentID: "22964302721410078", + Timestamp: time.Unix(0, 1498688360851318*int64(time.Microsecond)).UTC(), + Duration: time.Duration(103680) * time.Microsecond, + ServiceName: "trivial", + Annotations: []Annotation{ + Annotation{ + Timestamp: time.Unix(0, 1498688360851325*int64(time.Microsecond)).UTC(), + Value: "Starting child #0", + Host: "127.0.0.1", + ServiceName: "trivial", + }, + Annotation{ + Timestamp: time.Unix(0, 1498688360904545*int64(time.Microsecond)).UTC(), + Value: "Starting child #1", + Host: "127.0.0.1", + ServiceName: "trivial", + }, + Annotation{ + Timestamp: time.Unix(0, 1498688360954992*int64(time.Microsecond)).UTC(), + Value: "A Log", + Host: "127.0.0.1", + ServiceName: "trivial", + }, + }, + BinaryAnnotations: []BinaryAnnotation{ + BinaryAnnotation{ + Key: "lc", + Value: "trivial", + Host: "127.0.0.1", + ServiceName: "trivial", + Type: "STRING", + }, + }, + }, + } + + if !cmp.Equal(got, want) { + t.Fatalf("Got != Want\n %s", cmp.Diff(got, want)) + } +} diff --git a/plugins/inputs/zipkin/testdata/cli_microservice.dat b/plugins/inputs/zipkin/testdata/cli_microservice.dat new file mode 100644 index 000000000..585d6b111 Binary files /dev/null and b/plugins/inputs/zipkin/testdata/cli_microservice.dat differ diff --git a/plugins/inputs/zipkin/testdata/distributed_trace_sample.dat b/plugins/inputs/zipkin/testdata/distributed_trace_sample.dat new file mode 100644 index 000000000..9ce376f11 Binary files /dev/null and b/plugins/inputs/zipkin/testdata/distributed_trace_sample.dat differ diff --git a/plugins/inputs/zipkin/testdata/json/cli_microservice.json b/plugins/inputs/zipkin/testdata/json/cli_microservice.json new file mode 100644 index 000000000..af9446b4a --- /dev/null +++ b/plugins/inputs/zipkin/testdata/json/cli_microservice.json @@ -0,0 +1,407 @@ +[ + { + "trace_id": 243463817635710260, + "name": "Concat", + "id": 3383422996321511664, + "parent_id": 4574092882326506380, + "annotations": [ + { + "timestamp": 1499817952283903, + "value": "cs", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "timestamp": 1499817952286792, + "value": "cr", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + } + ], + "binary_annotations": [ + { + "key": "http.path", + "value": "L2NvbmNhdC8=", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.url", + "value": "aHR0cDovL2xvY2FsaG9zdDo2MTAwMS9jb25jYXQv", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "peer.hostname", + "value": "bG9jYWxob3N0", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "span.kind", + "value": "Y2xpZW50", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.method", + "value": "R0VU", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.host", + "value": "bG9jYWxob3N0OjYxMDAx", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + } + ], + "timestamp": 1499817952283903, + "duration": 2888, + "trace_id_high": 8269862291023777619 + }, + { + "trace_id": 243463817635710260, + "name": "Sum", + "id": 6036416808826525494, + "parent_id": 4574092882326506380, + "annotations": [ + { + "timestamp": 1499817952286828, + "value": "cs", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "timestamp": 1499817952333847, + "value": "cr", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + } + ], + "binary_annotations": [ + { + "key": "span.kind", + "value": "Y2xpZW50", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.method", + "value": "R0VU", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.host", + "value": "bG9jYWxob3N0OjYxMDAx", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.path", + "value": "L3N1bS8=", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "http.url", + "value": "aHR0cDovL2xvY2FsaG9zdDo2MTAwMS9zdW0v", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "key": "peer.hostname", + "value": "bG9jYWxob3N0", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + } + ], + "timestamp": 1499817952286828, + "duration": 47019, + "trace_id_high": 8269862291023777619 + }, + { + "trace_id": 243463817635710260, + "name": "Run", + "id": 4574092882326506380, + "annotations": [ + { + "timestamp": 1499817952283897, + "value": "Call Concat", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + }, + { + "timestamp": 1499817952286824, + "value": "Call Sum", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + } + ], + "binary_annotations": [ + { + "key": "lc", + "value": "Y2xp", + "annotation_type": "STRING", + "host": { + "ipv4": 0, + "port": 0, + "service_name": "cli" + } + } + ], + "timestamp": 1499817952283881, + "duration": 50014, + "trace_id_high": 8269862291023777619 + } +] + +[ + { + "trace_id": 243463817635710260, + "name": "myComplexQuery", + "id": 4254041670140233539, + "parent_id": 8633460035494236932, + "annotations": [ + { + "timestamp": 1499817952307418, + "value": "cs", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "timestamp": 1499817952331909, + "value": "cr", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + } + ], + "binary_annotations": [ + { + "key": "sa", + "value": "UG9zdGdyZVNRTA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 5432, + "service_name": "PostgreSQL", + "ipv6": "AAAAAAAAAAAAAAAAAAAAAQ==" + } + }, + { + "key": "query", + "value": "U0VMRUNUIHJlY2lwZXMgRlJPTSBjb29rYm9vayBXSEVSRSB0b3BpYyA9ICd3b3JsZCBkb21pbmF0aW9uJw==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "span.kind", + "value": "cmVzb3VyY2U=", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "peer.service", + "value": "UG9zdGdyZVNRTA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "peer.hostname", + "value": "bG9jYWxob3N0", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "peer.port", + "value": "AAAVOA==", + "annotation_type": "I32", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + } + ], + "timestamp": 1499817952307418, + "duration": 24491, + "trace_id_high": 8269862291023777619 + }, + { + "trace_id": 243463817635710260, + "name": "Sum", + "id": 8633460035494236932, + "parent_id": 6036416808826525494, + "annotations": [ + { + "timestamp": 1499817952287147, + "value": "sr", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "timestamp": 1499817952333348, + "value": "ss", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "timestamp": 1499817952296675, + "value": "MyEventAnnotation", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + } + ], + "binary_annotations": [ + { + "key": "span.kind", + "value": "c2VydmVy", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "serverSide", + "value": "aGVyZQ==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "service", + "value": "c3ZjMg==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "key1", + "value": "dmFsdWUx", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + }, + { + "key": "key2", + "value": "AAAAAgAAAAA=", + "annotation_type": "I32", + "host": { + "ipv4": 2130706433, + "port": -4534, + "service_name": "svc2" + } + } + ], + "trace_id_high": 8269862291023777619 + } +] diff --git a/plugins/inputs/zipkin/testdata/json/distributed_trace_sample.json b/plugins/inputs/zipkin/testdata/json/distributed_trace_sample.json new file mode 100644 index 000000000..b41eaccf4 --- /dev/null +++ b/plugins/inputs/zipkin/testdata/json/distributed_trace_sample.json @@ -0,0 +1,30 @@ +[{ + "trace_id": 6802735349851856000, + "name": "main.dud", + "id": 6802735349851856000, + "parent_id": null, + "annotations": [ + { + "timestamp": 1433330263415871, + "value": "cs", + "host": { + "ipv4": 0, + "port": 9410, + "service_name": "go-zipkin-testclient" + }, + "duration": null + }, + { + "timestamp": 1433330263415872, + "value": "cr", + "host": { + "ipv4": 0, + "port": 9410, + "service_name": "go-zipkin-testclient" + }, + "duration": null + } + ], + "binary_annotations": [], + "debug": true +}] diff --git a/plugins/inputs/zipkin/testdata/json/threespans.json b/plugins/inputs/zipkin/testdata/json/threespans.json new file mode 100644 index 000000000..5809977df --- /dev/null +++ b/plugins/inputs/zipkin/testdata/json/threespans.json @@ -0,0 +1,92 @@ +[ + { + "trace_id": 2505404965370368069, + "name": "Child", + "id": 8090652509916334619, + "parent_id": 22964302721410078, + "annotations": [], + "binary_annotations": [ + { + "key": "lc", + "value": "dHJpdmlhbA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 0, + "service_name": "trivial" + } + } + ], + "timestamp": 1498688360851331, + "duration": 53106 + }, + { + "trace_id": 2505404965370368069, + "name": "Child", + "id": 103618986556047333, + "parent_id": 22964302721410078, + "annotations": [], + "binary_annotations": [ + { + "key": "lc", + "value": "dHJpdmlhbA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 0, + "service_name": "trivial" + } + } + ], + "timestamp": 1498688360904552, + "duration": 50410 + }, + { + "trace_id": 2505404965370368069, + "name": "Parent", + "id": 22964302721410078, + "annotations": [ + { + "timestamp": 1498688360851325, + "value": "Starting child #0", + "host": { + "ipv4": 2130706433, + "port": 0, + "service_name": "trivial" + } + }, + { + "timestamp": 1498688360904545, + "value": "Starting child #1", + "host": { + "ipv4": 2130706433, + "port": 0, + "service_name": "trivial" + } + }, + { + "timestamp": 1498688360954992, + "value": "A Log", + "host": { + "ipv4": 2130706433, + "port": 0, + "service_name": "trivial" + } + } + ], + "binary_annotations": [ + { + "key": "lc", + "value": "dHJpdmlhbA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 0, + "service_name": "trivial" + } + } + ], + "timestamp": 1498688360851318, + "duration": 103680 + } +] diff --git a/plugins/inputs/zipkin/testdata/threespans.dat b/plugins/inputs/zipkin/testdata/threespans.dat new file mode 100644 index 000000000..3fe5fb2ef Binary files /dev/null and b/plugins/inputs/zipkin/testdata/threespans.dat differ diff --git a/plugins/inputs/zipkin/zipkin.go b/plugins/inputs/zipkin/zipkin.go new file mode 100644 index 000000000..597ae0b27 --- /dev/null +++ b/plugins/inputs/zipkin/zipkin.go @@ -0,0 +1,182 @@ +package zipkin + +import ( + "context" + "fmt" + "log" + "net" + "net/http" + "strconv" + "sync" + "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + // DefaultPort is the default port zipkin listens on, which zipkin implementations + // expect. + DefaultPort = 9411 + + // DefaultRoute is the default route zipkin uses, and zipkin implementations + // expect. + DefaultRoute = "/api/v1/spans" + + // DefaultShutdownTimeout is the max amount of time telegraf will wait + // for the plugin to shutdown + DefaultShutdownTimeout = 5 +) + +// Recorder represents a type which can record zipkin trace data as well as +// any accompanying errors, and process that data. +type Recorder interface { + Record(Trace) error + Error(error) +} + +// Handler represents a type which can register itself with a router for +// http routing, and a Recorder for trace data collection. +type Handler interface { + Register(router *mux.Router, recorder Recorder) error +} + +// BinaryAnnotation represents a zipkin binary annotation. It contains +// all of the same fields as might be found in its zipkin counterpart. +type BinaryAnnotation struct { + Key string + Value string + Host string // annotation.endpoint.ipv4 + ":" + annotation.endpoint.port + ServiceName string + Type string +} + +// Annotation represents an ordinary zipkin annotation. It contains the data fields +// which will become fields/tags in influxdb +type Annotation struct { + Timestamp time.Time + Value string + Host string // annotation.endpoint.ipv4 + ":" + annotation.endpoint.port + ServiceName string +} + +//Span represents a specific zipkin span. It holds the majority of the same +// data as a zipkin span sent via the thrift protocol, but is presented in a +// format which is more straightforward for storage purposes. +type Span struct { + ID string + TraceID string // zipkin traceid high concat with traceid + Name string + ParentID string + ServiceName string + Timestamp time.Time // If zipkin input is nil then time.Now() + Duration time.Duration + Annotations []Annotation + BinaryAnnotations []BinaryAnnotation +} + +// Trace is an array (or a series) of spans +type Trace []Span + +const sampleConfig = ` + # path = "/api/v1/spans" # URL path for span data + # port = 9411 # Port on which Telegraf listens +` + +// Zipkin is a telegraf configuration structure for the zipkin input plugin, +// but it also contains fields for the management of a separate, concurrent +// zipkin http server +type Zipkin struct { + ServiceAddress string + Port int + Path string + + address string + handler Handler + server *http.Server + waitGroup *sync.WaitGroup +} + +// Description is a necessary method implementation from telegraf.ServiceInput +func (z Zipkin) Description() string { + return "This plugin implements the Zipkin http server to gather trace and timing data needed to troubleshoot latency problems in microservice architectures." +} + +// SampleConfig is a necessary method implementation from telegraf.ServiceInput +func (z Zipkin) SampleConfig() string { + return sampleConfig +} + +// Gather is empty for the zipkin plugin; all gathering is done through +// the separate goroutine launched in (*Zipkin).Start() +func (z *Zipkin) Gather(acc telegraf.Accumulator) error { return nil } + +// Start launches a separate goroutine for collecting zipkin client http requests, +// passing in a telegraf.Accumulator such that data can be collected. +func (z *Zipkin) Start(acc telegraf.Accumulator) error { + z.handler = NewSpanHandler(z.Path) + + var wg sync.WaitGroup + z.waitGroup = &wg + + router := mux.NewRouter() + converter := NewLineProtocolConverter(acc) + z.handler.Register(router, converter) + + z.server = &http.Server{ + Handler: router, + } + + addr := ":" + strconv.Itoa(z.Port) + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + + z.address = ln.Addr().String() + log.Printf("I! Started the zipkin listener on %s", z.address) + + go func() { + wg.Add(1) + defer wg.Done() + + z.Listen(ln, acc) + }() + + return nil +} + +// Stop shuts the internal http server down with via context.Context +func (z *Zipkin) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), DefaultShutdownTimeout) + + defer z.waitGroup.Wait() + defer cancel() + + z.server.Shutdown(ctx) +} + +// Listen creates an http server on the zipkin instance it is called with, and +// serves http until it is stopped by Zipkin's (*Zipkin).Stop() method. +func (z *Zipkin) Listen(ln net.Listener, acc telegraf.Accumulator) { + if err := z.server.Serve(ln); err != nil { + // Because of the clean shutdown in `(*Zipkin).Stop()` + // We're expecting a server closed error at some point + // So we don't want to display it as an error. + // This interferes with telegraf's internal data collection, + // by making it appear as if a serious error occurred. + if err != http.ErrServerClosed { + acc.AddError(fmt.Errorf("E! Error listening: %v", err)) + } + } +} + +func init() { + inputs.Add("zipkin", func() telegraf.Input { + return &Zipkin{ + Path: DefaultRoute, + Port: DefaultPort, + } + }) +} diff --git a/plugins/inputs/zipkin/zipkin_test.go b/plugins/inputs/zipkin/zipkin_test.go new file mode 100644 index 000000000..9447e67d4 --- /dev/null +++ b/plugins/inputs/zipkin/zipkin_test.go @@ -0,0 +1,289 @@ +package zipkin + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf/testutil" +) + +func TestZipkinPlugin(t *testing.T) { + mockAcc := testutil.Accumulator{} + + tests := []struct { + name string + thriftDataFile string //path name to a binary thrift data file which contains test data + wantErr bool + want []testutil.Metric + }{ + { + name: "threespan", + thriftDataFile: "testdata/threespans.dat", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "8090652509916334619", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "service_name": "trivial", + "name": "Child", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(53106) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851331000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "8090652509916334619", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "name": "Child", + "service_name": "trivial", + "annotation": "trivial", //base64: dHJpdmlhbA== + "endpoint_host": "127.0.0.1", + "annotation_key": "lc", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(53106) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851331000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "103618986556047333", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "service_name": "trivial", + "name": "Child", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(50410) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360904552000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "103618986556047333", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "name": "Child", + "service_name": "trivial", + "annotation": "trivial", //base64: dHJpdmlhbA== + "endpoint_host": "127.0.0.1", + "annotation_key": "lc", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(50410) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360904552000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "22964302721410078", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "service_name": "trivial", + "name": "Parent", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "service_name": "trivial", + "annotation": "Starting child #0", + "endpoint_host": "127.0.0.1", + "id": "22964302721410078", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "name": "Parent", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "service_name": "trivial", + "annotation": "Starting child #1", + "endpoint_host": "127.0.0.1", + "id": "22964302721410078", + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "name": "Parent", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "parent_id": "22964302721410078", + "trace_id": "22c4fc8ab3669045", + "name": "Parent", + "service_name": "trivial", + "annotation": "A Log", + "endpoint_host": "127.0.0.1", + "id": "22964302721410078", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "trace_id": "22c4fc8ab3669045", + "service_name": "trivial", + "annotation": "trivial", //base64: dHJpdmlhbA== + "annotation_key": "lc", + "id": "22964302721410078", + "parent_id": "22964302721410078", + "name": "Parent", + "endpoint_host": "127.0.0.1", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(103680) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1498688360851318000).UTC(), + }, + }, + wantErr: false, + }, + { + name: "distributed_trace_sample", + thriftDataFile: "testdata/distributed_trace_sample.dat", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "id": "6802735349851856000", + "parent_id": "6802735349851856000", + "trace_id": "5e682bc21ce99c80", + "service_name": "go-zipkin-testclient", + "name": "main.dud", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(1) * time.Microsecond).Nanoseconds(), + }, + //Time: time.Unix(1, 0).UTC(), + Time: time.Unix(0, 1433330263415871*int64(time.Microsecond)).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "cs", + "endpoint_host": "0.0.0.0:9410", + "id": "6802735349851856000", + "parent_id": "6802735349851856000", + "trace_id": "5e682bc21ce99c80", + "name": "main.dud", + "service_name": "go-zipkin-testclient", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(1) * time.Microsecond).Nanoseconds(), + }, + //Time: time.Unix(1, 0).UTC(), + Time: time.Unix(0, 1433330263415871*int64(time.Microsecond)).UTC(), + }, + testutil.Metric{ + Measurement: "zipkin", + Tags: map[string]string{ + "annotation": "cr", + "endpoint_host": "0.0.0.0:9410", + "id": "6802735349851856000", + "parent_id": "6802735349851856000", + "trace_id": "5e682bc21ce99c80", + "name": "main.dud", + "service_name": "go-zipkin-testclient", + }, + Fields: map[string]interface{}{ + "duration_ns": (time.Duration(1) * time.Microsecond).Nanoseconds(), + }, + Time: time.Unix(0, 1433330263415871*int64(time.Microsecond)).UTC(), + }, + }, + }, + } + + z := &Zipkin{ + Path: "/api/v1/spans", + Port: 0, + } + + err := z.Start(&mockAcc) + if err != nil { + t.Fatal("Failed to start zipkin server") + } + + defer z.Stop() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockAcc.ClearMetrics() + if err := postThriftData(tt.thriftDataFile, z.address); err != nil { + t.Fatalf("Posting data to http endpoint /api/v1/spans failed. Error: %s\n", err) + } + mockAcc.Wait(len(tt.want)) //Since the server is running concurrently, we need to wait for the number of data points we want to test to be added to the Accumulator. + if len(mockAcc.Errors) > 0 != tt.wantErr { + t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tt.wantErr, mockAcc.Errors) + } + + var got []testutil.Metric + for _, m := range mockAcc.Metrics { + got = append(got, *m) + } + + if !cmp.Equal(tt.want, got) { + t.Fatalf("Got != Want\n %s", cmp.Diff(tt.want, got)) + } + }) + } + mockAcc.ClearMetrics() + z.Stop() + // Make sure there is no erroneous error on shutdown + if len(mockAcc.Errors) != 0 { + t.Fatal("Expected no errors on shutdown") + } +} + +func postThriftData(datafile, address string) error { + dat, err := ioutil.ReadFile(datafile) + if err != nil { + return fmt.Errorf("could not read from data file %s", datafile) + } + + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/spans", address), bytes.NewReader(dat)) + + if err != nil { + return fmt.Errorf("HTTP request creation failed") + } + + req.Header.Set("Content-Type", "application/x-thrift") + client := &http.Client{} + _, err = client.Do(req) + if err != nil { + return fmt.Errorf("HTTP POST request to zipkin endpoint %s failed %v", address, err) + } + + return nil +}