Add JSON input support to zipkin plugin (#3150)
This commit is contained in:
committed by
Daniel Nelson
parent
1f1e9cc49f
commit
13a6b917c3
@@ -2,22 +2,23 @@ package zipkin
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"mime"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/jsonV1"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift"
|
||||
)
|
||||
|
||||
// SpanHandler is an implementation of a Handler which accepts zipkin thrift
|
||||
// span data and sends it to the recorder
|
||||
type SpanHandler struct {
|
||||
Path string
|
||||
recorder Recorder
|
||||
waitGroup *sync.WaitGroup
|
||||
Path string
|
||||
recorder Recorder
|
||||
}
|
||||
|
||||
// NewSpanHandler returns a new server instance given path to handle
|
||||
@@ -81,6 +82,12 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
|
||||
defer body.Close()
|
||||
}
|
||||
|
||||
decoder, err := ContentDecoder(r)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusUnsupportedMediaType)
|
||||
}
|
||||
|
||||
octets, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
@@ -88,14 +95,19 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
spans, err := unmarshalThrift(octets)
|
||||
spans, err := decoder.Decode(octets)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
trace := NewTrace(spans)
|
||||
trace, err := codec.NewTrace(spans)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err = s.recorder.Record(trace); err != nil {
|
||||
s.recorder.Error(err)
|
||||
@@ -106,30 +118,25 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) {
|
||||
buffer := thrift.NewTMemoryBuffer()
|
||||
if _, err := buffer.Write(body); err != nil {
|
||||
return nil, err
|
||||
// ContentDecoer returns a Decoder that is able to produce Traces from bytes.
|
||||
// Failure should yield an HTTP 415 (`http.StatusUnsupportedMediaType`)
|
||||
// If a Content-Type is not set, zipkin assumes application/json
|
||||
func ContentDecoder(r *http.Request) (codec.Decoder, error) {
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
if contentType == "" {
|
||||
return &jsonV1.JSON{}, nil
|
||||
}
|
||||
|
||||
transport := thrift.NewTBinaryProtocolTransport(buffer)
|
||||
_, size, err := transport.ReadListBegin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
spans := make([]*zipkincore.Span, size)
|
||||
for i := 0; i < size; i++ {
|
||||
zs := &zipkincore.Span{}
|
||||
if err = zs.Read(transport); err != nil {
|
||||
return nil, err
|
||||
for _, v := range strings.Split(contentType, ",") {
|
||||
t, _, err := mime.ParseMediaType(v)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if t == "application/json" {
|
||||
return &jsonV1.JSON{}, nil
|
||||
} else if t == "application/x-thrift" {
|
||||
return &thrift.Thrift{}, nil
|
||||
}
|
||||
spans[i] = zs
|
||||
}
|
||||
|
||||
if err = transport.ReadListEnd(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return spans, nil
|
||||
return nil, fmt.Errorf("Unknown Content-Type: %s", contentType)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user