Added userstats to mysql input plugin (#2137)
* Added GatherUserStatistics, row Uptime in gatherGlobalStatuses, and version fields & tags * Updated README file * pulling in latest from master * ran go fmt to fix formatting * fix unreachable code * few fixes * cleaning up and applying suggestions from sparrc
This commit is contained in:
		
							parent
							
								
									b89c45b858
								
							
						
					
					
						commit
						0c9da0985a
					
				|  | @ -7,6 +7,7 @@ This plugin gathers the statistic data from MySQL server | ||||||
| * Slave statuses | * Slave statuses | ||||||
| * Binlog size | * Binlog size | ||||||
| * Process list | * Process list | ||||||
|  | * User Statistics | ||||||
| * Info schema auto increment columns | * Info schema auto increment columns | ||||||
| * Table I/O waits | * Table I/O waits | ||||||
| * Index I/O waits | * Index I/O waits | ||||||
|  | @ -44,6 +45,9 @@ This plugin gathers the statistic data from MySQL server | ||||||
|   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST |   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST | ||||||
|   gather_process_list                       = true |   gather_process_list                       = true | ||||||
|   # |   # | ||||||
|  |   ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS | ||||||
|  |   gather_user_statistics                    = true | ||||||
|  |   # | ||||||
|   ## gather auto_increment columns and max values from information schema |   ## gather auto_increment columns and max values from information schema | ||||||
|   gather_info_schema_auto_inc               = true |   gather_info_schema_auto_inc               = true | ||||||
|   # |   # | ||||||
|  | @ -89,6 +93,30 @@ Requires to be turned on in configuration. | ||||||
|     * binary_files_count(int, number) |     * binary_files_count(int, number) | ||||||
| * Process list - connection metrics from processlist for each user. It has the following tags | * Process list - connection metrics from processlist for each user. It has the following tags | ||||||
|     * connections(int, number) |     * connections(int, number) | ||||||
|  | * User Statistics - connection metrics from user statistics for each user. It has the following fields | ||||||
|  |     * access_denied | ||||||
|  |     * binlog_bytes_written | ||||||
|  |     * busy_time | ||||||
|  |     * bytes_received | ||||||
|  |     * bytes_sent | ||||||
|  |     * commit_transactions | ||||||
|  |     * concurrent_connections | ||||||
|  |     * connected_time | ||||||
|  |     * cpu_time | ||||||
|  |     * denied_connections | ||||||
|  |     * empty_queries | ||||||
|  |     * hostlost_connections | ||||||
|  |     * other_commands | ||||||
|  |     * rollback_transactions | ||||||
|  |     * rows_fetched | ||||||
|  |     * rows_updated | ||||||
|  |     * select_commands | ||||||
|  |     * server | ||||||
|  |     * table_rows_read | ||||||
|  |     * total_connections | ||||||
|  |     * total_ssl_connections | ||||||
|  |     * update_commands | ||||||
|  |     * user | ||||||
| * Perf Table IO waits - total count and time of I/O waits event for each table | * Perf Table IO waits - total count and time of I/O waits event for each table | ||||||
| and process. It has following fields: | and process. It has following fields: | ||||||
|     * table_io_waits_total_fetch(float, number) |     * table_io_waits_total_fetch(float, number) | ||||||
|  | @ -158,6 +186,8 @@ The unit of fields varies by the tags. | ||||||
|     * server (the host name from which the metrics are gathered) |     * server (the host name from which the metrics are gathered) | ||||||
| * Process list measurement has following tags | * Process list measurement has following tags | ||||||
|     * user (username for whom the metrics are gathered) |     * user (username for whom the metrics are gathered) | ||||||
|  | * User Statistics measurement has following tags | ||||||
|  |     * user (username for whom the metrics are gathered) | ||||||
| * Perf table IO waits measurement has following tags | * Perf table IO waits measurement has following tags | ||||||
|     * schema |     * schema | ||||||
|     * name (object name for event or process) |     * name (object name for event or process) | ||||||
|  |  | ||||||
|  | @ -23,6 +23,7 @@ type Mysql struct { | ||||||
| 	PerfEventsStatementsTimeLimit       int64    `toml:"perf_events_statemetns_time_limit"` | 	PerfEventsStatementsTimeLimit       int64    `toml:"perf_events_statemetns_time_limit"` | ||||||
| 	TableSchemaDatabases                []string `toml:"table_schema_databases"` | 	TableSchemaDatabases                []string `toml:"table_schema_databases"` | ||||||
| 	GatherProcessList                   bool     `toml:"gather_process_list"` | 	GatherProcessList                   bool     `toml:"gather_process_list"` | ||||||
|  | 	GatherUserStatistics                bool     `toml:"gather_user_statistics"` | ||||||
| 	GatherInfoSchemaAutoInc             bool     `toml:"gather_info_schema_auto_inc"` | 	GatherInfoSchemaAutoInc             bool     `toml:"gather_info_schema_auto_inc"` | ||||||
| 	GatherSlaveStatus                   bool     `toml:"gather_slave_status"` | 	GatherSlaveStatus                   bool     `toml:"gather_slave_status"` | ||||||
| 	GatherBinaryLogs                    bool     `toml:"gather_binary_logs"` | 	GatherBinaryLogs                    bool     `toml:"gather_binary_logs"` | ||||||
|  | @ -60,6 +61,9 @@ var sampleConfig = ` | ||||||
|   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST |   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST | ||||||
|   gather_process_list                       = true |   gather_process_list                       = true | ||||||
|   # |   # | ||||||
|  |   ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS | ||||||
|  |   gather_user_statistics                    = true | ||||||
|  |   # | ||||||
|   ## gather auto_increment columns and max values from information schema |   ## gather auto_increment columns and max values from information schema | ||||||
|   gather_info_schema_auto_inc               = true |   gather_info_schema_auto_inc               = true | ||||||
|   # |   # | ||||||
|  | @ -415,6 +419,10 @@ const ( | ||||||
|         WHERE ID != connection_id() |         WHERE ID != connection_id() | ||||||
|         GROUP BY command,state |         GROUP BY command,state | ||||||
|         ORDER BY null` |         ORDER BY null` | ||||||
|  | 	infoSchemaUserStatisticsQuery = ` | ||||||
|  |         SELECT *,count(*) | ||||||
|  |         FROM information_schema.user_statistics | ||||||
|  | 	GROUP BY user` | ||||||
| 	infoSchemaAutoIncQuery = ` | 	infoSchemaAutoIncQuery = ` | ||||||
|         SELECT table_schema, table_name, column_name, auto_increment, |         SELECT table_schema, table_name, column_name, auto_increment, | ||||||
|           CAST(pow(2, case data_type |           CAST(pow(2, case data_type | ||||||
|  | @ -530,7 +538,6 @@ const ( | ||||||
| 			table_name | 			table_name | ||||||
| 			FROM information_schema.tables | 			FROM information_schema.tables | ||||||
| 		WHERE table_schema = 'performance_schema' AND table_name = ? | 		WHERE table_schema = 'performance_schema' AND table_name = ? | ||||||
| 
 |  | ||||||
| 	` | 	` | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -582,6 +589,13 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if m.GatherUserStatistics { | ||||||
|  | 		err = m.GatherUserStatisticsStatuses(db, serv, acc) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	if m.GatherSlaveStatus { | 	if m.GatherSlaveStatus { | ||||||
| 		err = m.gatherSlaveStatuses(db, serv, acc) | 		err = m.gatherSlaveStatuses(db, serv, acc) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | @ -669,6 +683,11 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		key = strings.ToLower(key) | 		key = strings.ToLower(key) | ||||||
|  |                 // parse mysql version and put into field and tag
 | ||||||
|  | 		if strings.Contains(key, "version") { | ||||||
|  | 			fields[key] = string(val) | ||||||
|  | 			tags[key] = string(val) | ||||||
|  | 		} | ||||||
| 		// parse value, if it is numeric then save, otherwise ignore
 | 		// parse value, if it is numeric then save, otherwise ignore
 | ||||||
| 		if floatVal, ok := parseValue(val); ok { | 		if floatVal, ok := parseValue(val); ok { | ||||||
| 			fields[key] = floatVal | 			fields[key] = floatVal | ||||||
|  | @ -854,6 +873,12 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| 			fields["syncs"] = i | 			fields["syncs"] = i | ||||||
|  | 		case "Uptime": | ||||||
|  | 			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 			fields["uptime"] = i | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	// Send any remaining fields
 | 	// Send any remaining fields
 | ||||||
|  | @ -884,6 +909,74 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// gather connection metrics from user_statistics for each user
 | ||||||
|  | 	if m.GatherUserStatistics { | ||||||
|  | 		conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user") | ||||||
|  | 
 | ||||||
|  | 		for conn_rows.Next() { | ||||||
|  | 			var user string | ||||||
|  | 			var total_connections int64 | ||||||
|  | 			var concurrent_connections int64 | ||||||
|  | 			var connected_time int64 | ||||||
|  | 			var busy_time int64 | ||||||
|  | 			var cpu_time int64 | ||||||
|  | 			var bytes_received int64 | ||||||
|  | 			var bytes_sent int64 | ||||||
|  | 			var binlog_bytes_written int64 | ||||||
|  | 			var rows_fetched int64 | ||||||
|  | 			var rows_updated int64 | ||||||
|  | 			var table_rows_read int64 | ||||||
|  | 			var select_commands int64 | ||||||
|  | 			var update_commands int64 | ||||||
|  | 			var other_commands int64 | ||||||
|  | 			var commit_transactions int64 | ||||||
|  | 			var rollback_transactions int64 | ||||||
|  | 			var denied_connections int64 | ||||||
|  | 			var lost_connections int64 | ||||||
|  | 			var access_denied int64 | ||||||
|  | 			var empty_queries int64 | ||||||
|  | 			var total_ssl_connections int64 | ||||||
|  | 
 | ||||||
|  | 			err = conn_rows.Scan(&user, &total_connections, &concurrent_connections, | ||||||
|  | 				&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, | ||||||
|  | 				&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, | ||||||
|  | 				&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, | ||||||
|  | 				&empty_queries, &total_ssl_connections, | ||||||
|  | 			) | ||||||
|  | 
 | ||||||
|  | 			if err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			tags := map[string]string{"server": servtag, "user": user} | ||||||
|  | 			fields := map[string]interface{}{ | ||||||
|  | 			  "total_connections": total_connections, | ||||||
|  | 			  "concurrent_connections": concurrent_connections, | ||||||
|  | 			  "connected_time": connected_time, | ||||||
|  | 			  "busy_time": busy_time, | ||||||
|  | 			  "cpu_time": cpu_time, | ||||||
|  | 			  "bytes_received": bytes_received, | ||||||
|  | 			  "bytes_sent": bytes_sent, | ||||||
|  | 			  "binlog_bytes_written": binlog_bytes_written, | ||||||
|  | 			  "rows_fetched": rows_fetched, | ||||||
|  | 			  "rows_updated": rows_updated, | ||||||
|  | 			  "table_rows_read": table_rows_read, | ||||||
|  | 			  "select_commands": select_commands, | ||||||
|  | 			  "update_commands": update_commands, | ||||||
|  | 			  "other_commands": other_commands, | ||||||
|  | 			  "commit_transactions": commit_transactions, | ||||||
|  | 			  "rollback_transactions": rollback_transactions, | ||||||
|  | 			  "denied_connections": denied_connections, | ||||||
|  | 			  "lost_connections": lost_connections, | ||||||
|  | 			  "access_denied": access_denied, | ||||||
|  | 			  "empty_queries": empty_queries, | ||||||
|  | 			  "total_ssl_connections": total_ssl_connections, | ||||||
|  |                         } | ||||||
|  | 
 | ||||||
|  | 			acc.AddFields("mysql_user_stats", fields, tags) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -932,6 +1025,88 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // GatherUserStatistics can be used to collect metrics on each running command
 | ||||||
|  | // and its state with its running count
 | ||||||
|  | func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error { | ||||||
|  | 	// run query
 | ||||||
|  | 	rows, err := db.Query(infoSchemaUserStatisticsQuery) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	defer rows.Close() | ||||||
|  | 	var ( | ||||||
|  | 		user                   string | ||||||
|  | 		total_connections      int64 | ||||||
|  | 		concurrent_connections int64 | ||||||
|  | 		connected_time         int64 | ||||||
|  | 		busy_time              int64 | ||||||
|  | 		cpu_time               int64 | ||||||
|  | 		bytes_received         int64 | ||||||
|  | 		bytes_sent             int64 | ||||||
|  | 		binlog_bytes_written   int64 | ||||||
|  | 		rows_fetched           int64 | ||||||
|  | 		rows_updated           int64 | ||||||
|  | 		table_rows_read        int64 | ||||||
|  | 		select_commands        int64 | ||||||
|  | 		update_commands        int64 | ||||||
|  | 		other_commands         int64 | ||||||
|  | 		commit_transactions    int64 | ||||||
|  | 		rollback_transactions  int64 | ||||||
|  | 		denied_connections     int64 | ||||||
|  | 		lost_connections       int64 | ||||||
|  | 		access_denied          int64 | ||||||
|  | 		empty_queries          int64 | ||||||
|  | 		total_ssl_connections  int64 | ||||||
|  | 		count                  uint32 | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	var servtag string | ||||||
|  | 	servtag, err = parseDSN(serv) | ||||||
|  | 	if err != nil { | ||||||
|  | 		servtag = "localhost" | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for rows.Next() { | ||||||
|  | 		err = rows.Scan(&user, &total_connections, &concurrent_connections, | ||||||
|  | 			&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, | ||||||
|  | 			&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, | ||||||
|  | 			&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, | ||||||
|  | 			&empty_queries, &total_ssl_connections, &count, | ||||||
|  | 		) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		tags := map[string]string{"server": servtag, "user": user} | ||||||
|  | 		fields := map[string]interface{}{ | ||||||
|  | 
 | ||||||
|  | 			"total_connections":      total_connections, | ||||||
|  | 			"concurrent_connections": concurrent_connections, | ||||||
|  | 			"connected_time":         connected_time, | ||||||
|  | 			"busy_time":              busy_time, | ||||||
|  | 			"cpu_time":               cpu_time, | ||||||
|  | 			"bytes_received":         bytes_received, | ||||||
|  | 			"bytes_sent":             bytes_sent, | ||||||
|  | 			"binlog_bytes_written":   binlog_bytes_written, | ||||||
|  | 			"rows_fetched":           rows_fetched, | ||||||
|  | 			"rows_updated":           rows_updated, | ||||||
|  | 			"table_rows_read":        table_rows_read, | ||||||
|  | 			"select_commands":        select_commands, | ||||||
|  | 			"update_commands":        update_commands, | ||||||
|  | 			"other_commands":         other_commands, | ||||||
|  | 			"commit_transactions":    commit_transactions, | ||||||
|  | 			"rollback_transactions":  rollback_transactions, | ||||||
|  | 			"denied_connections":     denied_connections, | ||||||
|  | 			"lost_connections":       lost_connections, | ||||||
|  | 			"access_denied":          access_denied, | ||||||
|  | 			"empty_queries":          empty_queries, | ||||||
|  | 			"total_ssl_connections":  total_ssl_connections, | ||||||
|  | 		} | ||||||
|  | 		acc.AddFields("mysql_user_stats", fields, tags) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // gatherPerfTableIOWaits can be used to get total count and time
 | // gatherPerfTableIOWaits can be used to get total count and time
 | ||||||
| // of I/O wait event for each table and process
 | // of I/O wait event for each table and process
 | ||||||
| func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { | func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue