Gather concurrently from snmp agents (#3365)

This commit is contained in:
Daniel Nelson 2017-10-25 15:28:55 -07:00 committed by GitHub
parent db8b7e2e17
commit ccea33ff28
2 changed files with 66 additions and 43 deletions

View File

@ -135,7 +135,7 @@ type Snmp struct {
Name string Name string
Fields []Field `toml:"field"` Fields []Field `toml:"field"`
connectionCache map[string]snmpConnection connectionCache []snmpConnection
initialized bool initialized bool
} }
@ -144,6 +144,8 @@ func (s *Snmp) init() error {
return nil return nil
} }
s.connectionCache = make([]snmpConnection, len(s.Agents))
for i := range s.Tables { for i := range s.Tables {
if err := s.Tables[i].init(); err != nil { if err := s.Tables[i].init(); err != nil {
return Errorf(err, "initializing table %s", s.Tables[i].Name) return Errorf(err, "initializing table %s", s.Tables[i].Name)
@ -342,11 +344,15 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
return err return err
} }
for _, agent := range s.Agents { var wg sync.WaitGroup
gs, err := s.getConnection(agent) for i, agent := range s.Agents {
wg.Add(1)
go func(i int, agent string) {
defer wg.Done()
gs, err := s.getConnection(i)
if err != nil { if err != nil {
acc.AddError(Errorf(err, "agent %s", agent)) acc.AddError(Errorf(err, "agent %s", agent))
continue return
} }
// First is the top-level fields. We treat the fields as table prefixes with an empty index. // First is the top-level fields. We treat the fields as table prefixes with an empty index.
@ -365,7 +371,9 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name)) acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name))
} }
} }
}(i, agent)
} }
wg.Wait()
return nil return nil
} }
@ -568,16 +576,18 @@ func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
} }
// getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the // getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the
// result using `agent` as the cache key. // result using `agentIndex` as the cache key. This is done to allow multiple
func (s *Snmp) getConnection(agent string) (snmpConnection, error) { // connections to a single address. It is an error to use a connection in
if s.connectionCache == nil { // more than one goroutine.
s.connectionCache = map[string]snmpConnection{} func (s *Snmp) getConnection(idx int) (snmpConnection, error) {
} if gs := s.connectionCache[idx]; gs != nil {
if gs, ok := s.connectionCache[agent]; ok {
return gs, nil return gs, nil
} }
agent := s.Agents[idx]
gs := gosnmpWrapper{&gosnmp.GoSNMP{}} gs := gosnmpWrapper{&gosnmp.GoSNMP{}}
s.connectionCache[idx] = gs
host, portStr, err := net.SplitHostPort(agent) host, portStr, err := net.SplitHostPort(agent)
if err != nil { if err != nil {
@ -677,7 +687,6 @@ func (s *Snmp) getConnection(agent string) (snmpConnection, error) {
return nil, Errorf(err, "setting up connection") return nil, Errorf(err, "setting up connection")
} }
s.connectionCache[agent] = gs
return gs, nil return gs, nil
} }

View File

@ -120,7 +120,7 @@ func TestSampleConfig(t *testing.T) {
}, },
}, },
} }
assert.Equal(t, s, *conf.Inputs.Snmp[0]) assert.Equal(t, &s, conf.Inputs.Snmp[0])
} }
func TestFieldInit(t *testing.T) { func TestFieldInit(t *testing.T) {
@ -251,13 +251,16 @@ func TestSnmpInit_noTranslate(t *testing.T) {
func TestGetSNMPConnection_v2(t *testing.T) { func TestGetSNMPConnection_v2(t *testing.T) {
s := &Snmp{ s := &Snmp{
Agents: []string{"1.2.3.4:567", "1.2.3.4"},
Timeout: internal.Duration{Duration: 3 * time.Second}, Timeout: internal.Duration{Duration: 3 * time.Second},
Retries: 4, Retries: 4,
Version: 2, Version: 2,
Community: "foo", Community: "foo",
} }
err := s.init()
require.NoError(t, err)
gsc, err := s.getConnection("1.2.3.4:567") gsc, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs := gsc.(gosnmpWrapper) gs := gsc.(gosnmpWrapper)
assert.Equal(t, "1.2.3.4", gs.Target) assert.Equal(t, "1.2.3.4", gs.Target)
@ -265,7 +268,7 @@ func TestGetSNMPConnection_v2(t *testing.T) {
assert.Equal(t, gosnmp.Version2c, gs.Version) assert.Equal(t, gosnmp.Version2c, gs.Version)
assert.Equal(t, "foo", gs.Community) assert.Equal(t, "foo", gs.Community)
gsc, err = s.getConnection("1.2.3.4") gsc, err = s.getConnection(1)
require.NoError(t, err) require.NoError(t, err)
gs = gsc.(gosnmpWrapper) gs = gsc.(gosnmpWrapper)
assert.Equal(t, "1.2.3.4", gs.Target) assert.Equal(t, "1.2.3.4", gs.Target)
@ -274,6 +277,7 @@ func TestGetSNMPConnection_v2(t *testing.T) {
func TestGetSNMPConnection_v3(t *testing.T) { func TestGetSNMPConnection_v3(t *testing.T) {
s := &Snmp{ s := &Snmp{
Agents: []string{"1.2.3.4"},
Version: 3, Version: 3,
MaxRepetitions: 20, MaxRepetitions: 20,
ContextName: "mycontext", ContextName: "mycontext",
@ -287,8 +291,10 @@ func TestGetSNMPConnection_v3(t *testing.T) {
EngineBoots: 1, EngineBoots: 1,
EngineTime: 2, EngineTime: 2,
} }
err := s.init()
require.NoError(t, err)
gsc, err := s.getConnection("1.2.3.4") gsc, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs := gsc.(gosnmpWrapper) gs := gsc.(gosnmpWrapper)
assert.Equal(t, gs.Version, gosnmp.Version3) assert.Equal(t, gs.Version, gosnmp.Version3)
@ -308,15 +314,22 @@ func TestGetSNMPConnection_v3(t *testing.T) {
} }
func TestGetSNMPConnection_caching(t *testing.T) { func TestGetSNMPConnection_caching(t *testing.T) {
s := &Snmp{} s := &Snmp{
gs1, err := s.getConnection("1.2.3.4") Agents: []string{"1.2.3.4", "1.2.3.5", "1.2.3.5"},
}
err := s.init()
require.NoError(t, err) require.NoError(t, err)
gs2, err := s.getConnection("1.2.3.4") gs1, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs3, err := s.getConnection("1.2.3.5") gs2, err := s.getConnection(0)
require.NoError(t, err)
gs3, err := s.getConnection(1)
require.NoError(t, err)
gs4, err := s.getConnection(2)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, gs1 == gs2) assert.True(t, gs1 == gs2)
assert.False(t, gs2 == gs3) assert.False(t, gs2 == gs3)
assert.False(t, gs3 == gs4)
} }
func TestGosnmpWrapper_walk_retry(t *testing.T) { func TestGosnmpWrapper_walk_retry(t *testing.T) {
@ -560,11 +573,11 @@ func TestGather(t *testing.T) {
}, },
}, },
connectionCache: map[string]snmpConnection{ connectionCache: []snmpConnection{
"TestGather": tsc, tsc,
}, },
initialized: true,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
tstart := time.Now() tstart := time.Now()
@ -607,9 +620,10 @@ func TestGather_host(t *testing.T) {
}, },
}, },
connectionCache: map[string]snmpConnection{ connectionCache: []snmpConnection{
"TestGather": tsc, tsc,
}, },
initialized: true,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}