Add Zipkin input plugin (#3080)
This commit is contained in:
135
plugins/inputs/zipkin/handler.go
Normal file
135
plugins/inputs/zipkin/handler.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package zipkin
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
|
||||
)
|
||||
|
||||
// SpanHandler is an implementation of a Handler which accepts zipkin thrift
|
||||
// span data and sends it to the recorder
|
||||
type SpanHandler struct {
|
||||
Path string
|
||||
recorder Recorder
|
||||
waitGroup *sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewSpanHandler returns a new server instance given path to handle
|
||||
func NewSpanHandler(path string) *SpanHandler {
|
||||
return &SpanHandler{
|
||||
Path: path,
|
||||
}
|
||||
}
|
||||
|
||||
func cors(next http.HandlerFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if origin := r.Header.Get("Origin"); origin != "" {
|
||||
w.Header().Set(`Access-Control-Allow-Origin`, origin)
|
||||
w.Header().Set(`Access-Control-Allow-Methods`, strings.Join([]string{
|
||||
`OPTIONS`,
|
||||
`POST`,
|
||||
}, ", "))
|
||||
|
||||
w.Header().Set(`Access-Control-Allow-Headers`, strings.Join([]string{
|
||||
`Accept`,
|
||||
`Accept-Encoding`,
|
||||
`Content-Length`,
|
||||
`Content-Type`,
|
||||
}, ", "))
|
||||
|
||||
w.Header().Set(`Access-Control-Expose-Headers`, strings.Join([]string{
|
||||
`Date`,
|
||||
}, ", "))
|
||||
}
|
||||
|
||||
if r.Method == "OPTIONS" {
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
// Register implements the Service interface. Register accepts zipkin thrift data
|
||||
// POSTed to the path of the mux router
|
||||
func (s *SpanHandler) Register(router *mux.Router, recorder Recorder) error {
|
||||
handler := cors(http.HandlerFunc(s.Spans))
|
||||
router.Handle(s.Path, handler).Methods("POST", "OPTIONS")
|
||||
s.recorder = recorder
|
||||
return nil
|
||||
}
|
||||
|
||||
// Spans handles zipkin thrift spans
|
||||
func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
body := r.Body
|
||||
var err error
|
||||
// Handle gzip decoding of the body
|
||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||
body, err = gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer body.Close()
|
||||
}
|
||||
|
||||
octets, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
spans, err := unmarshalThrift(octets)
|
||||
if err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
trace := NewTrace(spans)
|
||||
|
||||
if err = s.recorder.Record(trace); err != nil {
|
||||
s.recorder.Error(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) {
|
||||
buffer := thrift.NewTMemoryBuffer()
|
||||
if _, err := buffer.Write(body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transport := thrift.NewTBinaryProtocolTransport(buffer)
|
||||
_, size, err := transport.ReadListBegin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
spans := make([]*zipkincore.Span, size)
|
||||
for i := 0; i < size; i++ {
|
||||
zs := &zipkincore.Span{}
|
||||
if err = zs.Read(transport); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
spans[i] = zs
|
||||
}
|
||||
|
||||
if err = transport.ReadListEnd(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return spans, nil
|
||||
}
|
||||
Reference in New Issue
Block a user