Add input for receiving papertrail webhooks (#2038)
This commit is contained in:
parent
58ee962679
commit
70b3e763e7
|
@ -71,6 +71,7 @@ be deprecated eventually.
|
||||||
- [#1100](https://github.com/influxdata/telegraf/issues/1100): Add collectd parser
|
- [#1100](https://github.com/influxdata/telegraf/issues/1100): Add collectd parser
|
||||||
- [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs
|
- [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs
|
||||||
- [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin
|
- [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin
|
||||||
|
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ $ sudo service telegraf start
|
||||||
- [Github](github/)
|
- [Github](github/)
|
||||||
- [Mandrill](mandrill/)
|
- [Mandrill](mandrill/)
|
||||||
- [Rollbar](rollbar/)
|
- [Rollbar](rollbar/)
|
||||||
|
- [Papertrail](papertrail/)
|
||||||
|
|
||||||
## Adding new webhooks plugin
|
## Adding new webhooks plugin
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
# papertrail webhooks
|
||||||
|
|
||||||
|
Enables Telegraf to act as a [Papertrail Webhook](http://help.papertrailapp.com/kb/how-it-works/web-hooks/).
|
||||||
|
|
||||||
|
## Events
|
||||||
|
|
||||||
|
[Full documentation](http://help.papertrailapp.com/kb/how-it-works/web-hooks/#callback).
|
||||||
|
|
||||||
|
Events from Papertrail come in two forms:
|
||||||
|
|
||||||
|
* The [event-based callback](http://help.papertrailapp.com/kb/how-it-works/web-hooks/#callback):
|
||||||
|
|
||||||
|
* A point is created per event, with the timestamp as `received_at`
|
||||||
|
* Each point has a field counter (`count`), which is set to `1` (signifying the event occurred)
|
||||||
|
* Each event "hostname" object is converted to a `host` tag
|
||||||
|
* The "saved_search" name in the payload is added as an `event` tag
|
||||||
|
|
||||||
|
* The [count-based callback](http://help.papertrailapp.com/kb/how-it-works/web-hooks/#count-only-webhooks)
|
||||||
|
|
||||||
|
* A point is created per timeseries object per count, with the timestamp as the "timeseries" key (the unix epoch of the event)
|
||||||
|
* Each point has a field counter (`count`), which is set to the value of each "timeseries" object
|
||||||
|
* Each count "source_name" object is converted to a `host` tag
|
||||||
|
* The "saved_search" name in the payload is added as an `event` tag
|
||||||
|
|
||||||
|
The current functionality is very basic, however this allows you to
|
||||||
|
track the number of events by host and saved search.
|
||||||
|
|
||||||
|
When an event is received, any point will look similar to:
|
||||||
|
|
||||||
|
```
|
||||||
|
papertrail,host=myserver.example.com,event=saved_search_name count=3i 1453248892000000000
|
||||||
|
```
|
|
@ -0,0 +1,181 @@
|
||||||
|
package papertrail
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
contentType = "application/x-www-form-urlencoded"
|
||||||
|
)
|
||||||
|
|
||||||
|
func post(pt *PapertrailWebhook, contentType string, body string) *httptest.ResponseRecorder {
|
||||||
|
req, _ := http.NewRequest("POST", "/", strings.NewReader(body))
|
||||||
|
req.Header.Set("Content-Type", contentType)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
pt.eventHandler(w, req)
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrongContentType(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc}
|
||||||
|
form := url.Values{}
|
||||||
|
form.Set("payload", sampleEventPayload)
|
||||||
|
data := form.Encode()
|
||||||
|
|
||||||
|
resp := post(pt, "", data)
|
||||||
|
require.Equal(t, http.StatusUnsupportedMediaType, resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMissingPayload(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc}
|
||||||
|
|
||||||
|
resp := post(pt, contentType, "")
|
||||||
|
require.Equal(t, http.StatusBadRequest, resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadNotJSON(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc}
|
||||||
|
|
||||||
|
resp := post(pt, contentType, "payload={asdf]")
|
||||||
|
require.Equal(t, http.StatusBadRequest, resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadInvalidJSON(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc}
|
||||||
|
|
||||||
|
resp := post(pt, contentType, `payload={"value": 42}`)
|
||||||
|
require.Equal(t, http.StatusBadRequest, resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventPayload(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc}
|
||||||
|
|
||||||
|
form := url.Values{}
|
||||||
|
form.Set("payload", sampleEventPayload)
|
||||||
|
resp := post(pt, contentType, form.Encode())
|
||||||
|
require.Equal(t, http.StatusOK, resp.Code)
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"count": uint64(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
tags1 := map[string]string{
|
||||||
|
"event": "Important stuff",
|
||||||
|
"host": "abc",
|
||||||
|
}
|
||||||
|
tags2 := map[string]string{
|
||||||
|
"event": "Important stuff",
|
||||||
|
"host": "def",
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "papertrail", fields, tags1)
|
||||||
|
acc.AssertContainsTaggedFields(t, "papertrail", fields, tags2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCountPayload(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
pt := &PapertrailWebhook{Path: "/papertrail", acc: &acc}
|
||||||
|
form := url.Values{}
|
||||||
|
form.Set("payload", sampleCountPayload)
|
||||||
|
resp := post(pt, contentType, form.Encode())
|
||||||
|
require.Equal(t, http.StatusOK, resp.Code)
|
||||||
|
|
||||||
|
fields1 := map[string]interface{}{
|
||||||
|
"count": uint64(5),
|
||||||
|
}
|
||||||
|
fields2 := map[string]interface{}{
|
||||||
|
"count": uint64(3),
|
||||||
|
}
|
||||||
|
|
||||||
|
tags1 := map[string]string{
|
||||||
|
"event": "Important stuff",
|
||||||
|
"host": "arthur",
|
||||||
|
}
|
||||||
|
tags2 := map[string]string{
|
||||||
|
"event": "Important stuff",
|
||||||
|
"host": "ford",
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "papertrail", fields1, tags1)
|
||||||
|
acc.AssertContainsTaggedFields(t, "papertrail", fields2, tags2)
|
||||||
|
}
|
||||||
|
|
||||||
|
const sampleEventPayload = `{
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"id": 7711561783320576,
|
||||||
|
"received_at": "2011-05-18T20:30:02-07:00",
|
||||||
|
"display_received_at": "May 18 20:30:02",
|
||||||
|
"source_ip": "208.75.57.121",
|
||||||
|
"source_name": "abc",
|
||||||
|
"source_id": 2,
|
||||||
|
"hostname": "abc",
|
||||||
|
"program": "CROND",
|
||||||
|
"severity": "Info",
|
||||||
|
"facility": "Cron",
|
||||||
|
"message": "message body"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 7711562567655424,
|
||||||
|
"received_at": "2011-05-18T20:30:02-07:00",
|
||||||
|
"display_received_at": "May 18 20:30:02",
|
||||||
|
"source_ip": "208.75.57.120",
|
||||||
|
"source_name": "server1",
|
||||||
|
"source_id": 19,
|
||||||
|
"hostname": "def",
|
||||||
|
"program": "CROND",
|
||||||
|
"severity": "Info",
|
||||||
|
"facility": "Cron",
|
||||||
|
"message": "A short event"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"saved_search": {
|
||||||
|
"id": 42,
|
||||||
|
"name": "Important stuff",
|
||||||
|
"query": "cron OR server1",
|
||||||
|
"html_edit_url": "https://papertrailapp.com/searches/42/edit",
|
||||||
|
"html_search_url": "https://papertrailapp.com/searches/42"
|
||||||
|
},
|
||||||
|
"max_id": "7711582041804800",
|
||||||
|
"min_id": "7711561783320576"
|
||||||
|
}`
|
||||||
|
|
||||||
|
const sampleCountPayload = `{
|
||||||
|
"counts": [
|
||||||
|
{
|
||||||
|
"source_name": "arthur",
|
||||||
|
"source_id": 4,
|
||||||
|
"timeseries": {
|
||||||
|
"1453248895": 5
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"source_name": "ford",
|
||||||
|
"source_id": 3,
|
||||||
|
"timeseries": {
|
||||||
|
"1453248927": 3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"saved_search": {
|
||||||
|
"id": 42,
|
||||||
|
"name": "Important stuff",
|
||||||
|
"query": "cron OR server1",
|
||||||
|
"html_edit_url": "https://papertrailapp.com/searches/42/edit",
|
||||||
|
"html_search_url": "https://papertrailapp.com/searches/42"
|
||||||
|
},
|
||||||
|
"max_id": "7711582041804800",
|
||||||
|
"min_id": "7711561783320576"
|
||||||
|
}`
|
|
@ -0,0 +1,79 @@
|
||||||
|
package papertrail
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PapertrailWebhook struct {
|
||||||
|
Path string
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pt *PapertrailWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
|
||||||
|
router.HandleFunc(pt.Path, pt.eventHandler).Methods("POST")
|
||||||
|
log.Printf("I! Started the papertrail_webhook on %s", pt.Path)
|
||||||
|
pt.acc = acc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pt *PapertrailWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Header.Get("Content-Type") != "application/x-www-form-urlencoded" {
|
||||||
|
http.Error(w, "Unsupported Media Type", http.StatusUnsupportedMediaType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
data := r.PostFormValue("payload")
|
||||||
|
if data == "" {
|
||||||
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload Payload
|
||||||
|
err := json.Unmarshal([]byte(data), &payload)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if payload.Events != nil {
|
||||||
|
|
||||||
|
// Handle event-based payload
|
||||||
|
for _, e := range payload.Events {
|
||||||
|
// Warning: Duplicate event timestamps will overwrite each other
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": e.Hostname,
|
||||||
|
"event": payload.SavedSearch.Name,
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"count": uint64(1),
|
||||||
|
}
|
||||||
|
pt.acc.AddFields("papertrail", fields, tags, e.ReceivedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if payload.Counts != nil {
|
||||||
|
|
||||||
|
// Handle count-based payload
|
||||||
|
for _, c := range payload.Counts {
|
||||||
|
for ts, count := range *c.TimeSeries {
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": c.SourceName,
|
||||||
|
"event": payload.SavedSearch.Name,
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"count": count,
|
||||||
|
}
|
||||||
|
pt.acc.AddFields("papertrail", fields, tags, time.Unix(ts, 0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package papertrail
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
ReceivedAt time.Time `json:"received_at"`
|
||||||
|
DisplayReceivedAt string `json:"display_received_at"`
|
||||||
|
SourceIP string `json:"source_ip"`
|
||||||
|
SourceName string `json:"source_name"`
|
||||||
|
SourceID int `json:"source_id"`
|
||||||
|
Hostname string `json:"hostname"`
|
||||||
|
Program string `json:"program"`
|
||||||
|
Severity string `json:"severity"`
|
||||||
|
Facility string `json:"facility"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Count struct {
|
||||||
|
SourceName string `json:"source_name"`
|
||||||
|
SourceID int64 `json:"source_id"`
|
||||||
|
TimeSeries *map[int64]uint64 `json:"timeseries"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SavedSearch struct {
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Query string `json:"query"`
|
||||||
|
EditURL string `json:"html_edit_url"`
|
||||||
|
SearchURL string `json:"html_search_url"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Payload struct {
|
||||||
|
Events []*Event `json:"events"`
|
||||||
|
Counts []*Count `json:"counts"`
|
||||||
|
SavedSearch *SavedSearch `json:"saved_search"`
|
||||||
|
MaxID string `json:"max_id"`
|
||||||
|
MinID string `json:"min_id"`
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/webhooks/filestack"
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/filestack"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill"
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,10 +28,11 @@ func init() {
|
||||||
type Webhooks struct {
|
type Webhooks struct {
|
||||||
ServiceAddress string
|
ServiceAddress string
|
||||||
|
|
||||||
Github *github.GithubWebhook
|
Github *github.GithubWebhook
|
||||||
Filestack *filestack.FilestackWebhook
|
Filestack *filestack.FilestackWebhook
|
||||||
Mandrill *mandrill.MandrillWebhook
|
Mandrill *mandrill.MandrillWebhook
|
||||||
Rollbar *rollbar.RollbarWebhook
|
Rollbar *rollbar.RollbarWebhook
|
||||||
|
Papertrail *papertrail.PapertrailWebhook
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebhooks() *Webhooks {
|
func NewWebhooks() *Webhooks {
|
||||||
|
@ -54,6 +56,9 @@ func (wb *Webhooks) SampleConfig() string {
|
||||||
|
|
||||||
[inputs.webhooks.rollbar]
|
[inputs.webhooks.rollbar]
|
||||||
path = "/rollbar"
|
path = "/rollbar"
|
||||||
|
|
||||||
|
[inputs.webhooks.papertrail]
|
||||||
|
path = "/papertrail"
|
||||||
`
|
`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
|
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,4 +27,10 @@ func TestAvailableWebhooks(t *testing.T) {
|
||||||
if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) {
|
if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) {
|
||||||
t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks())
|
t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wb.Papertrail = &papertrail.PapertrailWebhook{Path: "/papertrail"}
|
||||||
|
expected = append(expected, wb.Papertrail)
|
||||||
|
if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) {
|
||||||
|
t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue