improvements on queries and additional comments

This commit is contained in:
Maksadbek 2016-03-18 14:15:54 +05:00 committed by Cameron Sparr
parent c5d31e7527
commit d59999f510
1 changed files with 248 additions and 40 deletions

View File

@ -20,6 +20,8 @@ type Mysql struct {
PerfEventsStatementsLimit uint32
PerfEventsStatementsTimeLimit uint32
TableSchemaDatabases []string
GatherProcessList bool
GatherInfoSchemaAutoInc bool
GatherSlaveStatus bool
GatherBinaryLogs bool
GatherTableIOWaits bool
@ -27,6 +29,7 @@ type Mysql struct {
GatherTableSchema bool
GatherFileEventsStats bool
GatherPerfEventsStatements bool
IntervalSlow string
var sampleConfig = `
@ -43,6 +46,8 @@ var sampleConfig = `
PerfEventsStatementsLimit = 250
PerfEventsStatementsTimeLimit = 86400
TableSchemaDatabases = []
GatherProcessList = false
GatherInfoSchemaAutoInc = false
GatherSlaveStatus = false
GatherBinaryLogs = false
GatherTableIOWaits = false
@ -50,6 +55,8 @@ var sampleConfig = `
GatherTableSchema = false
GatherFileEventsStats = false
GatherPerfEventsStatements = false
# Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)
IntervalSlow = "30m"
var defaultTimeout = time.Second * time.Duration(5)
@ -62,7 +69,22 @@ func (m *Mysql) Description() string {
return "Read metrics from one or many mysql servers"
var localhost = ""
var (
localhost = ""
lastT time.Time
initDone = false
scanIntervalSlow uint32
func (m *Mysql) InitMysql() {
if len(m.IntervalSlow) > 0 {
interval, err := time.ParseDuration(m.IntervalSlow)
if err == nil && interval.Seconds() >= 1.0 {
scanIntervalSlow = uint32(interval.Seconds())
initDone = true
func (m *Mysql) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 {
@ -72,6 +94,12 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error {
return nil
// Initialise additional query intervals
if !initDone {
// Loop through each server and collect metrics
for _, serv := range m.Servers {
err := m.gatherServer(serv, acc)
if err != nil {
@ -140,6 +168,114 @@ var mappings = []*mapping{
onServer: "Threads_",
inExport: "threads_",
onServer: "Access_",
inExport: "access_",
onServer: "Aria__",
inExport: "aria_",
onServer: "Binlog__",
inExport: "binlog_",
onServer: "Busy_",
inExport: "busy_",
onServer: "Connection_",
inExport: "connection_",
onServer: "Delayed_",
inExport: "delayed_",
onServer: "Empty_",
inExport: "empty_",
onServer: "Executed_",
inExport: "executed_",
onServer: "Executed_",
inExport: "executed_",
onServer: "Feature_",
inExport: "feature_",
onServer: "Flush_",
inExport: "flush_",
onServer: "Last_",
inExport: "last_",
onServer: "Master_",
inExport: "master_",
onServer: "Max_",
inExport: "max_",
onServer: "Memory_",
inExport: "memory_",
onServer: "Not_",
inExport: "not_",
onServer: "Performance_",
inExport: "performance_",
onServer: "Prepared_",
inExport: "prepared_",
onServer: "Rows_",
inExport: "rows_",
onServer: "Rpl_",
inExport: "rpl_",
onServer: "Select_",
inExport: "select_",
onServer: "Slave_",
inExport: "slave_",
onServer: "Slow_",
inExport: "slow_",
onServer: "Sort_",
inExport: "sort_",
onServer: "Subquery_",
inExport: "subquery_",
onServer: "Tc_",
inExport: "tc_",
onServer: "Threadpool_",
inExport: "threadpool_",
var (
@ -261,13 +397,13 @@ const (
ORDER BY null`
infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment,
pow(2, case data_type
CAST(pow(2, case data_type
when 'tinyint' then 7
when 'smallint' then 15
when 'mediumint' then 23
when 'int' then 31
when 'bigint' then 63
end+(column_type like '% unsigned'))-1 as max_int
end+(column_type like '% unsigned'))-1 as decimal(19)) as max_int
FROM information_schema.tables t
JOIN information_schema.columns c USING (table_schema,table_name)
WHERE c.extra = 'auto_increment' AND t.auto_increment IS NOT NULL
@ -398,22 +534,35 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
return err
// Global Variables may be gathered less often
if len(m.IntervalSlow) > 0 {
if uint32(time.Since(lastT).Seconds()) > scanIntervalSlow {
err = m.gatherGlobalVariables(db, serv, acc)
if err != nil {
return err
lastT = time.Now()
} else {
err = m.gatherGlobalVariables(db, serv, acc)
if err != nil {
return err
if m.GatherSlaveStatus {
if m.GatherBinaryLogs {
err = m.gatherBinaryLogs(db, serv, acc)
if err != nil {
return err
if m.GatherProcessList {
err = m.GatherProcessListStatuses(db, serv, acc)
if err != nil {
return err
if m.GatherSlaveStatus {
err = m.gatherSlaveStatuses(db, serv, acc)
@ -422,10 +571,12 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
if m.GatherInfoSchemaAutoInc {
err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc)
if err != nil {
return err
if m.GatherTableIOWaits {
err = m.gatherPerfTableIOWaits(db, serv, acc)
@ -474,15 +625,20 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
return nil
// gatherGlobalVariables can be used to fetch all global variables from
// MySQL environment.
func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(globalVariablesQuery)
if err != nil {
return err
defer rows.Close()
var key string
var val sql.RawBytes
// parse DSN and save server tag
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
@ -494,11 +650,20 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
return err
key = strings.ToLower(key)
// parse value, if it is numeric then save, otherwise ignore
if floatVal, ok := parseValue(val); ok {
fields[key] = floatVal
// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql_variables", fields, tags)
fields = make(map[string]interface{})
// Send any remaining fields
if len(fields) > 0 {
acc.AddFields("mysql_variables", fields, tags)
return nil
@ -507,40 +672,41 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
// If the multi-source replication is set, then everything works differently
// This code does not work with multi-source replication.
func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(slaveStatusQuery)
if err != nil {
return err
defer rows.Close()
servtag, err := parseDSN(serv)
if err != nil {
servtag = "localhost"
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
if rows.Next() {
cols, err := rows.Columns()
// to save the column names as a field key
// scanning keys and values separately
if rows.Next() {
// get columns names, and create an array with its length
cols, err := rows.Columns()
if err != nil {
return err
vals := make([]interface{}, len(cols))
// fill the array with sql.Rawbytes
for i := range vals {
vals[i] = &sql.RawBytes{}
if err = rows.Scan(vals...); err != nil {
return err
// range over columns, and try to parse values
for i, col := range cols {
// skip unparsable values
if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
//acc.Add("slave_"+col, value, tags)
fields["slave_"+col] = value
@ -551,13 +717,16 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// gatherBinaryLogs can be used to collect size and count of all binary files
// binlogs metric requires the MySQL server to turn it on in configuration
func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(binaryLogsQuery)
if err != nil {
return err
defer rows.Close()
// parse DSN and save host as a tag
var servtag string
servtag, err = parseDSN(serv)
if err != nil {
@ -572,6 +741,7 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
fileName string
// iterate over rows and count the size and count of files
for rows.Next() {
if err := rows.Scan(&fileName, &fileSize); err != nil {
return err
@ -585,6 +755,9 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
return nil
// gatherGlobalStatuses can be used to get MySQL status metrics
// the mappings of actual names and names of each status to be exported
// to output is provided on mappings variable
func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// If user forgot the '/', add it
if strings.HasSuffix(serv, ")") {
@ -593,11 +766,13 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
serv = ""
// run query
rows, err := db.Query(globalStatusQuery)
if err != nil {
return err
// parse the DSN and save host name as a tag
var servtag string
servtag, err = parseDSN(serv)
if err != nil {
@ -616,18 +791,26 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
var found bool
// iterate over mappings and gather metrics that is provided on mapping
for _, mapped := range mappings {
if strings.HasPrefix(name, mapped.onServer) {
// convert numeric values to integer
i, _ := strconv.Atoi(string(val.([]byte)))
fields[mapped.inExport+name[len(mapped.onServer):]] = i
found = true
// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql", fields, tags)
fields = make(map[string]interface{})
if found {
// search for specific values
switch name {
case "Queries":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
@ -643,10 +826,27 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
fields["slow_queries"] = i
case "Connections":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
fields["connections"] = i
case "Syncs":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
fields["syncs"] = i
// Send any remaining fields
if len(fields) > 0 {
acc.AddFields("mysql", fields, tags)
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
for conn_rows.Next() {
@ -667,11 +867,15 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
return nil
// GatherProcessList can be used to collect metrics on each running command
// and its state with its running count
func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(infoSchemaProcessListQuery)
if err != nil {
return err
@ -702,7 +906,9 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
if err != nil {
return err
// each state has its mapping
foundState := findThreadState(command, state)
// count each state
stateCounts[foundState] += count
@ -821,7 +1027,7 @@ func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Acc
return nil
// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented value of the column
// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented values of the column
func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
rows, err := db.Query(infoSchemaAutoIncQuery)
if err != nil {
@ -861,6 +1067,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
// gatherPerfTableLockWaits can be used to get
// the total number and time for SQL and external lock wait events
// for each table and operation
// requires the MySQL server to be enabled to save this metric
func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// check if table exists,
// if performance_schema is not enabled, tables do not exist
@ -1208,6 +1415,7 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf
return nil
// gatherTableSchema can be used to gather stats on each schema
func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error {
var (
dbList []string