diff --git a/plugins/inputs/webhooks/README.md b/plugins/inputs/webhooks/README.md index e6af86621..13141fc4b 100644 --- a/plugins/inputs/webhooks/README.md +++ b/plugins/inputs/webhooks/README.md @@ -20,6 +20,8 @@ $ sudo service telegraf start - [Mandrill](mandrill/) - [Rollbar](rollbar/) - [Papertrail](papertrail/) +- [Particle](particle/) + ## Adding new webhooks plugin diff --git a/plugins/inputs/webhooks/particle/README.md b/plugins/inputs/webhooks/particle/README.md new file mode 100644 index 000000000..4e3426da5 --- /dev/null +++ b/plugins/inputs/webhooks/particle/README.md @@ -0,0 +1,39 @@ +# particle webhooks + + +You should configure your Particle.io's Webhooks to point at the `webhooks` service. To do this go to `(https://console.particle.io/)[https://console.particle.io]` and click `Integrations > New Integration > Webhook`. In the resulting page set `URL` to `http://:1619/particle`, and under `Advanced Settings` click on `JSON` and add: + +``` +{ + "measurement": "your_measurement_name" +} +``` + +If required, enter your username and password, etc. and then click `Save` + + + +## Events + +Your Particle device should publish an event that contains a JSON in the form of: +``` +String data = String::format("{ \"tags\" : { + \"tag_name\": \"tag_value\", + \"other_tag\": \"other_value\" + }, + \"values\": { + \"value_name\": %f, + \"other_value\": %f, + } + }", value_value, other_value + ); + Particle.publish("event_name", data, PRIVATE); +``` + +Escaping the "" is required in the source file. +The number of tag values and field values is not restrictied so you can send as many values per webhook call as you'd like. + +You will need to enable JSON messages in the Webhooks setup of Particle.io, and make sure to check the "include default data" box as well. + + +See [webhook doc](https://docs.particle.io/reference/webhooks/) diff --git a/plugins/inputs/webhooks/particle/particle_webhooks.go b/plugins/inputs/webhooks/particle/particle_webhooks.go new file mode 100644 index 000000000..aa3499935 --- /dev/null +++ b/plugins/inputs/webhooks/particle/particle_webhooks.go @@ -0,0 +1,64 @@ +package particle + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" +) + +type event struct { + Name string `json:"event"` + Data data `json:"data"` + TTL int `json:"ttl"` + PublishedAt string `json:"published_at"` + Database string `json:"measurement"` +} + +type data struct { + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"values"` +} + +func newEvent() *event { + return &event{ + Data: data{ + Tags: make(map[string]string), + Fields: make(map[string]interface{}), + }, + } +} + +func (e *event) Time() (time.Time, error) { + return time.Parse("2006-01-02T15:04:05Z", e.PublishedAt) +} + +type ParticleWebhook struct { + Path string + acc telegraf.Accumulator +} + +func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { + router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST") + rb.acc = acc +} + +func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + e := newEvent() + if err := json.NewDecoder(r.Body).Decode(e); err != nil { + rb.acc.AddError(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + pTime, err := e.Time() + if err != nil { + pTime = time.Now() + } + + rb.acc.AddFields(e.Name, e.Data.Fields, e.Data.Tags, pTime) + w.WriteHeader(http.StatusOK) +} diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_test.go b/plugins/inputs/webhooks/particle/particle_webhooks_test.go new file mode 100644 index 000000000..dc6213367 --- /dev/null +++ b/plugins/inputs/webhooks/particle/particle_webhooks_test.go @@ -0,0 +1,97 @@ +package particle + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecorder { + req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) + w := httptest.NewRecorder() + w.Code = 500 + + rb.eventHandler(w, req) + + return w +} + +func TestNewItem(t *testing.T) { + t.Parallel() + var acc testutil.Accumulator + rb := &ParticleWebhook{Path: "/particle", acc: &acc} + resp := postWebhooks(rb, NewItemJSON()) + if resp.Code != http.StatusOK { + t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } + + fields := map[string]interface{}{ + "temp_c": 26.680000, + "temp_f": 80.024001, + "infrared": 528.0, + "lux": 0.0, + "humidity": 44.937500, + "pressure": 998.998901, + "altitude": 119.331436, + "broadband": 1266.0, + } + + tags := map[string]string{ + "id": "230035001147343438323536", + "location": "TravelingWilbury", + } + + acc.AssertContainsTaggedFields(t, "temperature", fields, tags) +} + +func TestUnknowItem(t *testing.T) { + t.Parallel() + var acc testutil.Accumulator + rb := &ParticleWebhook{Path: "/particle", acc: &acc} + resp := postWebhooks(rb, UnknowJSON()) + if resp.Code != http.StatusOK { + t.Errorf("POST unknown returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } +} + +func NewItemJSON() string { + return ` + { + "event": "temperature", + "data": { + "tags": { + "id": "230035001147343438323536", + "location": "TravelingWilbury" + }, + "values": { + "temp_c": 26.680000, + "temp_f": 80.024001, + "humidity": 44.937500, + "pressure": 998.998901, + "altitude": 119.331436, + "broadband": 1266.0, + "infrared": 528.0, + "lux": 0.0 + } + }, + "ttl": 60, + "published_at": "2017-09-28T21:54:10.897Z", + "coreid": "123456789938323536", + "userid": "1234ee123ac8e5ec1231a123d", + "version": 10, + "public": false, + "productID": 1234, + "name": "sensor", + "measurement": "mydata" + }` +} + +func UnknowJSON() string { + return ` + { + "event": "roger" + }` +} diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index 698cde159..d8a6e07d4 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill" "github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/particle" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -34,6 +35,7 @@ type Webhooks struct { Mandrill *mandrill.MandrillWebhook Rollbar *rollbar.RollbarWebhook Papertrail *papertrail.PapertrailWebhook + Particle *particle.ParticleWebhook srv *http.Server } @@ -62,6 +64,9 @@ func (wb *Webhooks) SampleConfig() string { [inputs.webhooks.papertrail] path = "/papertrail" + + [inputs.webhooks.particle] + path = "/particle" ` } diff --git a/plugins/inputs/webhooks/webhooks_test.go b/plugins/inputs/webhooks/webhooks_test.go index 6d3448870..a44e41433 100644 --- a/plugins/inputs/webhooks/webhooks_test.go +++ b/plugins/inputs/webhooks/webhooks_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/particle" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -33,4 +34,10 @@ func TestAvailableWebhooks(t *testing.T) { if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) { t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks()) } + + wb.Particle = &particle.ParticleWebhook{Path: "/particle"} + expected = append(expected, wb.Particle) + if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) { + t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks()) + } }