Gather concurrently from snmp agents (#3365)

This commit is contained in:
Daniel Nelson 2017-10-25 15:28:55 -07:00 committed by GitHub
parent 6ea61b55d9
commit a519abf13f
2 changed files with 66 additions and 43 deletions

View File

@ -135,7 +135,7 @@ type Snmp struct {
Name string
Fields []Field `toml:"field"`
connectionCache map[string]snmpConnection
connectionCache []snmpConnection
initialized bool
}
@ -144,6 +144,8 @@ func (s *Snmp) init() error {
return nil
}
s.connectionCache = make([]snmpConnection, len(s.Agents))
for i := range s.Tables {
if err := s.Tables[i].init(); err != nil {
return Errorf(err, "initializing table %s", s.Tables[i].Name)
@ -342,30 +344,36 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
return err
}
for _, agent := range s.Agents {
gs, err := s.getConnection(agent)
if err != nil {
acc.AddError(Errorf(err, "agent %s", agent))
continue
}
// First is the top-level fields. We treat the fields as table prefixes with an empty index.
t := Table{
Name: s.Name,
Fields: s.Fields,
}
topTags := map[string]string{}
if err := s.gatherTable(acc, gs, t, topTags, false); err != nil {
acc.AddError(Errorf(err, "agent %s", agent))
}
// Now is the real tables.
for _, t := range s.Tables {
if err := s.gatherTable(acc, gs, t, topTags, true); err != nil {
acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name))
var wg sync.WaitGroup
for i, agent := range s.Agents {
wg.Add(1)
go func(i int, agent string) {
defer wg.Done()
gs, err := s.getConnection(i)
if err != nil {
acc.AddError(Errorf(err, "agent %s", agent))
return
}
}
// First is the top-level fields. We treat the fields as table prefixes with an empty index.
t := Table{
Name: s.Name,
Fields: s.Fields,
}
topTags := map[string]string{}
if err := s.gatherTable(acc, gs, t, topTags, false); err != nil {
acc.AddError(Errorf(err, "agent %s", agent))
}
// Now is the real tables.
for _, t := range s.Tables {
if err := s.gatherTable(acc, gs, t, topTags, true); err != nil {
acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name))
}
}
}(i, agent)
}
wg.Wait()
return nil
}
@ -568,16 +576,18 @@ func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
}
// getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the
// result using `agent` as the cache key.
func (s *Snmp) getConnection(agent string) (snmpConnection, error) {
if s.connectionCache == nil {
s.connectionCache = map[string]snmpConnection{}
}
if gs, ok := s.connectionCache[agent]; ok {
// result using `agentIndex` as the cache key. This is done to allow multiple
// connections to a single address. It is an error to use a connection in
// more than one goroutine.
func (s *Snmp) getConnection(idx int) (snmpConnection, error) {
if gs := s.connectionCache[idx]; gs != nil {
return gs, nil
}
agent := s.Agents[idx]
gs := gosnmpWrapper{&gosnmp.GoSNMP{}}
s.connectionCache[idx] = gs
host, portStr, err := net.SplitHostPort(agent)
if err != nil {
@ -677,7 +687,6 @@ func (s *Snmp) getConnection(agent string) (snmpConnection, error) {
return nil, Errorf(err, "setting up connection")
}
s.connectionCache[agent] = gs
return gs, nil
}

View File

@ -120,7 +120,7 @@ func TestSampleConfig(t *testing.T) {
},
},
}
assert.Equal(t, s, *conf.Inputs.Snmp[0])
assert.Equal(t, &s, conf.Inputs.Snmp[0])
}
func TestFieldInit(t *testing.T) {
@ -251,13 +251,16 @@ func TestSnmpInit_noTranslate(t *testing.T) {
func TestGetSNMPConnection_v2(t *testing.T) {
s := &Snmp{
Agents: []string{"1.2.3.4:567", "1.2.3.4"},
Timeout: internal.Duration{Duration: 3 * time.Second},
Retries: 4,
Version: 2,
Community: "foo",
}
err := s.init()
require.NoError(t, err)
gsc, err := s.getConnection("1.2.3.4:567")
gsc, err := s.getConnection(0)
require.NoError(t, err)
gs := gsc.(gosnmpWrapper)
assert.Equal(t, "1.2.3.4", gs.Target)
@ -265,7 +268,7 @@ func TestGetSNMPConnection_v2(t *testing.T) {
assert.Equal(t, gosnmp.Version2c, gs.Version)
assert.Equal(t, "foo", gs.Community)
gsc, err = s.getConnection("1.2.3.4")
gsc, err = s.getConnection(1)
require.NoError(t, err)
gs = gsc.(gosnmpWrapper)
assert.Equal(t, "1.2.3.4", gs.Target)
@ -274,6 +277,7 @@ func TestGetSNMPConnection_v2(t *testing.T) {
func TestGetSNMPConnection_v3(t *testing.T) {
s := &Snmp{
Agents: []string{"1.2.3.4"},
Version: 3,
MaxRepetitions: 20,
ContextName: "mycontext",
@ -287,8 +291,10 @@ func TestGetSNMPConnection_v3(t *testing.T) {
EngineBoots: 1,
EngineTime: 2,
}
err := s.init()
require.NoError(t, err)
gsc, err := s.getConnection("1.2.3.4")
gsc, err := s.getConnection(0)
require.NoError(t, err)
gs := gsc.(gosnmpWrapper)
assert.Equal(t, gs.Version, gosnmp.Version3)
@ -308,15 +314,22 @@ func TestGetSNMPConnection_v3(t *testing.T) {
}
func TestGetSNMPConnection_caching(t *testing.T) {
s := &Snmp{}
gs1, err := s.getConnection("1.2.3.4")
s := &Snmp{
Agents: []string{"1.2.3.4", "1.2.3.5", "1.2.3.5"},
}
err := s.init()
require.NoError(t, err)
gs2, err := s.getConnection("1.2.3.4")
gs1, err := s.getConnection(0)
require.NoError(t, err)
gs3, err := s.getConnection("1.2.3.5")
gs2, err := s.getConnection(0)
require.NoError(t, err)
gs3, err := s.getConnection(1)
require.NoError(t, err)
gs4, err := s.getConnection(2)
require.NoError(t, err)
assert.True(t, gs1 == gs2)
assert.False(t, gs2 == gs3)
assert.False(t, gs3 == gs4)
}
func TestGosnmpWrapper_walk_retry(t *testing.T) {
@ -560,11 +573,11 @@ func TestGather(t *testing.T) {
},
},
connectionCache: map[string]snmpConnection{
"TestGather": tsc,
connectionCache: []snmpConnection{
tsc,
},
initialized: true,
}
acc := &testutil.Accumulator{}
tstart := time.Now()
@ -607,9 +620,10 @@ func TestGather_host(t *testing.T) {
},
},
connectionCache: map[string]snmpConnection{
"TestGather": tsc,
connectionCache: []snmpConnection{
tsc,
},
initialized: true,
}
acc := &testutil.Accumulator{}