improvements on queries and additional comments

This commit is contained in:
Maksadbek 2016-03-18 14:15:54 +05:00
parent 5064d83a41
commit 3144351aa2
1 changed files with 248 additions and 40 deletions

View File

@ -20,6 +20,8 @@ type Mysql struct {
PerfEventsStatementsLimit uint32 PerfEventsStatementsLimit uint32
PerfEventsStatementsTimeLimit uint32 PerfEventsStatementsTimeLimit uint32
TableSchemaDatabases []string TableSchemaDatabases []string
GatherProcessList bool
GatherInfoSchemaAutoInc bool
GatherSlaveStatus bool GatherSlaveStatus bool
GatherBinaryLogs bool GatherBinaryLogs bool
GatherTableIOWaits bool GatherTableIOWaits bool
@ -27,6 +29,7 @@ type Mysql struct {
GatherTableSchema bool GatherTableSchema bool
GatherFileEventsStats bool GatherFileEventsStats bool
GatherPerfEventsStatements bool GatherPerfEventsStatements bool
IntervalSlow string
} }
var sampleConfig = ` var sampleConfig = `
@ -43,6 +46,8 @@ var sampleConfig = `
PerfEventsStatementsLimit = 250 PerfEventsStatementsLimit = 250
PerfEventsStatementsTimeLimit = 86400 PerfEventsStatementsTimeLimit = 86400
TableSchemaDatabases = [] TableSchemaDatabases = []
GatherProcessList = false
GatherInfoSchemaAutoInc = false
GatherSlaveStatus = false GatherSlaveStatus = false
GatherBinaryLogs = false GatherBinaryLogs = false
GatherTableIOWaits = false GatherTableIOWaits = false
@ -50,6 +55,8 @@ var sampleConfig = `
GatherTableSchema = false GatherTableSchema = false
GatherFileEventsStats = false GatherFileEventsStats = false
GatherPerfEventsStatements = false GatherPerfEventsStatements = false
# Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)
IntervalSlow = "30m"
` `
var defaultTimeout = time.Second * time.Duration(5) var defaultTimeout = time.Second * time.Duration(5)
@ -62,7 +69,22 @@ func (m *Mysql) Description() string {
return "Read metrics from one or many mysql servers" return "Read metrics from one or many mysql servers"
} }
var localhost = "" var (
localhost = ""
lastT time.Time
initDone = false
scanIntervalSlow uint32
)
func (m *Mysql) InitMysql() {
if len(m.IntervalSlow) > 0 {
interval, err := time.ParseDuration(m.IntervalSlow)
if err == nil && interval.Seconds() >= 1.0 {
scanIntervalSlow = uint32(interval.Seconds())
}
}
initDone = true
}
func (m *Mysql) Gather(acc telegraf.Accumulator) error { func (m *Mysql) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 { if len(m.Servers) == 0 {
@ -72,6 +94,12 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Initialise additional query intervals
if !initDone {
m.InitMysql()
}
// Loop through each server and collect metrics
for _, serv := range m.Servers { for _, serv := range m.Servers {
err := m.gatherServer(serv, acc) err := m.gatherServer(serv, acc)
if err != nil { if err != nil {
@ -140,6 +168,114 @@ var mappings = []*mapping{
onServer: "Threads_", onServer: "Threads_",
inExport: "threads_", inExport: "threads_",
}, },
{
onServer: "Access_",
inExport: "access_",
},
{
onServer: "Aria__",
inExport: "aria_",
},
{
onServer: "Binlog__",
inExport: "binlog_",
},
{
onServer: "Busy_",
inExport: "busy_",
},
{
onServer: "Connection_",
inExport: "connection_",
},
{
onServer: "Delayed_",
inExport: "delayed_",
},
{
onServer: "Empty_",
inExport: "empty_",
},
{
onServer: "Executed_",
inExport: "executed_",
},
{
onServer: "Executed_",
inExport: "executed_",
},
{
onServer: "Feature_",
inExport: "feature_",
},
{
onServer: "Flush_",
inExport: "flush_",
},
{
onServer: "Last_",
inExport: "last_",
},
{
onServer: "Master_",
inExport: "master_",
},
{
onServer: "Max_",
inExport: "max_",
},
{
onServer: "Memory_",
inExport: "memory_",
},
{
onServer: "Not_",
inExport: "not_",
},
{
onServer: "Performance_",
inExport: "performance_",
},
{
onServer: "Prepared_",
inExport: "prepared_",
},
{
onServer: "Rows_",
inExport: "rows_",
},
{
onServer: "Rpl_",
inExport: "rpl_",
},
{
onServer: "Select_",
inExport: "select_",
},
{
onServer: "Slave_",
inExport: "slave_",
},
{
onServer: "Slow_",
inExport: "slow_",
},
{
onServer: "Sort_",
inExport: "sort_",
},
{
onServer: "Subquery_",
inExport: "subquery_",
},
{
onServer: "Tc_",
inExport: "tc_",
},
{
onServer: "Threadpool_",
inExport: "threadpool_",
},
} }
var ( var (
@ -261,13 +397,13 @@ const (
ORDER BY null` ORDER BY null`
infoSchemaAutoIncQuery = ` infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment, SELECT table_schema, table_name, column_name, auto_increment,
pow(2, case data_type CAST(pow(2, case data_type
when 'tinyint' then 7 when 'tinyint' then 7
when 'smallint' then 15 when 'smallint' then 15
when 'mediumint' then 23 when 'mediumint' then 23
when 'int' then 31 when 'int' then 31
when 'bigint' then 63 when 'bigint' then 63
end+(column_type like '% unsigned'))-1 as max_int end+(column_type like '% unsigned'))-1 as decimal(19)) as max_int
FROM information_schema.tables t FROM information_schema.tables t
JOIN information_schema.columns c USING (table_schema,table_name) JOIN information_schema.columns c USING (table_schema,table_name)
WHERE c.extra = 'auto_increment' AND t.auto_increment IS NOT NULL WHERE c.extra = 'auto_increment' AND t.auto_increment IS NOT NULL
@ -398,22 +534,35 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
return err return err
} }
// Global Variables may be gathered less often
if len(m.IntervalSlow) > 0 {
if uint32(time.Since(lastT).Seconds()) > scanIntervalSlow {
err = m.gatherGlobalVariables(db, serv, acc) err = m.gatherGlobalVariables(db, serv, acc)
if err != nil { if err != nil {
return err return err
} }
lastT = time.Now()
} else {
err = m.gatherGlobalVariables(db, serv, acc)
if err != nil {
return err
}
}
}
if m.GatherSlaveStatus { if m.GatherBinaryLogs {
err = m.gatherBinaryLogs(db, serv, acc) err = m.gatherBinaryLogs(db, serv, acc)
if err != nil { if err != nil {
return err return err
} }
} }
if m.GatherProcessList {
err = m.GatherProcessListStatuses(db, serv, acc) err = m.GatherProcessListStatuses(db, serv, acc)
if err != nil { if err != nil {
return err return err
} }
}
if m.GatherSlaveStatus { if m.GatherSlaveStatus {
err = m.gatherSlaveStatuses(db, serv, acc) err = m.gatherSlaveStatuses(db, serv, acc)
@ -422,10 +571,12 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
} }
} }
if m.GatherInfoSchemaAutoInc {
err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc) err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc)
if err != nil { if err != nil {
return err return err
} }
}
if m.GatherTableIOWaits { if m.GatherTableIOWaits {
err = m.gatherPerfTableIOWaits(db, serv, acc) err = m.gatherPerfTableIOWaits(db, serv, acc)
@ -474,15 +625,20 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
return nil return nil
} }
// gatherGlobalVariables can be used to fetch all global variables from
// MySQL environment.
func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(globalVariablesQuery) rows, err := db.Query(globalVariablesQuery)
if err != nil { if err != nil {
return err return err
} }
defer rows.Close() defer rows.Close()
var key string var key string
var val sql.RawBytes var val sql.RawBytes
// parse DSN and save server tag
servtag, err := parseDSN(serv) servtag, err := parseDSN(serv)
if err != nil { if err != nil {
servtag = "localhost" servtag = "localhost"
@ -494,11 +650,20 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
return err return err
} }
key = strings.ToLower(key) key = strings.ToLower(key)
// parse value, if it is numeric then save, otherwise ignore
if floatVal, ok := parseValue(val); ok { if floatVal, ok := parseValue(val); ok {
fields[key] = floatVal fields[key] = floatVal
} }
} // Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql_variables", fields, tags) acc.AddFields("mysql_variables", fields, tags)
fields = make(map[string]interface{})
}
}
// Send any remaining fields
if len(fields) > 0 {
acc.AddFields("mysql_variables", fields, tags)
}
return nil return nil
} }
@ -507,40 +672,41 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
// If the multi-source replication is set, then everything works differently // If the multi-source replication is set, then everything works differently
// This code does not work with multi-source replication. // This code does not work with multi-source replication.
func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(slaveStatusQuery) rows, err := db.Query(slaveStatusQuery)
if err != nil { if err != nil {
return err return err
} }
defer rows.Close() defer rows.Close()
servtag, err := parseDSN(serv) servtag, err := parseDSN(serv)
if err != nil { if err != nil {
servtag = "localhost" servtag = "localhost"
} }
tags := map[string]string{"server": servtag} tags := map[string]string{"server": servtag}
fields := make(map[string]interface{}) fields := make(map[string]interface{})
if rows.Next() {
cols, err := rows.Columns()
// to save the column names as a field key
// scanning keys and values separately
if rows.Next() {
// get columns names, and create an array with its length
cols, err := rows.Columns()
if err != nil { if err != nil {
return err return err
} }
vals := make([]interface{}, len(cols)) vals := make([]interface{}, len(cols))
// fill the array with sql.Rawbytes
for i := range vals { for i := range vals {
vals[i] = &sql.RawBytes{} vals[i] = &sql.RawBytes{}
} }
if err = rows.Scan(vals...); err != nil { if err = rows.Scan(vals...); err != nil {
return err return err
} }
// range over columns, and try to parse values
for i, col := range cols { for i, col := range cols {
// skip unparsable values // skip unparsable values
if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok { if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
//acc.Add("slave_"+col, value, tags)
fields["slave_"+col] = value fields["slave_"+col] = value
} }
} }
@ -551,13 +717,16 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
} }
// gatherBinaryLogs can be used to collect size and count of all binary files // gatherBinaryLogs can be used to collect size and count of all binary files
// binlogs metric requires the MySQL server to turn it on in configuration
func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(binaryLogsQuery) rows, err := db.Query(binaryLogsQuery)
if err != nil { if err != nil {
return err return err
} }
defer rows.Close() defer rows.Close()
// parse DSN and save host as a tag
var servtag string var servtag string
servtag, err = parseDSN(serv) servtag, err = parseDSN(serv)
if err != nil { if err != nil {
@ -572,6 +741,7 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
fileName string fileName string
) )
// iterate over rows and count the size and count of files
for rows.Next() { for rows.Next() {
if err := rows.Scan(&fileName, &fileSize); err != nil { if err := rows.Scan(&fileName, &fileSize); err != nil {
return err return err
@ -585,6 +755,9 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
return nil return nil
} }
// gatherGlobalStatuses can be used to get MySQL status metrics
// the mappings of actual names and names of each status to be exported
// to output is provided on mappings variable
func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// If user forgot the '/', add it // If user forgot the '/', add it
if strings.HasSuffix(serv, ")") { if strings.HasSuffix(serv, ")") {
@ -593,11 +766,13 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
serv = "" serv = ""
} }
// run query
rows, err := db.Query(globalStatusQuery) rows, err := db.Query(globalStatusQuery)
if err != nil { if err != nil {
return err return err
} }
// parse the DSN and save host name as a tag
var servtag string var servtag string
servtag, err = parseDSN(serv) servtag, err = parseDSN(serv)
if err != nil { if err != nil {
@ -616,18 +791,26 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
var found bool var found bool
// iterate over mappings and gather metrics that is provided on mapping
for _, mapped := range mappings { for _, mapped := range mappings {
if strings.HasPrefix(name, mapped.onServer) { if strings.HasPrefix(name, mapped.onServer) {
// convert numeric values to integer
i, _ := strconv.Atoi(string(val.([]byte))) i, _ := strconv.Atoi(string(val.([]byte)))
fields[mapped.inExport+name[len(mapped.onServer):]] = i fields[mapped.inExport+name[len(mapped.onServer):]] = i
found = true found = true
} }
} }
// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql", fields, tags)
fields = make(map[string]interface{})
}
if found { if found {
continue continue
} }
// search for specific values
switch name { switch name {
case "Queries": case "Queries":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
@ -643,10 +826,27 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
} }
fields["slow_queries"] = i fields["slow_queries"] = i
case "Connections":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
} }
fields["connections"] = i
case "Syncs":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
} }
fields["syncs"] = i
}
// Send any remaining fields
if len(fields) > 0 {
acc.AddFields("mysql", fields, tags) acc.AddFields("mysql", fields, tags)
}
}
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
for conn_rows.Next() { for conn_rows.Next() {
@ -667,11 +867,15 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
fields["connections"] = connections fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags) acc.AddFields("mysql_users", fields, tags)
} }
}
return nil return nil
} }
// GatherProcessList can be used to collect metrics on each running command
// and its state with its running count
func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(infoSchemaProcessListQuery) rows, err := db.Query(infoSchemaProcessListQuery)
if err != nil { if err != nil {
return err return err
@ -702,7 +906,9 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
if err != nil { if err != nil {
return err return err
} }
// each state has its mapping
foundState := findThreadState(command, state) foundState := findThreadState(command, state)
// count each state
stateCounts[foundState] += count stateCounts[foundState] += count
} }
@ -821,7 +1027,7 @@ func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Acc
return nil return nil
} }
// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented value of the column // gatherInfoSchemaAutoIncStatuses can be used to get auto incremented values of the column
func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
rows, err := db.Query(infoSchemaAutoIncQuery) rows, err := db.Query(infoSchemaAutoIncQuery)
if err != nil { if err != nil {
@ -861,6 +1067,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
// gatherPerfTableLockWaits can be used to get // gatherPerfTableLockWaits can be used to get
// the total number and time for SQL and external lock wait events // the total number and time for SQL and external lock wait events
// for each table and operation // for each table and operation
// requires the MySQL server to be enabled to save this metric
func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// check if table exists, // check if table exists,
// if performance_schema is not enabled, tables do not exist // if performance_schema is not enabled, tables do not exist
@ -1208,6 +1415,7 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf
return nil return nil
} }
// gatherTableSchema can be used to gather stats on each schema
func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error {
var ( var (
dbList []string dbList []string