This was broken by changes in go 1.8 to url.Parse. This change allows the string but prompts the user to move to the correct url string.
181 lines
4.3 KiB
Go
181 lines
4.3 KiB
Go
package mongodb
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
"gopkg.in/mgo.v2"
|
|
)
|
|
|
|
type MongoDB struct {
|
|
Servers []string
|
|
Ssl Ssl
|
|
mongos map[string]*Server
|
|
GatherPerdbStats bool
|
|
|
|
// Path to CA file
|
|
SSLCA string `toml:"ssl_ca"`
|
|
// Path to host cert file
|
|
SSLCert string `toml:"ssl_cert"`
|
|
// Path to cert key file
|
|
SSLKey string `toml:"ssl_key"`
|
|
// Use SSL but skip chain & host verification
|
|
InsecureSkipVerify bool
|
|
}
|
|
|
|
type Ssl struct {
|
|
Enabled bool
|
|
CaCerts []string `toml:"cacerts"`
|
|
}
|
|
|
|
var sampleConfig = `
|
|
## An array of URLs of the form:
|
|
## "mongodb://" [user ":" pass "@"] host [ ":" port]
|
|
## For example:
|
|
## mongodb://user:auth_key@10.10.3.30:27017,
|
|
## mongodb://10.10.3.33:18832,
|
|
servers = ["mongodb://127.0.0.1:27017"]
|
|
gather_perdb_stats = false
|
|
|
|
## Optional SSL Config
|
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
|
# ssl_key = "/etc/telegraf/key.pem"
|
|
## Use SSL but skip chain & host verification
|
|
# insecure_skip_verify = false
|
|
`
|
|
|
|
func (m *MongoDB) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (*MongoDB) Description() string {
|
|
return "Read metrics from one or many MongoDB servers"
|
|
}
|
|
|
|
var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
|
|
|
|
// Reads stats from all configured servers accumulates stats.
|
|
// Returns one of the errors encountered while gather stats (if any).
|
|
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
|
|
if len(m.Servers) == 0 {
|
|
m.gatherServer(m.getMongoServer(localhost), acc)
|
|
return nil
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
for i, serv := range m.Servers {
|
|
if !strings.HasPrefix(serv, "mongodb://") {
|
|
// Preserve backwards compatibility for hostnames without a
|
|
// scheme, broken in go 1.8. Remove in Telegraf 2.0
|
|
serv = "mongodb://" + serv
|
|
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", serv)
|
|
m.Servers[i] = serv
|
|
}
|
|
|
|
u, err := url.Parse(serv)
|
|
if err != nil {
|
|
acc.AddError(fmt.Errorf("Unable to parse address %q: %s", serv, err))
|
|
continue
|
|
}
|
|
if u.Host == "" {
|
|
acc.AddError(fmt.Errorf("Unable to parse address %q", serv))
|
|
continue
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(srv *Server) {
|
|
defer wg.Done()
|
|
acc.AddError(m.gatherServer(srv, acc))
|
|
}(m.getMongoServer(u))
|
|
}
|
|
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (m *MongoDB) getMongoServer(url *url.URL) *Server {
|
|
if _, ok := m.mongos[url.Host]; !ok {
|
|
m.mongos[url.Host] = &Server{
|
|
Url: url,
|
|
}
|
|
}
|
|
return m.mongos[url.Host]
|
|
}
|
|
|
|
func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
|
|
if server.Session == nil {
|
|
var dialAddrs []string
|
|
if server.Url.User != nil {
|
|
dialAddrs = []string{server.Url.String()}
|
|
} else {
|
|
dialAddrs = []string{server.Url.Host}
|
|
}
|
|
dialInfo, err := mgo.ParseURL(dialAddrs[0])
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to parse URL (%s), %s\n",
|
|
dialAddrs[0], err.Error())
|
|
}
|
|
dialInfo.Direct = true
|
|
dialInfo.Timeout = 5 * time.Second
|
|
|
|
var tlsConfig *tls.Config
|
|
|
|
if m.Ssl.Enabled {
|
|
// Deprecated SSL config
|
|
tlsConfig = &tls.Config{}
|
|
if len(m.Ssl.CaCerts) > 0 {
|
|
roots := x509.NewCertPool()
|
|
for _, caCert := range m.Ssl.CaCerts {
|
|
ok := roots.AppendCertsFromPEM([]byte(caCert))
|
|
if !ok {
|
|
return fmt.Errorf("failed to parse root certificate")
|
|
}
|
|
}
|
|
tlsConfig.RootCAs = roots
|
|
} else {
|
|
tlsConfig.InsecureSkipVerify = true
|
|
}
|
|
} else {
|
|
tlsConfig, err = internal.GetTLSConfig(
|
|
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
|
|
}
|
|
|
|
// If configured to use TLS, add a dial function
|
|
if tlsConfig != nil {
|
|
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
|
|
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
|
|
if err != nil {
|
|
fmt.Printf("error in Dial, %s\n", err.Error())
|
|
}
|
|
return conn, err
|
|
}
|
|
}
|
|
|
|
sess, err := mgo.DialWithInfo(dialInfo)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
|
|
}
|
|
server.Session = sess
|
|
}
|
|
return server.gatherData(acc, m.GatherPerdbStats)
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("mongodb", func() telegraf.Input {
|
|
return &MongoDB{
|
|
mongos: make(map[string]*Server),
|
|
}
|
|
})
|
|
}
|