From 484122b7d44e20aea76d3a53cb234a755c7e5921 Mon Sep 17 00:00:00 2001 From: Sebastien Leger Date: Fri, 31 May 2019 00:17:04 +0200 Subject: [PATCH] Add open_weather_map input plugin (#5125) --- plugins/inputs/all/all.go | 1 + plugins/inputs/openweathermap/README.md | 73 +++ .../inputs/openweathermap/openweathermap.go | 305 +++++++++++ .../openweathermap/openweathermap_test.go | 482 ++++++++++++++++++ testutil/accumulator.go | 26 +- 5 files changed, 883 insertions(+), 4 deletions(-) create mode 100644 plugins/inputs/openweathermap/README.md create mode 100644 plugins/inputs/openweathermap/openweathermap.go create mode 100644 plugins/inputs/openweathermap/openweathermap_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 47f977f32..a626fce92 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -101,6 +101,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/nvidia_smi" _ "github.com/influxdata/telegraf/plugins/inputs/openldap" _ "github.com/influxdata/telegraf/plugins/inputs/opensmtpd" + _ "github.com/influxdata/telegraf/plugins/inputs/openweathermap" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" _ "github.com/influxdata/telegraf/plugins/inputs/pf" _ "github.com/influxdata/telegraf/plugins/inputs/pgbouncer" diff --git a/plugins/inputs/openweathermap/README.md b/plugins/inputs/openweathermap/README.md new file mode 100644 index 000000000..7b781b129 --- /dev/null +++ b/plugins/inputs/openweathermap/README.md @@ -0,0 +1,73 @@ +# Telegraf Plugin: openweathermap + +OpenWeatherMap provides the current weather and forecasts for more than 200,000 cities. To use this plugin you will need a token. For more information [click here](https://openweathermap.org/appid). + +Find city identifiers in this [list](http://bulk.openweathermap.org/sample/city.list.json.gz). You can also use this [url](https://openweathermap.org/find) as an alternative to downloading a file. The ID is in the url of the city: `https://openweathermap.org/city/2643743` + +### Configuration: + +```toml +[[inputs.openweathermap]] + ## Root url of API to pull stats + # base_url = "https://api.openweathermap.org/data/2.5/" + ## Your personal user token from openweathermap.org + # app_id = "xxxxxxxxxxxxxxxxxxxxxxx" + ## List of city identifiers + # city_id = ["2988507", "519188"] + ## HTTP response timeout (default: 5s) + # response_timeout = "5s" + ## Query the current weather and future forecast + # fetch = ["weather", "forecast"] + ## For temperature in Fahrenheit use units=imperial + ## For temperature in Celsius use units=metric (default) + # units = "metric" +``` + +### Metrics: + ++ weather + - fields: + - humidity (int, Humidity percentage) + - temperature (float, Unit: Celcius) + - pressure (float, Atmospheric pressure in hPa) + - rain (float, Rain volume for the last 3 hours, mm) + - wind_speed (float, Wind speed. Unit Default: meter/sec) + - wind_degrees (float, Wind direction, degrees) + - tags: + - city_id + - forecast + +### Example Output: + +Using this configuration: +```toml +[[inputs.openweathermap]] + base_url = "https://api.openweathermap.org/data/2.5/" + app_id = "change_this_with_your_appid" + city_id = ["2988507", "519188"] + response_timeout = "5s" + fetch = ["weather", "forecast"] + units = "metric" +``` + +When run with: +``` +./telegraf -config telegraf.conf -input-filter openweathermap -test +``` + +It produces data similar to: +``` +> weather,city_id=4303602,forecast=* humidity=51i,pressure=1012,rain=0,temperature=16.410000000000025,wind_degrees=170,wind_speed=2.6 1556393944000000000 +> weather,city_id=2988507,forecast=* humidity=87i,pressure=1020,rain=0,temperature=7.110000000000014,wind_degrees=260,wind_speed=5.1 1556393841000000000 +> weather,city_id=2988507,forecast=3h humidity=69i,pressure=1020.38,rain=0,temperature=5.650000000000034,wind_degrees=268.456,wind_speed=5.83 1556398800000000000 +> weather,city_id=2988507,forecast=* humidity=69i,pressure=1020.38,rain=0,temperature=5.650000000000034,wind_degrees=268.456,wind_speed=5.83 1556398800000000000 +> weather,city_id=2988507,forecast=6h humidity=74i,pressure=1020.87,rain=0,temperature=5.810000000000002,wind_degrees=261.296,wind_speed=5.43 1556409600000000000 +> weather,city_id=2988507,forecast=* humidity=74i,pressure=1020.87,rain=0,temperature=5.810000000000002,wind_degrees=261.296,wind_speed=5.43 1556409600000000000 +> weather,city_id=4303602,forecast=9h humidity=66i,pressure=1010.63,rain=0,temperature=14.740000000000009,wind_degrees=196.264,wind_speed=4.3 1556398800000000000 +> weather,city_id=4303602,forecast=* humidity=66i,pressure=1010.63,rain=0,temperature=14.740000000000009,wind_degrees=196.264,wind_speed=4.3 1556398800000000000 +``` + + + + + diff --git a/plugins/inputs/openweathermap/openweathermap.go b/plugins/inputs/openweathermap/openweathermap.go new file mode 100644 index 000000000..1c246d0b6 --- /dev/null +++ b/plugins/inputs/openweathermap/openweathermap.go @@ -0,0 +1,305 @@ +package openweathermap + +import ( + "bufio" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type OpenWeatherMap struct { + BaseUrl string + AppId string + CityId []string + + client *http.Client + + ResponseTimeout internal.Duration + Fetch []string + Units string +} + +// https://openweathermap.org/current#severalid +// Call for several city IDs +// The limit of locations is 20. +const owmRequestSeveralCityId int = 20 +const defaultResponseTimeout time.Duration = time.Second * 5 +const defaultUnits string = "metric" + +var sampleConfig = ` + ## Root url of weather map REST API + base_url = "https://api.openweathermap.org/" + ## Your personal user token from openweathermap.org + app_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + city_id = ["2988507", "2988588"] + + ## HTTP response timeout (default: 5s) + response_timeout = "5s" + fetch = ["weather", "forecast"] + units = "metric" + ## Limit OpenWeatherMap query interval. See calls per minute info at: https://openweathermap.org/price + interval = "10m" +` + +func (n *OpenWeatherMap) SampleConfig() string { + return sampleConfig +} + +func (n *OpenWeatherMap) Description() string { + return "Read current weather and forecasts data from openweathermap.org" +} + +func (n *OpenWeatherMap) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + var strs []string + + base, err := url.Parse(n.BaseUrl) + if err != nil { + return err + } + + // Create an HTTP client that is re-used for each + // collection interval + + if n.client == nil { + client, err := n.createHttpClient() + if err != nil { + return err + } + n.client = client + } + units := n.Units + if units == "" { + units = defaultUnits + } + for _, fetch := range n.Fetch { + if fetch == "forecast" { + var u *url.URL + var addr *url.URL + + for _, city := range n.CityId { + u, err = url.Parse(fmt.Sprintf("/data/2.5/forecast?id=%s&APPID=%s&units=%s", city, n.AppId, units)) + if err != nil { + acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err)) + continue + } + addr = base.ResolveReference(u) + wg.Add(1) + go func(addr *url.URL) { + defer wg.Done() + acc.AddError(n.gatherUrl(addr, acc, true)) + }(addr) + } + } else if fetch == "weather" { + j := 0 + for j < len(n.CityId) { + var u *url.URL + var addr *url.URL + strs = make([]string, 0) + for i := 0; j < len(n.CityId) && i < owmRequestSeveralCityId; i++ { + strs = append(strs, n.CityId[j]) + j++ + } + cities := strings.Join(strs, ",") + + u, err = url.Parse(fmt.Sprintf("/data/2.5/group?id=%s&APPID=%s&units=%s", cities, n.AppId, units)) + if err != nil { + acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err)) + continue + } + + addr = base.ResolveReference(u) + wg.Add(1) + go func(addr *url.URL) { + defer wg.Done() + acc.AddError(n.gatherUrl(addr, acc, false)) + }(addr) + } + + } + } + + wg.Wait() + return nil +} + +func (n *OpenWeatherMap) createHttpClient() (*http.Client, error) { + + if n.ResponseTimeout.Duration < time.Second { + n.ResponseTimeout.Duration = defaultResponseTimeout + } + + client := &http.Client{ + Transport: &http.Transport{}, + Timeout: n.ResponseTimeout.Duration, + } + + return client, nil +} + +func (n *OpenWeatherMap) gatherUrl(addr *url.URL, acc telegraf.Accumulator, forecast bool) error { + resp, err := n.client.Get(addr.String()) + + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status) + } + contentType := strings.Split(resp.Header.Get("Content-Type"), ";")[0] + switch contentType { + case "application/json": + err = gatherWeatherUrl(bufio.NewReader(resp.Body), forecast, acc) + return err + default: + return fmt.Errorf("%s returned unexpected content type %s", addr.String(), contentType) + } +} + +type WeatherEntry struct { + Dt int64 `json:"dt"` + Dttxt string `json:"dt_txt"` // empty for weather/ + Clouds struct { + All int64 `json:"all"` + } `json:"clouds"` + Main struct { + GrndLevel float64 `json:"grnd_level"` // empty for weather/ + Humidity int64 `json:"humidity"` + SeaLevel float64 `json:"sea_level"` // empty for weather/ + Pressure float64 `json:"pressure"` + Temp float64 `json:"temp"` + TempMax float64 `json:"temp_max"` + TempMin float64 `json:"temp_min"` + } `json:"main"` + Rain struct { + Rain3 float64 `json:"3h"` + } `json:"rain"` + Sys struct { + Pod string `json:"pod"` + Country string `json:"country"` + Message float64 `json:"message"` + Id int64 `json:"id"` + Type int64 `json:"type"` + Sunrise int64 `json:"sunrise"` + Sunset int64 `json:"sunset"` + } `json:"sys"` + Wind struct { + Deg float64 `json:"deg"` + Speed float64 `json:"speed"` + } `json:"wind"` + Weather []struct { + Description string `json:"description"` + Icon string `json:"icon"` + Id int64 `json:"id"` + Main string `json:"main"` + } `json:"weather"` + + // Additional entries for weather/ + Id int64 `json:"id"` + Name string `json:"name"` + Coord struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"coord"` + Visibility int64 `json:"visibility"` +} + +type Status struct { + City struct { + Coord struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"coord"` + Country string `json:"country"` + Id int64 `json:"id"` + Name string `json:"name"` + } `json:"city"` + List []WeatherEntry `json:"list"` +} + +func gatherWeatherUrl(r *bufio.Reader, forecast bool, acc telegraf.Accumulator) error { + dec := json.NewDecoder(r) + status := &Status{} + if err := dec.Decode(status); err != nil { + return fmt.Errorf("Error while decoding JSON response: %s", err) + } + status.Gather(forecast, acc) + return nil +} + +func (s *Status) Gather(forecast bool, acc telegraf.Accumulator) { + tags := map[string]string{ + "city_id": strconv.FormatInt(s.City.Id, 10), + "forecast": "*", + } + + for i, e := range s.List { + tm := time.Unix(e.Dt, 0) + if e.Id > 0 { + tags["city_id"] = strconv.FormatInt(e.Id, 10) + } + if forecast { + tags["forecast"] = fmt.Sprintf("%dh", (i+1)*3) + } + acc.AddFields( + "weather", + map[string]interface{}{ + "rain": e.Rain.Rain3, + "wind_degrees": e.Wind.Deg, + "wind_speed": e.Wind.Speed, + "humidity": e.Main.Humidity, + "pressure": e.Main.Pressure, + "temperature": e.Main.Temp, + }, + tags, + tm) + } + if forecast { + // intentional: overwrite future data points + // under the * tag + tags := map[string]string{ + "city_id": strconv.FormatInt(s.City.Id, 10), + "forecast": "*", + } + for _, e := range s.List { + tm := time.Unix(e.Dt, 0) + if e.Id > 0 { + tags["city_id"] = strconv.FormatInt(e.Id, 10) + } + acc.AddFields( + "weather", + map[string]interface{}{ + "rain": e.Rain.Rain3, + "wind_degrees": e.Wind.Deg, + "wind_speed": e.Wind.Speed, + "humidity": e.Main.Humidity, + "pressure": e.Main.Pressure, + "temperature": e.Main.Temp, + }, + tags, + tm) + } + } +} + +func init() { + inputs.Add("openweathermap", func() telegraf.Input { + tmout := internal.Duration{ + Duration: defaultResponseTimeout, + } + return &OpenWeatherMap{ + ResponseTimeout: tmout, + Units: defaultUnits, + } + }) +} diff --git a/plugins/inputs/openweathermap/openweathermap_test.go b/plugins/inputs/openweathermap/openweathermap_test.go new file mode 100644 index 000000000..98f0a64a2 --- /dev/null +++ b/plugins/inputs/openweathermap/openweathermap_test.go @@ -0,0 +1,482 @@ +package openweathermap + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +const sampleNoContent = ` +{ +} +` + +const sampleStatusResponse = ` +{ + "city": { + "coord": { + "lat": 48.8534, + "lon": 2.3488 + }, + "country": "FR", + "id": 2988507, + "name": "Paris" + }, + "cnt": 40, + "cod": "200", + "list": [ + { + "clouds": { + "all": 88 + }, + "dt": 1543622400, + "dt_txt": "2018-12-01 00:00:00", + "main": { + "grnd_level": 1018.65, + "humidity": 91, + "pressure": 1018.65, + "sea_level": 1030.99, + "temp": 6.71, + "temp_kf": -2.14 + }, + "rain": { + "3h": 0.035 + }, + "sys": { + "pod": "n" + }, + "weather": [ + { + "description": "light rain", + "icon": "10n", + "id": 500, + "main": "Rain" + } + ], + "wind": { + "deg": 228.501, + "speed": 3.76 + } + }, + { + "clouds": { + "all": 92 + }, + "dt": 1544043600, + "dt_txt": "2018-12-05 21:00:00", + "main": { + "grnd_level": 1032.18, + "humidity": 98, + "pressure": 1032.18, + "sea_level": 1044.78, + "temp": 6.38, + "temp_kf": 0 + }, + "rain": { + "3h": 0.049999999999997 + }, + "sys": { + "pod": "n" + }, + "weather": [ + { + "description": "light rain", + "icon": "10n", + "id": 500, + "main": "Rain" + } + ], + "wind": { + "deg": 335.005, + "speed": 2.66 + } + } + ], + "message": 0.0025 +} +` + +const groupWeatherResponse = ` +{ + "cnt": 1, + "list": [{ + "coord": { + "lat": 48.85, + "lon": 2.35 + }, + "dt": 1544194800, + "id": 2988507, + "main": { + "humidity": 87, + "pressure": 1007, + "temp": 9.25 + }, + "name": "Paris", + "sys": { + "country": "FR", + "id": 6550, + "message": 0.002, + "sunrise": 1544167818, + "sunset": 1544198047, + "type": 1 + }, + "visibility": 10000, + "weather": [ + { + "description": "light intensity drizzle", + "icon": "09d", + "id": 300, + "main": "Drizzle" + } + ], + "wind": { + "deg": 290, + "speed": 8.7 + } + }] +} +` + +const batchWeatherResponse = ` +{ + "cnt": 3, + "list": [{ + "coord": { + "lon": 37.62, + "lat": 55.75 + }, + "sys": { + "type": 1, + "id": 9029, + "message": 0.0061, + "country": "RU", + "sunrise": 1556416455, + "sunset": 1556470779 + }, + "weather": [{ + "id": 802, + "main": "Clouds", + "description": "scattered clouds", + "icon": "03d" + }], + "main": { + "temp": 9.57, + "pressure": 1014, + "humidity": 46 + }, + "visibility": 10000, + "wind": { + "speed": 5, + "deg": 60 + }, + "clouds": { + "all": 40 + }, + "dt": 1556444155, + "id": 524901, + "name": "Moscow" + }, { + "coord": { + "lon": 30.52, + "lat": 50.43 + }, + "sys": { + "type": 1, + "id": 8903, + "message": 0.0076, + "country": "UA", + "sunrise": 1556419155, + "sunset": 1556471486 + }, + "weather": [{ + "id": 520, + "main": "Rain", + "description": "light intensity shower rain", + "icon": "09d" + }], + "main": { + "temp": 19.29, + "pressure": 1009, + "humidity": 63 + }, + "visibility": 10000, + "wind": { + "speed": 1 + }, + "clouds": { + "all": 0 + }, + "dt": 1556444155, + "id": 703448, + "name": "Kiev" + }, { + "coord": { + "lon": -0.13, + "lat": 51.51 + }, + "sys": { + "type": 1, + "id": 1414, + "message": 0.0088, + "country": "GB", + "sunrise": 1556426319, + "sunset": 1556479032 + }, + "weather": [{ + "id": 803, + "main": "Clouds", + "description": "broken clouds", + "icon": "04d" + }], + "main": { + "temp": 10.62, + "pressure": 1019, + "humidity": 66 + }, + "visibility": 10000, + "wind": { + "speed": 6.2, + "deg": 290 + }, + "rain": { + "3h": 0.072 + }, + "clouds": { + "all": 75 + }, + "dt": 1556444155, + "id": 2643743, + "name": "London" + }] +} +` + +func TestForecastGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + if r.URL.Path == "/data/2.5/forecast" { + rsp = sampleStatusResponse + w.Header()["Content-Type"] = []string{"application/json"} + } else if r.URL.Path == "/data/2.5/group" { + rsp = sampleNoContent + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &OpenWeatherMap{ + BaseUrl: ts.URL, + AppId: "noappid", + CityId: []string{"2988507"}, + Fetch: []string{"weather", "forecast"}, + Units: "metric", + } + + var acc testutil.Accumulator + + err_openweathermap := n.Gather(&acc) + require.NoError(t, err_openweathermap) + for _, forecast_tag := range []string{"*", "3h"} { + acc.AssertContainsTaggedFields( + t, + "weather", + map[string]interface{}{ + "humidity": int64(91), + "pressure": 1018.65, + "temperature": 6.71, + "rain": 0.035, + "wind_degrees": 228.501, + "wind_speed": 3.76, + }, + map[string]string{ + "city_id": "2988507", + "forecast": forecast_tag, + }) + } + for _, forecast_tag := range []string{"*", "6h"} { + acc.AssertContainsTaggedFields( + t, + "weather", + map[string]interface{}{ + "humidity": int64(98), + "pressure": 1032.18, + "temperature": 6.38, + "rain": 0.049999999999997, + "wind_degrees": 335.005, + "wind_speed": 2.66, + }, + map[string]string{ + "city_id": "2988507", + "forecast": forecast_tag, + }) + } +} + +func TestWeatherGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + if r.URL.Path == "/data/2.5/group" { + rsp = groupWeatherResponse + w.Header()["Content-Type"] = []string{"application/json"} + } else if r.URL.Path == "/data/2.5/forecast" { + rsp = sampleNoContent + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &OpenWeatherMap{ + BaseUrl: ts.URL, + AppId: "noappid", + CityId: []string{"2988507"}, + Fetch: []string{"weather"}, + Units: "metric", + } + + var acc testutil.Accumulator + + err_openweathermap := n.Gather(&acc) + + require.NoError(t, err_openweathermap) + + acc.AssertContainsTaggedFields( + t, + "weather", + map[string]interface{}{ + "humidity": int64(87), + "pressure": 1007.0, + "temperature": 9.25, + "wind_degrees": 290.0, + "wind_speed": 8.7, + "rain": 0.0, + }, + map[string]string{ + "city_id": "2988507", + "forecast": "*", + }) +} + +func TestBatchWeatherGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + if r.URL.Path == "/data/2.5/group" { + rsp = batchWeatherResponse + w.Header()["Content-Type"] = []string{"application/json"} + } else if r.URL.Path == "/data/2.5/forecast" { + rsp = sampleNoContent + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &OpenWeatherMap{ + BaseUrl: ts.URL, + AppId: "noappid", + CityId: []string{"524901", "703448", "2643743"}, + Fetch: []string{"weather"}, + Units: "metric", + } + + var acc testutil.Accumulator + + err_openweathermap := n.Gather(&acc) + + require.NoError(t, err_openweathermap) + + acc.AssertContainsTaggedFields( + t, + "weather", + map[string]interface{}{ + "humidity": int64(46), + "pressure": 1014.0, + "temperature": 9.57, + "wind_degrees": 60.0, + "wind_speed": 5.0, + "rain": 0.0, + }, + map[string]string{ + "city_id": "524901", + "forecast": "*", + }) + acc.AssertContainsTaggedFields( + t, + "weather", + map[string]interface{}{ + "humidity": int64(63), + "pressure": 1009.0, + "temperature": 19.29, + "wind_degrees": 0.0, + "wind_speed": 1.0, + "rain": 0.0, + }, + map[string]string{ + "city_id": "703448", + "forecast": "*", + }) + acc.AssertContainsTaggedFields( + t, + "weather", + map[string]interface{}{ + "humidity": int64(66), + "pressure": 1019.0, + "temperature": 10.62, + "wind_degrees": 290.0, + "wind_speed": 6.2, + "rain": 0.072, + }, + map[string]string{ + "city_id": "2643743", + "forecast": "*", + }) +} + +func TestResponseTimeout(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + if r.URL.Path == "/data/2.5/group" { + rsp = groupWeatherResponse + w.Header()["Content-Type"] = []string{"application/json"} + } else if r.URL.Path == "/data/2.5/forecast" { + rsp = sampleStatusResponse + w.Header()["Content-Type"] = []string{"application/json"} + } else { + panic("Cannot handle request") + } + + time.Sleep(time.Second * 6) // Cause timeout + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &OpenWeatherMap{ + BaseUrl: ts.URL, + AppId: "noappid", + CityId: []string{"2988507"}, + Fetch: []string{"weather"}, + Units: "metric", + } + + var acc testutil.Accumulator + + err_openweathermap := n.Gather(&acc) + + require.NoError(t, err_openweathermap) + acc.AssertDoesNotContainMeasurement( + t, + "weather", + ) +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 19acebe1c..b5021010a 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -320,6 +320,26 @@ func (a *Accumulator) WaitError(n int) { a.Unlock() } +func (a *Accumulator) assertContainsTaggedFields( + t *testing.T, + measurement string, + fields map[string]interface{}, + tags map[string]string, +) { + for _, p := range a.Metrics { + if !reflect.DeepEqual(tags, p.Tags) { + continue + } + + if p.Measurement == measurement { + assert.Equal(t, fields, p.Fields) + return + } + } + msg := fmt.Sprintf("unknown measurement %s with tags %v", measurement, tags) + assert.Fail(t, msg) +} + func (a *Accumulator) AssertContainsTaggedFields( t *testing.T, measurement string, @@ -333,13 +353,11 @@ func (a *Accumulator) AssertContainsTaggedFields( continue } - if p.Measurement == measurement { - assert.Equal(t, fields, p.Fields) + if p.Measurement == measurement && reflect.DeepEqual(fields, p.Fields) { return } } - msg := fmt.Sprintf("unknown measurement %s with tags %v", measurement, tags) - assert.Fail(t, msg) + a.assertContainsTaggedFields(t, measurement, fields, tags) } func (a *Accumulator) AssertDoesNotContainsTaggedFields(