From c732abbda2f07d9c0f50a80382765a2ee5e72e87 Mon Sep 17 00:00:00 2001 From: maksadbek Date: Tue, 22 Dec 2015 12:06:39 +0500 Subject: [PATCH] Improved mysql plugin shows global variables shows slave statuses shows size and count of binary log files shows information_schema.processlist stats shows perf table stats shows auto increments stats from information schema shows perf index stats shows table lock waits summary by table shows time and operations of event waits shows file event statuses shows events statements stats from perf_schema shows schema statistics refactored plugin, provided multiple fields per insert --- plugins/inputs/mysql/mysql.go | 1156 +++++++++++++++++++++++++++- plugins/inputs/mysql/mysql_test.go | 45 ++ 2 files changed, 1170 insertions(+), 31 deletions(-) diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 474067716..a4de7e602 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -1,7 +1,9 @@ package mysql import ( + "bytes" "database/sql" + "fmt" "net/url" "strconv" "strings" @@ -13,19 +15,35 @@ import ( ) type Mysql struct { - Servers []string + Servers []string + PerfEventsStatementsDigestTextLimit uint32 + PerfEventsStatementsLimit uint32 + PerfEventsStatementsTimeLimit uint32 + TableSchemaDatabases []string + GatherSlaveStatus bool + GatherBinaryLogs bool + GatherTableIOWaits bool + GatherIndexIOWaits bool } var sampleConfig = ` - ## specify servers via a url matching: - ## [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]] - ## see https://github.com/go-sql-driver/mysql#dsn-data-source-name - ## e.g. - ## root:passwd@tcp(127.0.0.1:3306)/?tls=false - ## root@tcp(127.0.0.1:3306)/?tls=false - ## - ## If no servers are specified, then localhost is used as the host. + # specify servers via a url matching: + # [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]] + # see https://github.com/go-sql-driver/mysql#dsn-data-source-name + # e.g. + # root:passwd@tcp(127.0.0.1:3306)/?tls=false + # root@tcp(127.0.0.1:3306)/?tls=false + # + # If no servers are specified, then localhost is used as the host. servers = ["tcp(127.0.0.1:3306)/"] + PerfEventsStatementsDigestTextLimit = 120 + PerfEventsStatementsLimit = 250 + PerfEventsStatementsTimeLimit = 86400 + TableSchemaDatabases = [] + GatherSlaveStatus = false + GatherBinaryLogs = false + GatherTableIOWaits = false + GatherIndexIOWaits = false ` var defaultTimeout = time.Second * time.Duration(5) @@ -118,18 +136,243 @@ var mappings = []*mapping{ }, } -func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { - // If user forgot the '/', add it - if strings.HasSuffix(serv, ")") { - serv = serv + "/" - } else if serv == "localhost" { - serv = "" +var ( + // status counter + generalThreadStates = map[string]uint32{ + "after create": uint32(0), + "altering table": uint32(0), + "analyzing": uint32(0), + "checking permissions": uint32(0), + "checking table": uint32(0), + "cleaning up": uint32(0), + "closing tables": uint32(0), + "converting heap to myisam": uint32(0), + "copying to tmp table": uint32(0), + "creating sort index": uint32(0), + "creating table": uint32(0), + "creating tmp table": uint32(0), + "deleting": uint32(0), + "executing": uint32(0), + "execution of init_command": uint32(0), + "end": uint32(0), + "freeing items": uint32(0), + "flushing tables": uint32(0), + "fulltext initialization": uint32(0), + "idle": uint32(0), + "init": uint32(0), + "killed": uint32(0), + "waiting for lock": uint32(0), + "logging slow query": uint32(0), + "login": uint32(0), + "manage keys": uint32(0), + "opening tables": uint32(0), + "optimizing": uint32(0), + "preparing": uint32(0), + "reading from net": uint32(0), + "removing duplicates": uint32(0), + "removing tmp table": uint32(0), + "reopen tables": uint32(0), + "repair by sorting": uint32(0), + "repair done": uint32(0), + "repair with keycache": uint32(0), + "replication master": uint32(0), + "rolling back": uint32(0), + "searching rows for update": uint32(0), + "sending data": uint32(0), + "sorting for group": uint32(0), + "sorting for order": uint32(0), + "sorting index": uint32(0), + "sorting result": uint32(0), + "statistics": uint32(0), + "updating": uint32(0), + "waiting for tables": uint32(0), + "waiting for table flush": uint32(0), + "waiting on cond": uint32(0), + "writing to net": uint32(0), + "other": uint32(0), } + // plaintext statuses + stateStatusMappings = map[string]string{ + "user sleep": "idle", + "creating index": "altering table", + "committing alter table to storage engine": "altering table", + "discard or import tablespace": "altering table", + "rename": "altering table", + "setup": "altering table", + "renaming result table": "altering table", + "preparing for alter table": "altering table", + "copying to group table": "copying to tmp table", + "copy to tmp table": "copying to tmp table", + "query end": "end", + "update": "updating", + "updating main table": "updating", + "updating reference tables": "updating", + "system lock": "waiting for lock", + "user lock": "waiting for lock", + "table lock": "waiting for lock", + "deleting from main table": "deleting", + "deleting from reference tables": "deleting", + } +) +func dsnAddTimeout(dsn string) (string, error) { + + // DSN "?timeout=5s" is not valid, but "/?timeout=5s" is valid ("" and "/" + // are the same DSN) + if dsn == "" { + dsn = "/" + } + u, err := url.Parse(dsn) + if err != nil { + return "", err + } + v := u.Query() + + // Only override timeout if not already defined + if _, ok := v["timeout"]; ok == false { + v.Add("timeout", defaultTimeout.String()) + u.RawQuery = v.Encode() + } + return u.String(), nil +} + +// Math constants +const ( + picoSeconds = 1e12 +) + +// metric queries +const ( + globalStatusQuery = `SHOW GLOBAL STATUS` + globalVariablesQuery = `SHOW GLOBAL VARIABLES` + slaveStatusQuery = `SHOW SLAVE STATUS` + binaryLogsQuery = `SHOW BINARY LOGS` + infoSchemaProcessListQuery = ` + SELECT COALESCE(command,''),COALESCE(state,''),count(*) + FROM information_schema.processlist + WHERE ID != connection_id() + GROUP BY command,state + ORDER BY null` + infoSchemaAutoIncQuery = ` + SELECT table_schema, table_name, column_name, auto_increment, + pow(2, case data_type + when 'tinyint' then 7 + when 'smallint' then 15 + when 'mediumint' then 23 + when 'int' then 31 + when 'bigint' then 63 + end+(column_type like '% unsigned'))-1 as max_int + FROM information_schema.tables t + JOIN information_schema.columns c USING (table_schema,table_name) + WHERE c.extra = 'auto_increment' AND t.auto_increment IS NOT NULL + ` + perfTableIOWaitsQuery = ` + SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE, COUNT_DELETE, + SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE, SUM_TIMER_DELETE + FROM performance_schema.table_io_waits_summary_by_table + WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema') + ` + perfIndexIOWaitsQuery = ` + SELECT OBJECT_SCHEMA, OBJECT_NAME, ifnull(INDEX_NAME, 'NONE') as INDEX_NAME, + COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE, COUNT_DELETE, + SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE, SUM_TIMER_DELETE + FROM performance_schema.table_io_waits_summary_by_index_usage + WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema') + ` + perfTableLockWaitsQuery = ` + SELECT + OBJECT_SCHEMA, + OBJECT_NAME, + COUNT_READ_NORMAL, + COUNT_READ_WITH_SHARED_LOCKS, + COUNT_READ_HIGH_PRIORITY, + COUNT_READ_NO_INSERT, + COUNT_READ_EXTERNAL, + COUNT_WRITE_ALLOW_WRITE, + COUNT_WRITE_CONCURRENT_INSERT, + COUNT_WRITE_DELAYED, + COUNT_WRITE_LOW_PRIORITY, + COUNT_WRITE_NORMAL, + COUNT_WRITE_EXTERNAL, + SUM_TIMER_READ_NORMAL, + SUM_TIMER_READ_WITH_SHARED_LOCKS, + SUM_TIMER_READ_HIGH_PRIORITY, + SUM_TIMER_READ_NO_INSERT, + SUM_TIMER_READ_EXTERNAL, + SUM_TIMER_WRITE_ALLOW_WRITE, + SUM_TIMER_WRITE_CONCURRENT_INSERT, + SUM_TIMER_WRITE_DELAYED, + SUM_TIMER_WRITE_LOW_PRIORITY, + SUM_TIMER_WRITE_NORMAL, + SUM_TIMER_WRITE_EXTERNAL + FROM performance_schema.table_lock_waits_summary_by_table + WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema', 'information_schema') + ` + perfEventsStatementsQuery = ` + SELECT + ifnull(SCHEMA_NAME, 'NONE') as SCHEMA_NAME, + DIGEST, + LEFT(DIGEST_TEXT, %d) as DIGEST_TEXT, + COUNT_STAR, + SUM_TIMER_WAIT, + SUM_ERRORS, + SUM_WARNINGS, + SUM_ROWS_AFFECTED, + SUM_ROWS_SENT, + SUM_ROWS_EXAMINED, + SUM_CREATED_TMP_DISK_TABLES, + SUM_CREATED_TMP_TABLES, + SUM_SORT_MERGE_PASSES, + SUM_SORT_ROWS, + SUM_NO_INDEX_USED + FROM performance_schema.events_statements_summary_by_digest + WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') + AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND) + ORDER BY SUM_TIMER_WAIT DESC + LIMIT %d + ` + perfEventWaitsQuery = ` + SELECT EVENT_NAME, COUNT_STAR, SUM_TIMER_WAIT + FROM performance_schema.events_waits_summary_global_by_event_name + ` + perfFileEventsQuery = ` + SELECT + EVENT_NAME, + COUNT_READ, SUM_TIMER_READ, SUM_NUMBER_OF_BYTES_READ, + COUNT_WRITE, SUM_TIMER_WRITE, SUM_NUMBER_OF_BYTES_WRITE, + COUNT_MISC, SUM_TIMER_MISC + FROM performance_schema.file_summary_by_event_name + ` + tableSchemaQuery = ` + SELECT + TABLE_SCHEMA, + TABLE_NAME, + TABLE_TYPE, + ifnull(ENGINE, 'NONE') as ENGINE, + ifnull(VERSION, '0') as VERSION, + ifnull(ROW_FORMAT, 'NONE') as ROW_FORMAT, + ifnull(TABLE_ROWS, '0') as TABLE_ROWS, + ifnull(DATA_LENGTH, '0') as DATA_LENGTH, + ifnull(INDEX_LENGTH, '0') as INDEX_LENGTH, + ifnull(DATA_FREE, '0') as DATA_FREE, + ifnull(CREATE_OPTIONS, 'NONE') as CREATE_OPTIONS + FROM information_schema.tables + WHERE TABLE_SCHEMA = '%s' + ` + dbListQuery = ` + SELECT + SCHEMA_NAME + FROM information_schema.schemata + WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') + ` +) + +func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { serv, err := dsnAddTimeout(serv) if err != nil { return err } + db, err := sql.Open("mysql", serv) if err != nil { return err @@ -137,7 +380,206 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { defer db.Close() - rows, err := db.Query(`SHOW /*!50002 GLOBAL */ STATUS`) + err = m.gatherGlobalStatuses(db, serv, acc) + if err != nil { + return err + } + + err = m.gatherGlobalVariables(db, serv, acc) + if err != nil { + return err + } + + if m.GatherSlaveStatus { + err = m.gatherBinaryLogs(db, serv, acc) + if err != nil { + return err + } + } + + err = m.GatherProcessListStatuses(db, serv, acc) + if err != nil { + return err + } + + if m.GatherSlaveStatus { + err = m.gatherSlaveStatuses(db, serv, acc) + if err != nil { + return err + } + } + + err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc) + if err != nil { + return err + } + + if m.GatherTableIOWaits { + err = m.gatherPerfTableIOWaits(db, serv, acc) + if err != nil { + return err + } + } + + if m.GatherIndexIOWaits { + err = m.gatherPerfIndexIOWaits(db, serv, acc) + if err != nil { + return err + } + } + + err = m.gatherPerfTableLockWaits(db, serv, acc) + if err != nil { + return err + } + + err = m.gatherPerfEventWaits(db, serv, acc) + if err != nil { + return err + } + + err = m.gatherPerfFileEventsStatuses(db, serv, acc) + if err != nil { + return err + } + + err = m.gatherPerfEventsStatements(db, serv, acc) + if err != nil { + return err + } + + err = m.gatherTableSchema(db, serv, acc) + if err != nil { + return err + } + + err = m.gatherTableSchema(db, serv, acc) + if err != nil { + return err + } + return nil +} + +func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(globalVariablesQuery) + if err != nil { + return err + } + defer rows.Close() + var key string + var val sql.RawBytes + + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + tags := map[string]string{"server": servtag} + fields := make(map[string]interface{}) + for rows.Next() { + if err := rows.Scan(&key, &val); err != nil { + return err + } + key = strings.ToLower(key) + if floatVal, ok := parseValue(val); ok { + fields[key] = floatVal + } + } + acc.Add("mysql_variables", fields, tags) + return nil +} + +// gatherSlaveStatuses can be used to get replication analytics +// When the server is slave, then it returns only one row. +// If the multi-source replication is set, then everything works differently +// This code does not work with multi-source replication. +func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(slaveStatusQuery) + + if err != nil { + return err + } + defer rows.Close() + + servtag, err := parseDSN(serv) + + if err != nil { + servtag = "localhost" + } + tags := map[string]string{"server": servtag} + fields := make(map[string]interface{}) + if rows.Next() { + cols, err := rows.Columns() + + if err != nil { + return err + } + vals := make([]interface{}, len(cols)) + + for i := range vals { + vals[i] = &sql.RawBytes{} + } + + if err = rows.Scan(vals...); err != nil { + return err + } + + for i, col := range cols { + // skip unparsable values + if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok { + //acc.Add("slave_"+col, value, tags) + fields["slave_"+col] = value + } + } + acc.AddFields("mysql", fields, tags) + } + + return nil +} + +// gatherBinaryLogs can be used to collect size and count of all binary files +func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(binaryLogsQuery) + if err != nil { + return err + } + defer rows.Close() + + var servtag string + servtag, err = parseDSN(serv) + if err != nil { + servtag = "localhost" + } + tags := map[string]string{"server": servtag} + fields := make(map[string]interface{}) + var ( + size uint64 = 0 + count uint64 = 0 + fileSize uint64 + fileName string + ) + + for rows.Next() { + if err := rows.Scan(&fileName, &fileSize); err != nil { + return err + } + size += fileSize + count++ + } + fields["binary_size_bytes"] = size + fields["binary_files_count"] = count + acc.AddFields("mysql", fields, tags) + return nil +} + +func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + // If user forgot the '/', add it + if strings.HasSuffix(serv, ")") { + serv = serv + "/" + } else if serv == "localhost" { + serv = "" + } + + rows, err := db.Query(globalStatusQuery) if err != nil { return err } @@ -215,25 +657,677 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { return nil } -func dsnAddTimeout(dsn string) (string, error) { - - // DSN "?timeout=5s" is not valid, but "/?timeout=5s" is valid ("" and "/" - // are the same DSN) - if dsn == "" { - dsn = "/" - } - u, err := url.Parse(dsn) +func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(infoSchemaProcessListQuery) if err != nil { - return "", err + return err } - v := u.Query() + defer rows.Close() + var ( + command string + state string + count uint32 + ) - // Only override timeout if not already defined - if _, ok := v["timeout"]; ok == false { - v.Add("timeout", defaultTimeout.String()) - u.RawQuery = v.Encode() + var servtag string + fields := make(map[string]interface{}) + servtag, err = parseDSN(serv) + if err != nil { + servtag = "localhost" } - return u.String(), nil + + // mapping of state with its counts + stateCounts := make(map[string]uint32, len(generalThreadStates)) + // set map with keys and default values + for k, v := range generalThreadStates { + stateCounts[k] = v + } + + for rows.Next() { + err = rows.Scan(&command, &state, &count) + if err != nil { + return err + } + foundState := findThreadState(command, state) + stateCounts[foundState] += count + } + + tags := map[string]string{"server": servtag} + for s, c := range stateCounts { + fields[newNamespace("threads", s)] = c + } + acc.AddFields("mysql_info_schema", fields, tags) + return nil +} + +// gatherPerfTableIOWaits can be used to get total count and time +// of I/O wait event for each table and process +func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(perfTableIOWaitsQuery) + if err != nil { + return err + } + + defer rows.Close() + var ( + objSchema, objName, servtag string + countFetch, countInsert, countUpdate, countDelete uint64 + timeFetch, timeInsert, timeUpdate, timeDelete uint64 + ) + + servtag, err = parseDSN(serv) + if err != nil { + servtag = "localhost" + } + + for rows.Next() { + err = rows.Scan(&objSchema, &objName, + &countFetch, &countInsert, &countUpdate, &countDelete, + &timeFetch, &timeInsert, &timeUpdate, &timeDelete, + ) + + if err != nil { + return err + } + + tags := map[string]string{ + "server": servtag, + "schema": objSchema, + "name": objName, + } + fields := make(map[string]interface{}) + fields["table_io_waits_total_fetch"] = float64(countFetch) + fields["table_io_waits_total_insert"] = float64(countInsert) + fields["table_io_waits_total_update"] = float64(countUpdate) + fields["table_io_waits_total_delete"] = float64(countDelete) + + fields["table_io_waits_seconds_total_fetch"] = float64(timeFetch) / picoSeconds + fields["table_io_waits_seconds_total_insert"] = float64(timeInsert) / picoSeconds + fields["table_io_waits_seconds_total_update"] = float64(timeUpdate) / picoSeconds + fields["table_io_waits_seconds_total_delete"] = float64(timeDelete) / picoSeconds + + acc.AddFields("mysql_perf_schema", fields, tags) + } + return nil +} + +// gatherPerfIndexIOWaits can be used to get total count and time +// of I/O wait event for each index and process +func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(perfIndexIOWaitsQuery) + if err != nil { + return err + } + defer rows.Close() + + var ( + objSchema, objName, indexName, servtag string + countFetch, countInsert, countUpdate, countDelete uint64 + timeFetch, timeInsert, timeUpdate, timeDelete uint64 + ) + + servtag, err = parseDSN(serv) + if err != nil { + servtag = "localhost" + } + + for rows.Next() { + err = rows.Scan(&objSchema, &objName, &indexName, + &countFetch, &countInsert, &countUpdate, &countDelete, + &timeFetch, &timeInsert, &timeUpdate, &timeDelete, + ) + + if err != nil { + return err + } + + tags := map[string]string{ + "server": servtag, + "schema": objSchema, + "name": objName, + "index": indexName, + } + fields := make(map[string]interface{}) + fields["index_io_waits_total_fetch"] = float64(countFetch) + fields["index_io_waits_seconds_total_fetch"] = float64(timeFetch) / picoSeconds + + // update write columns only when index is NONE + if indexName == "NONE" { + fields["index_io_waits_total_insert"] = float64(countInsert) + fields["index_io_waits_total_update"] = float64(countUpdate) + fields["index_io_waits_total_delete"] = float64(countDelete) + + fields["index_io_waits_seconds_total_insert"] = float64(timeInsert) / picoSeconds + fields["index_io_waits_seconds_total_update"] = float64(timeUpdate) / picoSeconds + fields["index_io_waits_seconds_total_delete"] = float64(timeDelete) / picoSeconds + } + + acc.AddFields("mysql_perf_schema", fields, tags) + } + return nil +} + +// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented value of the column +func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(infoSchemaAutoIncQuery) + if err != nil { + return err + } + defer rows.Close() + + var ( + schema, table, column string + incValue, maxInt uint64 + ) + + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + + for rows.Next() { + if err := rows.Scan(&schema, &table, &column, &incValue, &maxInt); err != nil { + return err + } + tags := map[string]string{ + "server": servtag, + "schema": schema, + "table": table, + "column": column, + } + fields := make(map[string]interface{}) + fields["auto_increment_column"] = incValue + fields["auto_increment_column_max"] = maxInt + + acc.AddFields("mysql_info_schema", fields, tags) + } + return nil +} + +// gatherPerfTableLockWaits can be used to get +// the total number and time for SQL and external lock wait events +// for each table and operation +func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(perfTableLockWaitsQuery) + if err != nil { + return err + } + defer rows.Close() + + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + + var ( + objectSchema string + objectName string + countReadNormal uint64 + countReadWithSharedLocks uint64 + countReadHighPriority uint64 + countReadNoInsert uint64 + countReadExternal uint64 + countWriteAllowWrite uint64 + countWriteConcurrentInsert uint64 + countWriteDelayed uint64 + countWriteLowPriority uint64 + countWriteNormal uint64 + countWriteExternal uint64 + timeReadNormal uint64 + timeReadWithSharedLocks uint64 + timeReadHighPriority uint64 + timeReadNoInsert uint64 + timeReadExternal uint64 + timeWriteAllowWrite uint64 + timeWriteConcurrentInsert uint64 + timeWriteDelayed uint64 + timeWriteLowPriority uint64 + timeWriteNormal uint64 + timeWriteExternal uint64 + ) + + for rows.Next() { + err = rows.Scan( + &objectSchema, + &objectName, + &countReadNormal, + &countReadWithSharedLocks, + &countReadHighPriority, + &countReadNoInsert, + &countReadExternal, + &countWriteAllowWrite, + &countWriteConcurrentInsert, + &countWriteDelayed, + &countWriteLowPriority, + &countWriteNormal, + &countWriteExternal, + &timeReadNormal, + &timeReadWithSharedLocks, + &timeReadHighPriority, + &timeReadNoInsert, + &timeReadExternal, + &timeWriteAllowWrite, + &timeWriteConcurrentInsert, + &timeWriteDelayed, + &timeWriteLowPriority, + &timeWriteNormal, + &timeWriteExternal, + ) + + if err != nil { + return err + } + tags := map[string]string{ + "server": servtag, + "schema": objectSchema, + "table": objectName, + } + fields := make(map[string]interface{}) + + tags["operation"] = "read_normal" + fields["sql_lock_waits_total"] = float64(countReadNormal) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "read_with_shared_locks" + fields["sql_lock_waits_total"] = float64(countReadWithSharedLocks) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "read_high_priority" + fields["sql_lock_waits_total"] = float64(countReadHighPriority) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "read_no_insert" + fields["sql_lock_waits_total"] = float64(countReadNoInsert) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_normal" + fields["sql_lock_waits_total"] = float64(countWriteNormal) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_allow_write" + fields["sql_lock_waits_total"] = float64(countWriteAllowWrite) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_concurrent_insert" + fields["sql_lock_waits_total"] = float64(countWriteConcurrentInsert) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_delayed" + fields["sql_lock_waits_total"] = float64(countWriteDelayed) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_low_priority" + fields["sql_lock_waits_total"] = float64(countWriteLowPriority) + acc.AddFields("mysql_perf_schema", fields, tags) + + delete(fields, "sql_lock_waits_total") + + tags["operation"] = "read" + fields["external_lock_waits_total"] = float64(countReadExternal) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write" + fields["external_lock_waits_total"] = float64(countWriteExternal) + acc.AddFields("mysql_perf_schema", fields, tags) + + delete(fields, "external_lock_waits_total") + + tags["operation"] = "read_normal" + fields["sql_lock_waits_seconds_total"] = float64(timeReadNormal / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "read_with_shared_locks" + fields["sql_lock_waits_seconds_total"] = float64(timeReadWithSharedLocks / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "read_high_priority" + fields["sql_lock_waits_seconds_total"] = float64(timeReadHighPriority / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "read_no_insert" + fields["sql_lock_waits_seconds_total"] = float64(timeReadNoInsert / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_normal" + fields["sql_lock_waits_seconds_total"] = float64(timeWriteNormal / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_allow_write" + fields["sql_lock_waits_seconds_total"] = float64(timeWriteAllowWrite / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_concurrent_insert" + fields["sql_lock_waits_seconds_total"] = float64(timeWriteConcurrentInsert / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_delayed" + fields["sql_lock_waits_seconds_total"] = float64(timeWriteDelayed / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write_low_priority" + fields["sql_lock_waits_seconds_total"] = float64(timeWriteLowPriority / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + delete(fields, "sql_lock_waits_seconds_total") + + tags["operation"] = "read" + fields["external_lock_waits_seconds_total"] = float64(timeReadExternal / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["operation"] = "write" + fields["external_lock_waits_seconds_total"] = float64(timeWriteExternal / picoSeconds) + acc.AddFields("mysql_perf_schema", fields, tags) + } + return nil +} + +// gatherPerfEventWaits can be used to get total time and number of event waits +func (m *Mysql) gatherPerfEventWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(perfEventWaitsQuery) + if err != nil { + return err + } + defer rows.Close() + + var ( + event string + starCount, timeWait uint64 + ) + + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + tags := map[string]string{ + "server": servtag, + } + for rows.Next() { + if err := rows.Scan(&event, &starCount, &timeWait); err != nil { + return err + } + tags["event_name"] = event + fields := make(map[string]interface{}) + fields["events_waits_total"] = float64(starCount) + fields["events_waits_seconds_total"] = float64(timeWait) / picoSeconds + + acc.AddFields("mysql_perf_schema", fields, tags) + } + return nil +} + +// gatherPerfFileEvents can be used to get stats on file events +func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { + rows, err := db.Query(perfFileEventsQuery) + if err != nil { + return err + } + + defer rows.Close() + + var ( + eventName string + countRead, countWrite, countMisc uint64 + sumTimerRead, sumTimerWrite, sumTimerMisc uint64 + sumNumBytesRead, sumNumBytesWrite uint64 + ) + + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + tags := map[string]string{ + "server": servtag, + } + for rows.Next() { + err = rows.Scan( + &eventName, + &countRead, &sumTimerRead, &sumNumBytesRead, + &countWrite, &sumTimerWrite, &sumNumBytesWrite, + &countMisc, &sumTimerMisc, + ) + if err != nil { + return err + } + + tags["event_name"] = eventName + fields := make(map[string]interface{}) + + tags["mode"] = "misc" + fields["file_events_total"] = float64(countWrite) + fields["file_events_seconds_total"] = float64(sumTimerMisc) / picoSeconds + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["mode"] = "read" + fields["file_events_total"] = float64(countRead) + fields["file_events_seconds_total"] = float64(sumTimerRead) / picoSeconds + fields["file_events_bytes_totals"] = float64(sumNumBytesRead) + acc.AddFields("mysql_perf_schema", fields, tags) + + tags["mode"] = "write" + fields["file_events_total"] = float64(countWrite) + fields["file_events_seconds_total"] = float64(sumTimerWrite) / picoSeconds + fields["file_events_bytes_totals"] = float64(sumNumBytesWrite) + acc.AddFields("mysql_perf_schema", fields, tags) + + } + return nil +} + +// gatherPerfEventsStatements can be used to get attributes of each event +func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf.Accumulator) error { + query := fmt.Sprintf( + perfEventsStatementsQuery, + m.PerfEventsStatementsDigestTextLimit, + m.PerfEventsStatementsTimeLimit, + m.PerfEventsStatementsLimit, + ) + + rows, err := db.Query(query) + if err != nil { + return err + } + + defer rows.Close() + + var ( + schemaName, digest, digest_text string + count, queryTime, errors, warnings uint64 + rowsAffected, rowsSent, rowsExamined uint64 + tmpTables, tmpDiskTables uint64 + sortMergePasses, sortRows uint64 + noIndexUsed uint64 + ) + + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + tags := map[string]string{ + "server": servtag, + } + + for rows.Next() { + err = rows.Scan( + &schemaName, &digest, &digest_text, + &count, &queryTime, &errors, &warnings, + &rowsAffected, &rowsSent, &rowsExamined, + &tmpTables, &tmpDiskTables, + &sortMergePasses, &sortRows, + ) + + if err != nil { + return err + } + tags["schema"] = schemaName + tags["digest"] = digest + tags["digest_text"] = digest_text + + fields := make(map[string]interface{}) + + fields["events_statements_total"] = float64(count) + fields["events_statements_seconds_total"] = float64(queryTime) / picoSeconds + fields["events_statements_errors_total"] = float64(errors) + fields["events_statements_warnings_total"] = float64(warnings) + fields["events_statements_rows_affected_total"] = float64(rowsAffected) + fields["events_statements_rows_sent_total"] = float64(rowsSent) + fields["events_statements_rows_examined_total"] = float64(rowsExamined) + fields["events_statements_tmp_tables_total"] = float64(tmpTables) + fields["events_statements_tmp_disk_tables_total"] = float64(tmpDiskTables) + fields["events_statements_sort_merge_passes_total"] = float64(sortMergePasses) + fields["events_statements_sort_rows_total"] = float64(sortRows) + fields["events_statements_no_index_used_total"] = float64(noIndexUsed) + + acc.AddFields("mysql_perf_schema", fields, tags) + } + return nil +} + +func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error { + var ( + dbList []string + servtag string + ) + servtag, err := parseDSN(serv) + if err != nil { + servtag = "localhost" + } + + // if the list of databases if empty, then get all databases + if len(m.TableSchemaDatabases) == 0 { + rows, err := db.Query(dbListQuery) + if err != nil { + return err + } + defer rows.Close() + + var database string + for rows.Next() { + err = rows.Scan(&database) + if err != nil { + return err + } + + dbList = append(dbList, database) + } + } else { + dbList = m.TableSchemaDatabases + } + + for _, database := range dbList { + rows, err := db.Query(fmt.Sprintf(tableSchemaQuery, database)) + if err != nil { + return err + } + defer rows.Close() + var ( + tableSchema string + tableName string + tableType string + engine string + version uint64 + rowFormat string + tableRows uint64 + dataLength uint64 + indexLength uint64 + dataFree uint64 + createOptions string + ) + for rows.Next() { + err = rows.Scan( + &tableSchema, + &tableName, + &tableType, + &engine, + &version, + &rowFormat, + &tableRows, + &dataLength, + &indexLength, + &dataFree, + &createOptions, + ) + if err != nil { + return err + } + tags := map[string]string{"server": servtag} + tags["schema"] = tableSchema + tags["table"] = tableName + versionTags := tags + + acc.Add(newNamespace("info_schema", "table_rows"), float64(tableRows), tags) + + tags["component"] = "data_length" + acc.Add(newNamespace("info_schema", "table_size", "data_length"), float64(dataLength), tags) + + tags["component"] = "index_length" + acc.Add(newNamespace("info_schema", "table_size", "index_length"), float64(indexLength), tags) + + tags["component"] = "data_free" + acc.Add(newNamespace("info_schema", "table_size", "data_free"), float64(dataFree), tags) + + versionTags["type"] = tableType + versionTags["engine"] = engine + versionTags["row_format"] = rowFormat + versionTags["create_options"] = createOptions + + acc.Add(newNamespace("info_schema", "table_version"), float64(version), versionTags) + } + } + return nil +} + +// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1 +func parseValue(value sql.RawBytes) (float64, bool) { + if bytes.Compare(value, []byte("Yes")) == 0 || bytes.Compare(value, []byte("ON")) == 0 { + return 1, true + } + + if bytes.Compare(value, []byte("No")) == 0 || bytes.Compare(value, []byte("OFF")) == 0 { + return 0, false + } + n, err := strconv.ParseFloat(string(value), 64) + return n, err == nil +} + +// findThreadState can be used to find thread state by command and plain state +func findThreadState(rawCommand, rawState string) string { + var ( + // replace '_' symbol with space + command = strings.Replace(strings.ToLower(rawCommand), "_", " ", -1) + state = strings.Replace(strings.ToLower(rawState), "_", " ", -1) + ) + // if the state is already valid, then return it + if _, ok := generalThreadStates[state]; ok { + return state + } + + // if state is plain, return the mapping + if mappedState, ok := stateStatusMappings[state]; ok { + return mappedState + } + // if the state is any lock, return the special state + if strings.Contains(state, "waiting for") && strings.Contains(state, "lock") { + return "waiting for lock" + } + + if command == "sleep" && state == "" { + return "idle" + } + + if command == "query" { + return "executing" + } + + if command == "binlog dump" { + return "replication master" + } + // if no mappings found and state is invalid, then return "other" state + return "other" +} + +// newNamespace can be used to make a namespace +func newNamespace(words ...string) string { + return strings.Replace(strings.Join(words, "_"), " ", "_", -1) } func init() { diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 9e4073432..989c21722 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -1,6 +1,7 @@ package mysql import ( + "database/sql" "fmt" "testing" @@ -115,3 +116,47 @@ func TestMysqlDNSAddTimeout(t *testing.T) { } } } + +func TestParseValue(t *testing.T) { + testCases := []struct { + rawByte sql.RawBytes + value float64 + boolValue bool + }{ + {sql.RawBytes("Yes"), 1, true}, + {sql.RawBytes("No"), 0, false}, + {sql.RawBytes("ON"), 1, true}, + {sql.RawBytes("OFF"), 0, false}, + {sql.RawBytes("ABC"), 0, false}, + } + for _, cases := range testCases { + if value, ok := parseValue(cases.rawByte); value != cases.value && ok != cases.boolValue { + t.Errorf("want %d with %t, got %d with %t", int(cases.value), cases.boolValue, int(value), ok) + } + } +} + +func TestNewNamespace(t *testing.T) { + testCases := []struct { + words []string + namespace string + }{ + { + []string{"thread", "info_scheme", "query update"}, + "thread_info_scheme_query_update", + }, + { + []string{"thread", "info_scheme", "query_update"}, + "thread_info_scheme_query_update", + }, + { + []string{"thread", "info", "scheme", "query", "update"}, + "thread_info_scheme_query_update", + }, + } + for _, cases := range testCases { + if got := newNamespace(cases.words...); got != cases.namespace { + t.Errorf("want %s, got %s", cases.namespace, got) + } + } +}