From ccea33ff28fe933ca372a8b47d1d1cdcbf60a927 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 25 Oct 2017 15:28:55 -0700 Subject: [PATCH] Gather concurrently from snmp agents (#3365) --- plugins/inputs/snmp/snmp.go | 69 ++++++++++++++++++-------------- plugins/inputs/snmp/snmp_test.go | 40 ++++++++++++------ 2 files changed, 66 insertions(+), 43 deletions(-) diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go index 2aef729b3..c4f66f519 100644 --- a/plugins/inputs/snmp/snmp.go +++ b/plugins/inputs/snmp/snmp.go @@ -135,7 +135,7 @@ type Snmp struct { Name string Fields []Field `toml:"field"` - connectionCache map[string]snmpConnection + connectionCache []snmpConnection initialized bool } @@ -144,6 +144,8 @@ func (s *Snmp) init() error { return nil } + s.connectionCache = make([]snmpConnection, len(s.Agents)) + for i := range s.Tables { if err := s.Tables[i].init(); err != nil { return Errorf(err, "initializing table %s", s.Tables[i].Name) @@ -342,30 +344,36 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error { return err } - for _, agent := range s.Agents { - gs, err := s.getConnection(agent) - if err != nil { - acc.AddError(Errorf(err, "agent %s", agent)) - continue - } - - // First is the top-level fields. We treat the fields as table prefixes with an empty index. - t := Table{ - Name: s.Name, - Fields: s.Fields, - } - topTags := map[string]string{} - if err := s.gatherTable(acc, gs, t, topTags, false); err != nil { - acc.AddError(Errorf(err, "agent %s", agent)) - } - - // Now is the real tables. - for _, t := range s.Tables { - if err := s.gatherTable(acc, gs, t, topTags, true); err != nil { - acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name)) + var wg sync.WaitGroup + for i, agent := range s.Agents { + wg.Add(1) + go func(i int, agent string) { + defer wg.Done() + gs, err := s.getConnection(i) + if err != nil { + acc.AddError(Errorf(err, "agent %s", agent)) + return } - } + + // First is the top-level fields. We treat the fields as table prefixes with an empty index. + t := Table{ + Name: s.Name, + Fields: s.Fields, + } + topTags := map[string]string{} + if err := s.gatherTable(acc, gs, t, topTags, false); err != nil { + acc.AddError(Errorf(err, "agent %s", agent)) + } + + // Now is the real tables. + for _, t := range s.Tables { + if err := s.gatherTable(acc, gs, t, topTags, true); err != nil { + acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name)) + } + } + }(i, agent) } + wg.Wait() return nil } @@ -568,16 +576,18 @@ func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) { } // getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the -// result using `agent` as the cache key. -func (s *Snmp) getConnection(agent string) (snmpConnection, error) { - if s.connectionCache == nil { - s.connectionCache = map[string]snmpConnection{} - } - if gs, ok := s.connectionCache[agent]; ok { +// result using `agentIndex` as the cache key. This is done to allow multiple +// connections to a single address. It is an error to use a connection in +// more than one goroutine. +func (s *Snmp) getConnection(idx int) (snmpConnection, error) { + if gs := s.connectionCache[idx]; gs != nil { return gs, nil } + agent := s.Agents[idx] + gs := gosnmpWrapper{&gosnmp.GoSNMP{}} + s.connectionCache[idx] = gs host, portStr, err := net.SplitHostPort(agent) if err != nil { @@ -677,7 +687,6 @@ func (s *Snmp) getConnection(agent string) (snmpConnection, error) { return nil, Errorf(err, "setting up connection") } - s.connectionCache[agent] = gs return gs, nil } diff --git a/plugins/inputs/snmp/snmp_test.go b/plugins/inputs/snmp/snmp_test.go index 07fdeddc1..f9a7d95e2 100644 --- a/plugins/inputs/snmp/snmp_test.go +++ b/plugins/inputs/snmp/snmp_test.go @@ -120,7 +120,7 @@ func TestSampleConfig(t *testing.T) { }, }, } - assert.Equal(t, s, *conf.Inputs.Snmp[0]) + assert.Equal(t, &s, conf.Inputs.Snmp[0]) } func TestFieldInit(t *testing.T) { @@ -251,13 +251,16 @@ func TestSnmpInit_noTranslate(t *testing.T) { func TestGetSNMPConnection_v2(t *testing.T) { s := &Snmp{ + Agents: []string{"1.2.3.4:567", "1.2.3.4"}, Timeout: internal.Duration{Duration: 3 * time.Second}, Retries: 4, Version: 2, Community: "foo", } + err := s.init() + require.NoError(t, err) - gsc, err := s.getConnection("1.2.3.4:567") + gsc, err := s.getConnection(0) require.NoError(t, err) gs := gsc.(gosnmpWrapper) assert.Equal(t, "1.2.3.4", gs.Target) @@ -265,7 +268,7 @@ func TestGetSNMPConnection_v2(t *testing.T) { assert.Equal(t, gosnmp.Version2c, gs.Version) assert.Equal(t, "foo", gs.Community) - gsc, err = s.getConnection("1.2.3.4") + gsc, err = s.getConnection(1) require.NoError(t, err) gs = gsc.(gosnmpWrapper) assert.Equal(t, "1.2.3.4", gs.Target) @@ -274,6 +277,7 @@ func TestGetSNMPConnection_v2(t *testing.T) { func TestGetSNMPConnection_v3(t *testing.T) { s := &Snmp{ + Agents: []string{"1.2.3.4"}, Version: 3, MaxRepetitions: 20, ContextName: "mycontext", @@ -287,8 +291,10 @@ func TestGetSNMPConnection_v3(t *testing.T) { EngineBoots: 1, EngineTime: 2, } + err := s.init() + require.NoError(t, err) - gsc, err := s.getConnection("1.2.3.4") + gsc, err := s.getConnection(0) require.NoError(t, err) gs := gsc.(gosnmpWrapper) assert.Equal(t, gs.Version, gosnmp.Version3) @@ -308,15 +314,22 @@ func TestGetSNMPConnection_v3(t *testing.T) { } func TestGetSNMPConnection_caching(t *testing.T) { - s := &Snmp{} - gs1, err := s.getConnection("1.2.3.4") + s := &Snmp{ + Agents: []string{"1.2.3.4", "1.2.3.5", "1.2.3.5"}, + } + err := s.init() require.NoError(t, err) - gs2, err := s.getConnection("1.2.3.4") + gs1, err := s.getConnection(0) require.NoError(t, err) - gs3, err := s.getConnection("1.2.3.5") + gs2, err := s.getConnection(0) + require.NoError(t, err) + gs3, err := s.getConnection(1) + require.NoError(t, err) + gs4, err := s.getConnection(2) require.NoError(t, err) assert.True(t, gs1 == gs2) assert.False(t, gs2 == gs3) + assert.False(t, gs3 == gs4) } func TestGosnmpWrapper_walk_retry(t *testing.T) { @@ -560,11 +573,11 @@ func TestGather(t *testing.T) { }, }, - connectionCache: map[string]snmpConnection{ - "TestGather": tsc, + connectionCache: []snmpConnection{ + tsc, }, + initialized: true, } - acc := &testutil.Accumulator{} tstart := time.Now() @@ -607,9 +620,10 @@ func TestGather_host(t *testing.T) { }, }, - connectionCache: map[string]snmpConnection{ - "TestGather": tsc, + connectionCache: []snmpConnection{ + tsc, }, + initialized: true, } acc := &testutil.Accumulator{}