From 68cd7a45ef9d1766d584ca400bf58b57b85bc70c Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 3 Oct 2017 09:04:29 -0400 Subject: [PATCH] bug fixes and refactoring --- .../webhooks/particle/particle_webhooks.go | 61 ++++++++++++------- .../particle/particle_webhooks_events.go | 22 ------- .../particle_webhooks_events_json_test.go | 39 ------------ .../particle/particle_webhooks_test.go | 60 +++++++++++++++--- 4 files changed, 90 insertions(+), 92 deletions(-) delete mode 100644 plugins/inputs/webhooks/particle/particle_webhooks_events.go delete mode 100644 plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go diff --git a/plugins/inputs/webhooks/particle/particle_webhooks.go b/plugins/inputs/webhooks/particle/particle_webhooks.go index 813bd0665..258619856 100644 --- a/plugins/inputs/webhooks/particle/particle_webhooks.go +++ b/plugins/inputs/webhooks/particle/particle_webhooks.go @@ -2,14 +2,40 @@ package particle import ( "encoding/json" - "github.com/gorilla/mux" - "github.com/influxdata/telegraf" - "io/ioutil" "log" "net/http" "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" ) +type event struct { + Name string `json:"event"` + Data data `json:"data"` + TTL int `json:"ttl"` + PublishedAt string `json:"published_at"` + Database string `json:"influx_db"` +} + +type data struct { + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"values"` +} + +func newEvent() *event { + return &event{ + Data: data{ + Tags: make(map[string]string), + Fields: make(map[string]interface{}), + }, + } +} + +func (e *event) Time() (time.Time, error) { + return time.Parse("2006-01-02T15:04:05Z", e.PublishedAt) +} + type ParticleWebhook struct { Path string acc telegraf.Accumulator @@ -23,26 +49,19 @@ func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() - data, err := ioutil.ReadAll(r.Body) + e := newEvent() + if err := json.NewDecoder(r.Body).Decode(e); err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + pTime, err := e.Time() if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - dummy := &DummyData{} - if err := json.Unmarshal(data, dummy); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - pd := &ParticleData{} - if err := json.Unmarshal([]byte(dummy.Data), pd); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - pTime, err := dummy.Time() - if err != nil { - log.Printf("Time Conversion Error") pTime = time.Now() + log.Printf("error parsing particle event time: %s. Using telegraf host time instead: %s", e.PublishedAt, pTime) } - rb.acc.AddFields(dummy.InfluxDB, pd.Fields, pd.Tags, pTime) + + rb.acc.AddFields(e.Name, e.Data.Fields, e.Data.Tags, pTime) w.WriteHeader(http.StatusOK) } diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_events.go b/plugins/inputs/webhooks/particle/particle_webhooks_events.go deleted file mode 100644 index 089536525..000000000 --- a/plugins/inputs/webhooks/particle/particle_webhooks_events.go +++ /dev/null @@ -1,22 +0,0 @@ -package particle - -import ( - "time" -) - -type DummyData struct { - Event string `json:"event"` - Data string `json:"data"` - Ttl int `json:"ttl"` - PublishedAt string `json:"published_at"` - InfluxDB string `json:"influx_db"` -} -type ParticleData struct { - Event string `json:"event"` - Tags map[string]string `json:"tags"` - Fields map[string]interface{} `json:"values"` -} - -func (d *DummyData) Time() (time.Time, error) { - return time.Parse("2006-01-02T15:04:05Z", d.PublishedAt) -} diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go b/plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go deleted file mode 100644 index aef0537e9..000000000 --- a/plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package particle - -func NewItemJSON() string { - return ` - { - "event": "temperature", - "data": "{ - "tags": { - "id": "230035001147343438323536", - "location": "TravelingWilbury" - }, - "values": { - "temp_c": 26.680000, - "temp_f": 80.024001, - "humidity": 44.937500, - "pressure": 998.998901, - "altitude": 119.331436, - "broadband": 1266, - "infrared": 528, - "lux": 0 - } - }", - "ttl": 60, - "published_at": "2017-09-28T21:54:10.897Z", - "coreid": "123456789938323536", - "userid": "1234ee123ac8e5ec1231a123d", - "version": 10, - "public": false, - "productID": 1234, - "name": "sensor" - "influx_db": "mydata" - }` -} -func UnknowJSON() string { - return ` - { - "event": "roger" - }` -} diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_test.go b/plugins/inputs/webhooks/particle/particle_webhooks_test.go index c62e0f0c8..eecf26e14 100644 --- a/plugins/inputs/webhooks/particle/particle_webhooks_test.go +++ b/plugins/inputs/webhooks/particle/particle_webhooks_test.go @@ -1,17 +1,16 @@ package particle import ( - "github.com/influxdata/telegraf/testutil" - "log" "net/http" "net/http/httptest" "strings" "testing" + + "github.com/influxdata/telegraf/testutil" ) func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecorder { req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) - log.Printf("eventBody: %s\n", eventBody) w := httptest.NewRecorder() w.Code = 500 @@ -21,10 +20,10 @@ func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecor } func TestNewItem(t *testing.T) { + t.Parallel() var acc testutil.Accumulator rb := &ParticleWebhook{Path: "/particle", acc: &acc} resp := postWebhooks(rb, NewItemJSON()) - log.Printf("Respnse: %s\n", resp.Body) if resp.Code != http.StatusOK { t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) } @@ -32,12 +31,12 @@ func TestNewItem(t *testing.T) { fields := map[string]interface{}{ "temp_c": 26.680000, "temp_f": 80.024001, + "infrared": 528.0, + "lux": 0.0, "humidity": 44.937500, "pressure": 998.998901, "altitude": 119.331436, - "broadband": 1266, - "infrared": 528, - "lux": 0, + "broadband": 1266.0, } tags := map[string]string{ @@ -45,13 +44,54 @@ func TestNewItem(t *testing.T) { "location": "TravelingWilbury", } - acc.AssertContainsTaggedFields(t, "particle_webhooks", fields, tags) + acc.AssertContainsTaggedFields(t, "temperature", fields, tags) } + func TestUnknowItem(t *testing.T) { - rb := &ParticleWebhook{Path: "/particle"} + t.Parallel() + var acc testutil.Accumulator + rb := &ParticleWebhook{Path: "/particle", acc: &acc} resp := postWebhooks(rb, UnknowJSON()) - log.Printf("Response: %s\n", resp.Body) if resp.Code != http.StatusOK { t.Errorf("POST unknown returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) } } + +func NewItemJSON() string { + return ` + { + "event": "temperature", + "data": { + "tags": { + "id": "230035001147343438323536", + "location": "TravelingWilbury" + }, + "values": { + "temp_c": 26.680000, + "temp_f": 80.024001, + "humidity": 44.937500, + "pressure": 998.998901, + "altitude": 119.331436, + "broadband": 1266, + "infrared": 528, + "lux": 0 + } + }, + "ttl": 60, + "published_at": "2017-09-28T21:54:10.897Z", + "coreid": "123456789938323536", + "userid": "1234ee123ac8e5ec1231a123d", + "version": 10, + "public": false, + "productID": 1234, + "name": "sensor", + "influx_db": "mydata" + }` +} + +func UnknowJSON() string { + return ` + { + "event": "roger" + }` +}