Add aurora input plugin (#4158)
This commit is contained in:
parent
61a0e500a8
commit
1a407ceaf9
|
@ -4,6 +4,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
|
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,280 @@
|
||||||
|
package aurora
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/internal/tls"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RoleType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Unknown RoleType = iota
|
||||||
|
Leader
|
||||||
|
Follower
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r RoleType) String() string {
|
||||||
|
switch r {
|
||||||
|
case Leader:
|
||||||
|
return "leader"
|
||||||
|
case Follower:
|
||||||
|
return "follower"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultTimeout = 5 * time.Second
|
||||||
|
defaultRoles = []string{"leader", "follower"}
|
||||||
|
)
|
||||||
|
|
||||||
|
type Vars map[string]interface{}
|
||||||
|
|
||||||
|
type Aurora struct {
|
||||||
|
Schedulers []string `toml:"schedulers"`
|
||||||
|
Roles []string `toml:"roles"`
|
||||||
|
Timeout internal.Duration `toml:"timeout"`
|
||||||
|
Username string `toml:"username"`
|
||||||
|
Password string `toml:"password"`
|
||||||
|
tls.ClientConfig
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
|
urls []*url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## Schedulers are the base addresses of your Aurora Schedulers
|
||||||
|
schedulers = ["http://127.0.0.1:8081"]
|
||||||
|
|
||||||
|
## Set of role types to collect metrics from.
|
||||||
|
##
|
||||||
|
## The scheduler roles are checked each interval by contacting the
|
||||||
|
## scheduler nodes; zookeeper is not contacted.
|
||||||
|
# roles = ["leader", "follower"]
|
||||||
|
|
||||||
|
## Timeout is the max time for total network operations.
|
||||||
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Username and password are sent using HTTP Basic Auth.
|
||||||
|
# username = "username"
|
||||||
|
# password = "pa$$word"
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
`
|
||||||
|
|
||||||
|
func (a *Aurora) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aurora) Description() string {
|
||||||
|
return "Gather metrics from Apache Aurora schedulers"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aurora) Gather(acc telegraf.Accumulator) error {
|
||||||
|
if a.client == nil {
|
||||||
|
err := a.initialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for _, u := range a.urls {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(u *url.URL) {
|
||||||
|
defer wg.Done()
|
||||||
|
role, err := a.gatherRole(ctx, u)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("%s: %v", u, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !a.roleEnabled(role) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = a.gatherScheduler(ctx, u, role, acc)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("%s: %v", u, err))
|
||||||
|
}
|
||||||
|
}(u)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aurora) initialize() error {
|
||||||
|
tlsCfg, err := a.ClientConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
urls := make([]*url.URL, 0, len(a.Schedulers))
|
||||||
|
for _, s := range a.Schedulers {
|
||||||
|
loc, err := url.Parse(s)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
urls = append(urls, loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Timeout.Duration < time.Second {
|
||||||
|
a.Timeout.Duration = defaultTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(a.Roles) == 0 {
|
||||||
|
a.Roles = defaultRoles
|
||||||
|
}
|
||||||
|
|
||||||
|
a.client = client
|
||||||
|
a.urls = urls
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aurora) roleEnabled(role RoleType) bool {
|
||||||
|
if len(a.Roles) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range a.Roles {
|
||||||
|
if role.String() == v {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, error) {
|
||||||
|
loc := *origin
|
||||||
|
loc.Path = "leaderhealth"
|
||||||
|
req, err := http.NewRequest("GET", loc.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return Unknown, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Username != "" || a.Password != "" {
|
||||||
|
req.SetBasicAuth(a.Username, a.Password)
|
||||||
|
}
|
||||||
|
req.Header.Add("Accept", "text/plain")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return Unknown, err
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusOK:
|
||||||
|
return Leader, nil
|
||||||
|
case http.StatusBadGateway:
|
||||||
|
fallthrough
|
||||||
|
case http.StatusServiceUnavailable:
|
||||||
|
return Follower, nil
|
||||||
|
default:
|
||||||
|
return Unknown, fmt.Errorf("%v", resp.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aurora) gatherScheduler(
|
||||||
|
ctx context.Context, origin *url.URL, role RoleType, acc telegraf.Accumulator,
|
||||||
|
) error {
|
||||||
|
loc := *origin
|
||||||
|
loc.Path = "vars.json"
|
||||||
|
req, err := http.NewRequest("GET", loc.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Username != "" || a.Password != "" {
|
||||||
|
req.SetBasicAuth(a.Username, a.Password)
|
||||||
|
}
|
||||||
|
req.Header.Add("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("%v", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
var vars Vars
|
||||||
|
decoder := json.NewDecoder(resp.Body)
|
||||||
|
decoder.UseNumber()
|
||||||
|
err = decoder.Decode(&vars)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("decoding response: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var fields = make(map[string]interface{}, len(vars))
|
||||||
|
for k, v := range vars {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case json.Number:
|
||||||
|
// Aurora encodes numbers as you would specify them as a literal,
|
||||||
|
// use this to determine if a value is a float or int.
|
||||||
|
if strings.ContainsAny(v.String(), ".eE") {
|
||||||
|
fv, err := v.Float64()
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields[k] = fv
|
||||||
|
} else {
|
||||||
|
fi, err := v.Int64()
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields[k] = fi
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields("aurora",
|
||||||
|
fields,
|
||||||
|
map[string]string{
|
||||||
|
"scheduler": origin.String(),
|
||||||
|
"role": role.String(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("aurora", func() telegraf.Input {
|
||||||
|
return &Aurora{}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,259 @@
|
||||||
|
package aurora
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
||||||
|
CheckFunc func(t *testing.T, err error, acc *testutil.Accumulator)
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAurora(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *Aurora
|
||||||
|
schedulers []string
|
||||||
|
roles []string
|
||||||
|
leaderhealth TestHandlerFunc
|
||||||
|
varsjson TestHandlerFunc
|
||||||
|
check CheckFunc
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "minimal",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
body := `{
|
||||||
|
"variable_scrape_events": 2958,
|
||||||
|
"variable_scrape_events_per_sec": 1.0,
|
||||||
|
"variable_scrape_micros_per_event": 1484.0,
|
||||||
|
"variable_scrape_micros_total": 4401084,
|
||||||
|
"variable_scrape_micros_total_per_sec": 1485.0
|
||||||
|
}`
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(body))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(acc.Metrics))
|
||||||
|
acc.AssertContainsTaggedFields(t,
|
||||||
|
"aurora",
|
||||||
|
map[string]interface{}{
|
||||||
|
"variable_scrape_events": int64(2958),
|
||||||
|
"variable_scrape_events_per_sec": 1.0,
|
||||||
|
"variable_scrape_micros_per_event": 1484.0,
|
||||||
|
"variable_scrape_micros_total": int64(4401084),
|
||||||
|
"variable_scrape_micros_total_per_sec": 1485.0,
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"scheduler": u.String(),
|
||||||
|
"role": "leader",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "disabled role",
|
||||||
|
roles: []string{"leader"},
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no metrics available",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte("{}"))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "string metrics skipped",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
body := `{
|
||||||
|
"foo": "bar"
|
||||||
|
}`
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(body))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "float64 unparseable",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
// too large
|
||||||
|
body := `{
|
||||||
|
"foo": 1e309
|
||||||
|
}`
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(body))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Error(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "int64 unparseable",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
// too large
|
||||||
|
body := `{
|
||||||
|
"foo": 9223372036854775808
|
||||||
|
}`
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(body))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Error(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bad json",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
body := `{]`
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(body))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Error(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "wrong status code",
|
||||||
|
leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
body := `{
|
||||||
|
"value": 42
|
||||||
|
}`
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
w.Write([]byte(body))
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Error(t, acc.FirstError())
|
||||||
|
require.Equal(t, 0, len(acc.Metrics))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/leaderhealth":
|
||||||
|
tt.leaderhealth(t, w, r)
|
||||||
|
case "/vars.json":
|
||||||
|
tt.varsjson(t, w, r)
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
plugin := &Aurora{}
|
||||||
|
plugin.Schedulers = []string{u.String()}
|
||||||
|
plugin.Roles = tt.roles
|
||||||
|
err := plugin.Gather(&acc)
|
||||||
|
tt.check(t, err, &acc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBasicAuth(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no auth",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "basic auth",
|
||||||
|
username: "username",
|
||||||
|
password: "pa$$word",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "username only",
|
||||||
|
username: "username",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "password only",
|
||||||
|
password: "pa$$word",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
username, password, _ := r.BasicAuth()
|
||||||
|
require.Equal(t, tt.username, username)
|
||||||
|
require.Equal(t, tt.password, password)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte("{}"))
|
||||||
|
})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
plugin := &Aurora{}
|
||||||
|
plugin.Schedulers = []string{u.String()}
|
||||||
|
plugin.Username = tt.username
|
||||||
|
plugin.Password = tt.password
|
||||||
|
err := plugin.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -42,6 +42,13 @@ func (a *Accumulator) NMetrics() uint64 {
|
||||||
return atomic.LoadUint64(&a.nMetrics)
|
return atomic.LoadUint64(&a.nMetrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Accumulator) FirstError() error {
|
||||||
|
if len(a.Errors) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return a.Errors[0]
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Accumulator) ClearMetrics() {
|
func (a *Accumulator) ClearMetrics() {
|
||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
|
|
Loading…
Reference in New Issue