From 70b3e763e79d2c9dfaed228f9eaf9591655a1505 Mon Sep 17 00:00:00 2001 From: Ross McDonald Date: Mon, 17 Apr 2017 15:49:36 -0500 Subject: [PATCH] Add input for receiving papertrail webhooks (#2038) --- CHANGELOG.md | 1 + plugins/inputs/webhooks/README.md | 1 + plugins/inputs/webhooks/papertrail/README.md | 32 ++++ .../webhooks/papertrail/papertrail_test.go | 181 ++++++++++++++++++ .../papertrail/papertrail_webhooks.go | 79 ++++++++ .../papertrail/papertrail_webhooks_models.go | 41 ++++ plugins/inputs/webhooks/webhooks.go | 13 +- plugins/inputs/webhooks/webhooks_test.go | 7 + 8 files changed, 351 insertions(+), 4 deletions(-) create mode 100644 plugins/inputs/webhooks/papertrail/README.md create mode 100644 plugins/inputs/webhooks/papertrail/papertrail_test.go create mode 100644 plugins/inputs/webhooks/papertrail/papertrail_webhooks.go create mode 100644 plugins/inputs/webhooks/papertrail/papertrail_webhooks_models.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6715ef3bb..0c7b7c2fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ be deprecated eventually. - [#1100](https://github.com/influxdata/telegraf/issues/1100): Add collectd parser - [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs - [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin +- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks ### Bugfixes diff --git a/plugins/inputs/webhooks/README.md b/plugins/inputs/webhooks/README.md index bc7714e9e..8b789e338 100644 --- a/plugins/inputs/webhooks/README.md +++ b/plugins/inputs/webhooks/README.md @@ -19,6 +19,7 @@ $ sudo service telegraf start - [Github](github/) - [Mandrill](mandrill/) - [Rollbar](rollbar/) +- [Papertrail](papertrail/) ## Adding new webhooks plugin diff --git a/plugins/inputs/webhooks/papertrail/README.md b/plugins/inputs/webhooks/papertrail/README.md new file mode 100644 index 000000000..a3463dcaa --- /dev/null +++ b/plugins/inputs/webhooks/papertrail/README.md @@ -0,0 +1,32 @@ +# papertrail webhooks + +Enables Telegraf to act as a [Papertrail Webhook](http://help.papertrailapp.com/kb/how-it-works/web-hooks/). + +## Events + +[Full documentation](http://help.papertrailapp.com/kb/how-it-works/web-hooks/#callback). + +Events from Papertrail come in two forms: + +* The [event-based callback](http://help.papertrailapp.com/kb/how-it-works/web-hooks/#callback): + + * A point is created per event, with the timestamp as `received_at` + * Each point has a field counter (`count`), which is set to `1` (signifying the event occurred) + * Each event "hostname" object is converted to a `host` tag + * The "saved_search" name in the payload is added as an `event` tag + +* The [count-based callback](http://help.papertrailapp.com/kb/how-it-works/web-hooks/#count-only-webhooks) + + * A point is created per timeseries object per count, with the timestamp as the "timeseries" key (the unix epoch of the event) + * Each point has a field counter (`count`), which is set to the value of each "timeseries" object + * Each count "source_name" object is converted to a `host` tag + * The "saved_search" name in the payload is added as an `event` tag + +The current functionality is very basic, however this allows you to +track the number of events by host and saved search. + +When an event is received, any point will look similar to: + +``` +papertrail,host=myserver.example.com,event=saved_search_name count=3i 1453248892000000000 +``` diff --git a/plugins/inputs/webhooks/papertrail/papertrail_test.go b/plugins/inputs/webhooks/papertrail/papertrail_test.go new file mode 100644 index 000000000..14b8aec89 --- /dev/null +++ b/plugins/inputs/webhooks/papertrail/papertrail_test.go @@ -0,0 +1,181 @@ +package papertrail + +import ( + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +const ( + contentType = "application/x-www-form-urlencoded" +) + +func post(pt *PapertrailWebhook, contentType string, body string) *httptest.ResponseRecorder { + req, _ := http.NewRequest("POST", "/", strings.NewReader(body)) + req.Header.Set("Content-Type", contentType) + w := httptest.NewRecorder() + pt.eventHandler(w, req) + return w +} + +func TestWrongContentType(t *testing.T) { + var acc testutil.Accumulator + pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc} + form := url.Values{} + form.Set("payload", sampleEventPayload) + data := form.Encode() + + resp := post(pt, "", data) + require.Equal(t, http.StatusUnsupportedMediaType, resp.Code) +} + +func TestMissingPayload(t *testing.T) { + var acc testutil.Accumulator + pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc} + + resp := post(pt, contentType, "") + require.Equal(t, http.StatusBadRequest, resp.Code) +} + +func TestPayloadNotJSON(t *testing.T) { + var acc testutil.Accumulator + pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc} + + resp := post(pt, contentType, "payload={asdf]") + require.Equal(t, http.StatusBadRequest, resp.Code) +} + +func TestPayloadInvalidJSON(t *testing.T) { + var acc testutil.Accumulator + pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc} + + resp := post(pt, contentType, `payload={"value": 42}`) + require.Equal(t, http.StatusBadRequest, resp.Code) +} + +func TestEventPayload(t *testing.T) { + var acc testutil.Accumulator + pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc} + + form := url.Values{} + form.Set("payload", sampleEventPayload) + resp := post(pt, contentType, form.Encode()) + require.Equal(t, http.StatusOK, resp.Code) + + fields := map[string]interface{}{ + "count": uint64(1), + } + + tags1 := map[string]string{ + "event": "Important stuff", + "host": "abc", + } + tags2 := map[string]string{ + "event": "Important stuff", + "host": "def", + } + + acc.AssertContainsTaggedFields(t, "papertrail", fields, tags1) + acc.AssertContainsTaggedFields(t, "papertrail", fields, tags2) +} + +func TestCountPayload(t *testing.T) { + var acc testutil.Accumulator + pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc} + form := url.Values{} + form.Set("payload", sampleCountPayload) + resp := post(pt, contentType, form.Encode()) + require.Equal(t, http.StatusOK, resp.Code) + + fields1 := map[string]interface{}{ + "count": uint64(5), + } + fields2 := map[string]interface{}{ + "count": uint64(3), + } + + tags1 := map[string]string{ + "event": "Important stuff", + "host": "arthur", + } + tags2 := map[string]string{ + "event": "Important stuff", + "host": "ford", + } + + acc.AssertContainsTaggedFields(t, "papertrail", fields1, tags1) + acc.AssertContainsTaggedFields(t, "papertrail", fields2, tags2) +} + +const sampleEventPayload = `{ + "events": [ + { + "id": 7711561783320576, + "received_at": "2011-05-18T20:30:02-07:00", + "display_received_at": "May 18 20:30:02", + "source_ip": "208.75.57.121", + "source_name": "abc", + "source_id": 2, + "hostname": "abc", + "program": "CROND", + "severity": "Info", + "facility": "Cron", + "message": "message body" + }, + { + "id": 7711562567655424, + "received_at": "2011-05-18T20:30:02-07:00", + "display_received_at": "May 18 20:30:02", + "source_ip": "208.75.57.120", + "source_name": "server1", + "source_id": 19, + "hostname": "def", + "program": "CROND", + "severity": "Info", + "facility": "Cron", + "message": "A short event" + } + ], + "saved_search": { + "id": 42, + "name": "Important stuff", + "query": "cron OR server1", + "html_edit_url": "https://papertrailapp.com/searches/42/edit", + "html_search_url": "https://papertrailapp.com/searches/42" + }, + "max_id": "7711582041804800", + "min_id": "7711561783320576" +}` + +const sampleCountPayload = `{ + "counts": [ + { + "source_name": "arthur", + "source_id": 4, + "timeseries": { + "1453248895": 5 + } + }, + { + "source_name": "ford", + "source_id": 3, + "timeseries": { + "1453248927": 3 + } + } + ], + "saved_search": { + "id": 42, + "name": "Important stuff", + "query": "cron OR server1", + "html_edit_url": "https://papertrailapp.com/searches/42/edit", + "html_search_url": "https://papertrailapp.com/searches/42" + }, + "max_id": "7711582041804800", + "min_id": "7711561783320576" +}` diff --git a/plugins/inputs/webhooks/papertrail/papertrail_webhooks.go b/plugins/inputs/webhooks/papertrail/papertrail_webhooks.go new file mode 100644 index 000000000..42453c130 --- /dev/null +++ b/plugins/inputs/webhooks/papertrail/papertrail_webhooks.go @@ -0,0 +1,79 @@ +package papertrail + +import ( + "encoding/json" + "log" + "net/http" + "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" +) + +type PapertrailWebhook struct { + Path string + acc telegraf.Accumulator +} + +func (pt *PapertrailWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { + router.HandleFunc(pt.Path, pt.eventHandler).Methods("POST") + log.Printf("I! Started the papertrail_webhook on %s", pt.Path) + pt.acc = acc +} + +func (pt *PapertrailWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Content-Type") != "application/x-www-form-urlencoded" { + http.Error(w, "Unsupported Media Type", http.StatusUnsupportedMediaType) + return + } + + data := r.PostFormValue("payload") + if data == "" { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + var payload Payload + err := json.Unmarshal([]byte(data), &payload) + if err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + if payload.Events != nil { + + // Handle event-based payload + for _, e := range payload.Events { + // Warning: Duplicate event timestamps will overwrite each other + tags := map[string]string{ + "host": e.Hostname, + "event": payload.SavedSearch.Name, + } + fields := map[string]interface{}{ + "count": uint64(1), + } + pt.acc.AddFields("papertrail", fields, tags, e.ReceivedAt) + } + + } else if payload.Counts != nil { + + // Handle count-based payload + for _, c := range payload.Counts { + for ts, count := range *c.TimeSeries { + tags := map[string]string{ + "host": c.SourceName, + "event": payload.SavedSearch.Name, + } + fields := map[string]interface{}{ + "count": count, + } + pt.acc.AddFields("papertrail", fields, tags, time.Unix(ts, 0)) + } + } + } else { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/plugins/inputs/webhooks/papertrail/papertrail_webhooks_models.go b/plugins/inputs/webhooks/papertrail/papertrail_webhooks_models.go new file mode 100644 index 000000000..dd4e8d8bd --- /dev/null +++ b/plugins/inputs/webhooks/papertrail/papertrail_webhooks_models.go @@ -0,0 +1,41 @@ +package papertrail + +import ( + "time" +) + +type Event struct { + ID int64 `json:"id"` + ReceivedAt time.Time `json:"received_at"` + DisplayReceivedAt string `json:"display_received_at"` + SourceIP string `json:"source_ip"` + SourceName string `json:"source_name"` + SourceID int `json:"source_id"` + Hostname string `json:"hostname"` + Program string `json:"program"` + Severity string `json:"severity"` + Facility string `json:"facility"` + Message string `json:"message"` +} + +type Count struct { + SourceName string `json:"source_name"` + SourceID int64 `json:"source_id"` + TimeSeries *map[int64]uint64 `json:"timeseries"` +} + +type SavedSearch struct { + ID int64 `json:"id"` + Name string `json:"name"` + Query string `json:"query"` + EditURL string `json:"html_edit_url"` + SearchURL string `json:"html_search_url"` +} + +type Payload struct { + Events []*Event `json:"events"` + Counts []*Count `json:"counts"` + SavedSearch *SavedSearch `json:"saved_search"` + MaxID string `json:"max_id"` + MinID string `json:"min_id"` +} diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index bc8519d7a..7ed1ccd51 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/webhooks/filestack" "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -27,10 +28,11 @@ func init() { type Webhooks struct { ServiceAddress string - Github *github.GithubWebhook - Filestack *filestack.FilestackWebhook - Mandrill *mandrill.MandrillWebhook - Rollbar *rollbar.RollbarWebhook + Github *github.GithubWebhook + Filestack *filestack.FilestackWebhook + Mandrill *mandrill.MandrillWebhook + Rollbar *rollbar.RollbarWebhook + Papertrail *papertrail.PapertrailWebhook } func NewWebhooks() *Webhooks { @@ -54,6 +56,9 @@ func (wb *Webhooks) SampleConfig() string { [inputs.webhooks.rollbar] path = "/rollbar" + + [inputs.webhooks.papertrail] + path = "/papertrail" ` } diff --git a/plugins/inputs/webhooks/webhooks_test.go b/plugins/inputs/webhooks/webhooks_test.go index 85d359e1c..6d3448870 100644 --- a/plugins/inputs/webhooks/webhooks_test.go +++ b/plugins/inputs/webhooks/webhooks_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -26,4 +27,10 @@ func TestAvailableWebhooks(t *testing.T) { if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) { t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks()) } + + wb.Papertrail = &papertrail.PapertrailWebhook{Path: "/papertrail"} + expected = append(expected, wb.Papertrail) + if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) { + t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks()) + } }