diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 23f8e4762..ea59113ee 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -20,6 +20,8 @@ type Mysql struct { PerfEventsStatementsLimit uint32 PerfEventsStatementsTimeLimit uint32 TableSchemaDatabases []string + GatherProcessList bool + GatherInfoSchemaAutoInc bool GatherSlaveStatus bool GatherBinaryLogs bool GatherTableIOWaits bool @@ -27,6 +29,7 @@ type Mysql struct { GatherTableSchema bool GatherFileEventsStats bool GatherPerfEventsStatements bool + IntervalSlow string } var sampleConfig = ` @@ -43,6 +46,8 @@ var sampleConfig = ` PerfEventsStatementsLimit = 250 PerfEventsStatementsTimeLimit = 86400 TableSchemaDatabases = [] + GatherProcessList = false + GatherInfoSchemaAutoInc = false GatherSlaveStatus = false GatherBinaryLogs = false GatherTableIOWaits = false @@ -50,6 +55,8 @@ var sampleConfig = ` GatherTableSchema = false GatherFileEventsStats = false GatherPerfEventsStatements = false + # Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES) + IntervalSlow = "30m" ` var defaultTimeout = time.Second * time.Duration(5) @@ -62,7 +69,22 @@ func (m *Mysql) Description() string { return "Read metrics from one or many mysql servers" } -var localhost = "" +var ( + localhost = "" + lastT time.Time + initDone = false + scanIntervalSlow uint32 +) + +func (m *Mysql) InitMysql() { + if len(m.IntervalSlow) > 0 { + interval, err := time.ParseDuration(m.IntervalSlow) + if err == nil && interval.Seconds() >= 1.0 { + scanIntervalSlow = uint32(interval.Seconds()) + } + } + initDone = true +} func (m *Mysql) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 { @@ -72,6 +94,12 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error { return nil } + // Initialise additional query intervals + if !initDone { + m.InitMysql() + } + + // Loop through each server and collect metrics for _, serv := range m.Servers { err := m.gatherServer(serv, acc) if err != nil { @@ -140,6 +168,114 @@ var mappings = []*mapping{ onServer: "Threads_", inExport: "threads_", }, + { + onServer: "Access_", + inExport: "access_", + }, + { + onServer: "Aria__", + inExport: "aria_", + }, + { + onServer: "Binlog__", + inExport: "binlog_", + }, + { + onServer: "Busy_", + inExport: "busy_", + }, + { + onServer: "Connection_", + inExport: "connection_", + }, + { + onServer: "Delayed_", + inExport: "delayed_", + }, + { + onServer: "Empty_", + inExport: "empty_", + }, + { + onServer: "Executed_", + inExport: "executed_", + }, + { + onServer: "Executed_", + inExport: "executed_", + }, + { + onServer: "Feature_", + inExport: "feature_", + }, + { + onServer: "Flush_", + inExport: "flush_", + }, + { + onServer: "Last_", + inExport: "last_", + }, + { + onServer: "Master_", + inExport: "master_", + }, + { + onServer: "Max_", + inExport: "max_", + }, + { + onServer: "Memory_", + inExport: "memory_", + }, + { + onServer: "Not_", + inExport: "not_", + }, + { + onServer: "Performance_", + inExport: "performance_", + }, + { + onServer: "Prepared_", + inExport: "prepared_", + }, + { + onServer: "Rows_", + inExport: "rows_", + }, + { + onServer: "Rpl_", + inExport: "rpl_", + }, + { + onServer: "Select_", + inExport: "select_", + }, + { + onServer: "Slave_", + inExport: "slave_", + }, + { + onServer: "Slow_", + inExport: "slow_", + }, + { + onServer: "Sort_", + inExport: "sort_", + }, + { + onServer: "Subquery_", + inExport: "subquery_", + }, + { + onServer: "Tc_", + inExport: "tc_", + }, + { + onServer: "Threadpool_", + inExport: "threadpool_", + }, } var ( @@ -261,13 +397,13 @@ const ( ORDER BY null` infoSchemaAutoIncQuery = ` SELECT table_schema, table_name, column_name, auto_increment, - pow(2, case data_type + CAST(pow(2, case data_type when 'tinyint' then 7 when 'smallint' then 15 when 'mediumint' then 23 when 'int' then 31 when 'bigint' then 63 - end+(column_type like '% unsigned'))-1 as max_int + end+(column_type like '% unsigned'))-1 as decimal(19)) as max_int FROM information_schema.tables t JOIN information_schema.columns c USING (table_schema,table_name) WHERE c.extra = 'auto_increment' AND t.auto_increment IS NOT NULL @@ -398,21 +534,34 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { return err } - err = m.gatherGlobalVariables(db, serv, acc) - if err != nil { - return err + // Global Variables may be gathered less often + if len(m.IntervalSlow) > 0 { + if uint32(time.Since(lastT).Seconds()) > scanIntervalSlow { + err = m.gatherGlobalVariables(db, serv, acc) + if err != nil { + return err + } + lastT = time.Now() + } else { + err = m.gatherGlobalVariables(db, serv, acc) + if err != nil { + return err + } + } } - if m.GatherSlaveStatus { + if m.GatherBinaryLogs { err = m.gatherBinaryLogs(db, serv, acc) if err != nil { return err } } - err = m.GatherProcessListStatuses(db, serv, acc) - if err != nil { - return err + if m.GatherProcessList { + err = m.GatherProcessListStatuses(db, serv, acc) + if err != nil { + return err + } } if m.GatherSlaveStatus { @@ -422,9 +571,11 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { } } - err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc) - if err != nil { - return err + if m.GatherInfoSchemaAutoInc { + err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc) + if err != nil { + return err + } } if m.GatherTableIOWaits { @@ -474,15 +625,20 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { return nil } +// gatherGlobalVariables can be used to fetch all global variables from +// MySQL environment. func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accumulator) error { + // run query rows, err := db.Query(globalVariablesQuery) if err != nil { return err } defer rows.Close() + var key string var val sql.RawBytes + // parse DSN and save server tag servtag, err := parseDSN(serv) if err != nil { servtag = "localhost" @@ -494,11 +650,20 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu return err } key = strings.ToLower(key) + // parse value, if it is numeric then save, otherwise ignore if floatVal, ok := parseValue(val); ok { fields[key] = floatVal } + // Send 20 fields at a time + if len(fields) >= 20 { + acc.AddFields("mysql_variables", fields, tags) + fields = make(map[string]interface{}) + } + } + // Send any remaining fields + if len(fields) > 0 { + acc.AddFields("mysql_variables", fields, tags) } - acc.AddFields("mysql_variables", fields, tags) return nil } @@ -507,40 +672,41 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu // If the multi-source replication is set, then everything works differently // This code does not work with multi-source replication. func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + // run query rows, err := db.Query(slaveStatusQuery) - if err != nil { return err } defer rows.Close() servtag, err := parseDSN(serv) - if err != nil { servtag = "localhost" } + tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) - if rows.Next() { - cols, err := rows.Columns() + // to save the column names as a field key + // scanning keys and values separately + if rows.Next() { + // get columns names, and create an array with its length + cols, err := rows.Columns() if err != nil { return err } vals := make([]interface{}, len(cols)) - + // fill the array with sql.Rawbytes for i := range vals { vals[i] = &sql.RawBytes{} } - if err = rows.Scan(vals...); err != nil { return err } - + // range over columns, and try to parse values for i, col := range cols { // skip unparsable values if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok { - //acc.Add("slave_"+col, value, tags) fields["slave_"+col] = value } } @@ -551,13 +717,16 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu } // gatherBinaryLogs can be used to collect size and count of all binary files +// binlogs metric requires the MySQL server to turn it on in configuration func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulator) error { + // run query rows, err := db.Query(binaryLogsQuery) if err != nil { return err } defer rows.Close() + // parse DSN and save host as a tag var servtag string servtag, err = parseDSN(serv) if err != nil { @@ -572,6 +741,7 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat fileName string ) + // iterate over rows and count the size and count of files for rows.Next() { if err := rows.Scan(&fileName, &fileSize); err != nil { return err @@ -585,6 +755,9 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat return nil } +// gatherGlobalStatuses can be used to get MySQL status metrics +// the mappings of actual names and names of each status to be exported +// to output is provided on mappings variable func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { // If user forgot the '/', add it if strings.HasSuffix(serv, ")") { @@ -593,11 +766,13 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum serv = "" } + // run query rows, err := db.Query(globalStatusQuery) if err != nil { return err } + // parse the DSN and save host name as a tag var servtag string servtag, err = parseDSN(serv) if err != nil { @@ -616,18 +791,26 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum var found bool + // iterate over mappings and gather metrics that is provided on mapping for _, mapped := range mappings { if strings.HasPrefix(name, mapped.onServer) { + // convert numeric values to integer i, _ := strconv.Atoi(string(val.([]byte))) fields[mapped.inExport+name[len(mapped.onServer):]] = i found = true } } + // Send 20 fields at a time + if len(fields) >= 20 { + acc.AddFields("mysql", fields, tags) + fields = make(map[string]interface{}) + } if found { continue } + // search for specific values switch name { case "Queries": i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) @@ -643,35 +826,56 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum } fields["slow_queries"] = i + case "Connections": + i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) + if err != nil { + return err + } + fields["connections"] = i + case "Syncs": + i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) + if err != nil { + return err + } + fields["syncs"] = i + } + // Send any remaining fields + if len(fields) > 0 { + acc.AddFields("mysql", fields, tags) } } - acc.AddFields("mysql", fields, tags) - conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") + // gather connection metrics from processlist for each user + if m.GatherProcessList { + conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") - for conn_rows.Next() { - var user string - var connections int64 + for conn_rows.Next() { + var user string + var connections int64 - err = conn_rows.Scan(&user, &connections) - if err != nil { - return err + err = conn_rows.Scan(&user, &connections) + if err != nil { + return err + } + + tags := map[string]string{"server": servtag, "user": user} + fields := make(map[string]interface{}) + + if err != nil { + return err + } + fields["connections"] = connections + acc.AddFields("mysql_users", fields, tags) } - - tags := map[string]string{"server": servtag, "user": user} - fields := make(map[string]interface{}) - - if err != nil { - return err - } - fields["connections"] = connections - acc.AddFields("mysql_users", fields, tags) } return nil } +// GatherProcessList can be used to collect metrics on each running command +// and its state with its running count func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + // run query rows, err := db.Query(infoSchemaProcessListQuery) if err != nil { return err @@ -702,7 +906,9 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. if err != nil { return err } + // each state has its mapping foundState := findThreadState(command, state) + // count each state stateCounts[foundState] += count } @@ -821,7 +1027,7 @@ func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Acc return nil } -// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented value of the column +// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented values of the column func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { rows, err := db.Query(infoSchemaAutoIncQuery) if err != nil { @@ -861,6 +1067,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel // gatherPerfTableLockWaits can be used to get // the total number and time for SQL and external lock wait events // for each table and operation +// requires the MySQL server to be enabled to save this metric func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { // check if table exists, // if performance_schema is not enabled, tables do not exist @@ -1208,6 +1415,7 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf return nil } +// gatherTableSchema can be used to gather stats on each schema func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error { var ( dbList []string