Use mysql.ParseDSN func instead of url.Parse

The MySQL DB driver has it's own DSN parsing function. Previously we
were using the url.Parse function, but this causes problems because a
valid MySQL DSN can be an invalid http URL, namely when using some
special characters in the password.

This change uses the MySQL DB driver's builtin ParseDSN function and
applies a timeout parameter natively via that.

Another benefit of this change is that we fail earlier if given an
invalid MySQL DSN.

closes #870
closes #1842
This commit is contained in:
Cameron Sparr
2016-10-12 12:43:51 +01:00
parent b00ad65b08
commit a65447d22e
4 changed files with 55 additions and 171 deletions

View File

@@ -4,16 +4,16 @@ import (
"bytes"
"database/sql"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/go-sql-driver/mysql"
)
type Mysql struct {
@@ -398,27 +398,6 @@ var (
}
)
func dsnAddTimeout(dsn string) (string, error) {
// DSN "?timeout=5s" is not valid, but "/?timeout=5s" is valid ("" and "/"
// are the same DSN)
if dsn == "" {
dsn = "/"
}
u, err := url.Parse(dsn)
if err != nil {
return "", err
}
v := u.Query()
// Only override timeout if not already defined
if _, ok := v["timeout"]; ok == false {
v.Add("timeout", defaultTimeout.String())
u.RawQuery = v.Encode()
}
return u.String(), nil
}
// Math constants
const (
picoSeconds = 1e12
@@ -682,10 +661,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
var val sql.RawBytes
// parse DSN and save server tag
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
for rows.Next() {
@@ -722,10 +698,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
}
defer rows.Close()
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
@@ -770,11 +743,7 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
defer rows.Close()
// parse DSN and save host as a tag
var servtag string
servtag, err = parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
var (
size uint64 = 0
@@ -817,11 +786,7 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
}
// parse the DSN and save host name as a tag
var servtag string
servtag, err = parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
for rows.Next() {
@@ -932,10 +897,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
var servtag string
fields := make(map[string]interface{})
servtag, err = parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag = getDSNTag(serv)
// mapping of state with its counts
stateCounts := make(map[string]uint32, len(generalThreadStates))
@@ -978,10 +940,7 @@ func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Acc
timeFetch, timeInsert, timeUpdate, timeDelete float64
)
servtag, err = parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag = getDSNTag(serv)
for rows.Next() {
err = rows.Scan(&objSchema, &objName,
@@ -1030,10 +989,7 @@ func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Acc
timeFetch, timeInsert, timeUpdate, timeDelete float64
)
servtag, err = parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag = getDSNTag(serv)
for rows.Next() {
err = rows.Scan(&objSchema, &objName, &indexName,
@@ -1085,10 +1041,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
incValue, maxInt uint64
)
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
for rows.Next() {
if err := rows.Scan(&schema, &table, &column, &incValue, &maxInt); err != nil {
@@ -1132,10 +1085,7 @@ func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.A
}
defer rows.Close()
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
var (
objectSchema string
@@ -1257,10 +1207,7 @@ func (m *Mysql) gatherPerfEventWaits(db *sql.DB, serv string, acc telegraf.Accum
starCount, timeWait float64
)
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{
"server": servtag,
}
@@ -1295,10 +1242,7 @@ func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, serv string, acc telegr
sumNumBytesRead, sumNumBytesWrite float64
)
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{
"server": servtag,
}
@@ -1365,10 +1309,7 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf
noIndexUsed float64
)
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
servtag := getDSNTag(serv)
tags := map[string]string{
"server": servtag,
}
@@ -1412,14 +1353,8 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf
// gatherTableSchema can be used to gather stats on each schema
func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error {
var (
dbList []string
servtag string
)
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
}
var dbList []string
servtag := getDSNTag(serv)
// if the list of databases if empty, then get all databases
if len(m.TableSchemaDatabases) == 0 {
@@ -1575,6 +1510,27 @@ func copyTags(in map[string]string) map[string]string {
return out
}
func dsnAddTimeout(dsn string) (string, error) {
conf, err := mysql.ParseDSN(dsn)
if err != nil {
return "", err
}
if conf.Timeout == 0 {
conf.Timeout = time.Second * 5
}
return conf.FormatDSN(), nil
}
func getDSNTag(dsn string) string {
conf, err := mysql.ParseDSN(dsn)
if err != nil {
return "127.0.0.1:3306"
}
return conf.Addr
}
func init() {
inputs.Add("mysql", func() telegraf.Input {
return &Mysql{}