telegraf/plugins/inputs/mongodb/mongodb.go

147 lines
3.3 KiB
Go

package mongodb
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/url"
"sync"
"time"
"github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/mgo.v2"
)
type MongoDB struct {
Servers []string
Ssl Ssl
mongos map[string]*Server
}
type Ssl struct {
Enabled bool
CaCerts []string `toml:"cacerts"`
}
var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]
`
func (m *MongoDB) SampleConfig() string {
return sampleConfig
}
func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}
var localhost = &url.URL{Host: "127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc inputs.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}
var wg sync.WaitGroup
var outerr error
for _, serv := range m.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
}
wg.Add(1)
go func() {
defer wg.Done()
outerr = m.gatherServer(m.getMongoServer(u), acc)
}()
}
wg.Wait()
return outerr
}
func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Url: url,
}
}
return m.mongos[url.Host]
}
func (m *MongoDB) gatherServer(server *Server, acc inputs.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n",
dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second
if m.Ssl.Enabled {
tlsConfig := &tls.Config{}
if len(m.Ssl.CaCerts) > 0 {
roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
tlsConfig.InsecureSkipVerify = true
}
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
if err != nil {
fmt.Printf("error in Dial, %s\n", err.Error())
}
return conn, err
}
}
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
fmt.Printf("error dialing over ssl, %s\n", err.Error())
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
}
return server.gatherData(acc)
}
func init() {
inputs.Add("mongodb", func() inputs.Input {
return &MongoDB{
mongos: make(map[string]*Server),
}
})
}