Fix various mysql data type conversions (#3554)
This commit is contained in:
parent
d7d224d511
commit
df9c7590b3
|
@ -169,182 +169,6 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type mapping struct {
|
|
||||||
onServer string
|
|
||||||
inExport string
|
|
||||||
}
|
|
||||||
|
|
||||||
var mappings = []*mapping{
|
|
||||||
{
|
|
||||||
onServer: "Aborted_",
|
|
||||||
inExport: "aborted_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Bytes_",
|
|
||||||
inExport: "bytes_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Com_",
|
|
||||||
inExport: "commands_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Created_",
|
|
||||||
inExport: "created_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Handler_",
|
|
||||||
inExport: "handler_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Innodb_",
|
|
||||||
inExport: "innodb_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Key_",
|
|
||||||
inExport: "key_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Open_",
|
|
||||||
inExport: "open_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Opened_",
|
|
||||||
inExport: "opened_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Qcache_",
|
|
||||||
inExport: "qcache_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Table_",
|
|
||||||
inExport: "table_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Tokudb_",
|
|
||||||
inExport: "tokudb_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Threads_",
|
|
||||||
inExport: "threads_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Access_",
|
|
||||||
inExport: "access_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Aria__",
|
|
||||||
inExport: "aria_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Binlog__",
|
|
||||||
inExport: "binlog_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Busy_",
|
|
||||||
inExport: "busy_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Connection_",
|
|
||||||
inExport: "connection_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Delayed_",
|
|
||||||
inExport: "delayed_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Empty_",
|
|
||||||
inExport: "empty_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Executed_",
|
|
||||||
inExport: "executed_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Executed_",
|
|
||||||
inExport: "executed_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Feature_",
|
|
||||||
inExport: "feature_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Flush_",
|
|
||||||
inExport: "flush_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Last_",
|
|
||||||
inExport: "last_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Master_",
|
|
||||||
inExport: "master_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Max_",
|
|
||||||
inExport: "max_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Memory_",
|
|
||||||
inExport: "memory_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Not_",
|
|
||||||
inExport: "not_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Performance_",
|
|
||||||
inExport: "performance_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Prepared_",
|
|
||||||
inExport: "prepared_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Rows_",
|
|
||||||
inExport: "rows_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Rpl_",
|
|
||||||
inExport: "rpl_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Select_",
|
|
||||||
inExport: "select_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Slave_",
|
|
||||||
inExport: "slave_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Slow_",
|
|
||||||
inExport: "slow_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Sort_",
|
|
||||||
inExport: "sort_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Subquery_",
|
|
||||||
inExport: "subquery_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Tc_",
|
|
||||||
inExport: "tc_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Threadpool_",
|
|
||||||
inExport: "threadpool_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "wsrep_",
|
|
||||||
inExport: "wsrep_",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
onServer: "Uptime_",
|
|
||||||
inExport: "uptime_",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// status counter
|
// status counter
|
||||||
generalThreadStates = map[string]uint32{
|
generalThreadStates = map[string]uint32{
|
||||||
|
@ -717,9 +541,8 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
|
||||||
fields[key] = string(val)
|
fields[key] = string(val)
|
||||||
tags[key] = string(val)
|
tags[key] = string(val)
|
||||||
}
|
}
|
||||||
// parse value, if it is numeric then save, otherwise ignore
|
if value, ok := parseValue(val); ok {
|
||||||
if floatVal, ok := parseValue(val); ok {
|
fields[key] = value
|
||||||
fields[key] = floatVal
|
|
||||||
}
|
}
|
||||||
// Send 20 fields at a time
|
// Send 20 fields at a time
|
||||||
if len(fields) >= 20 {
|
if len(fields) >= 20 {
|
||||||
|
@ -769,7 +592,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
|
||||||
}
|
}
|
||||||
// range over columns, and try to parse values
|
// range over columns, and try to parse values
|
||||||
for i, col := range cols {
|
for i, col := range cols {
|
||||||
// skip unparsable values
|
col = strings.ToLower(col)
|
||||||
if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
|
if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
|
||||||
fields["slave_"+col] = value
|
fields["slave_"+col] = value
|
||||||
}
|
}
|
||||||
|
@ -820,98 +643,36 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
|
||||||
// the mappings of actual names and names of each status to be exported
|
// the mappings of actual names and names of each status to be exported
|
||||||
// to output is provided on mappings variable
|
// to output is provided on mappings variable
|
||||||
func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
|
func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
|
||||||
// If user forgot the '/', add it
|
|
||||||
if strings.HasSuffix(serv, ")") {
|
|
||||||
serv = serv + "/"
|
|
||||||
} else if serv == "localhost" {
|
|
||||||
serv = ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// run query
|
// run query
|
||||||
rows, err := db.Query(globalStatusQuery)
|
rows, err := db.Query(globalStatusQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
// parse the DSN and save host name as a tag
|
// parse the DSN and save host name as a tag
|
||||||
servtag := getDSNTag(serv)
|
servtag := getDSNTag(serv)
|
||||||
tags := map[string]string{"server": servtag}
|
tags := map[string]string{"server": servtag}
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var name string
|
var key string
|
||||||
var val interface{}
|
var val sql.RawBytes
|
||||||
|
|
||||||
err = rows.Scan(&name, &val)
|
if err = rows.Scan(&key, &val); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var found bool
|
key = strings.ToLower(key)
|
||||||
|
|
||||||
// iterate over mappings and gather metrics that is provided on mapping
|
if value, ok := parseValue(val); ok {
|
||||||
for _, mapped := range mappings {
|
fields[key] = value
|
||||||
if strings.HasPrefix(name, mapped.onServer) {
|
|
||||||
// convert numeric values to integer
|
|
||||||
i, _ := strconv.Atoi(string(val.([]byte)))
|
|
||||||
fields[mapped.inExport+name[len(mapped.onServer):]] = i
|
|
||||||
found = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send 20 fields at a time
|
// Send 20 fields at a time
|
||||||
if len(fields) >= 20 {
|
if len(fields) >= 20 {
|
||||||
acc.AddFields("mysql", fields, tags)
|
acc.AddFields("mysql", fields, tags)
|
||||||
fields = make(map[string]interface{})
|
fields = make(map[string]interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
if found {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// search for specific values
|
|
||||||
switch name {
|
|
||||||
case "Queries":
|
|
||||||
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
|
|
||||||
} else {
|
|
||||||
fields["queries"] = i
|
|
||||||
}
|
|
||||||
case "Questions":
|
|
||||||
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
|
|
||||||
} else {
|
|
||||||
fields["questions"] = i
|
|
||||||
}
|
|
||||||
case "Slow_queries":
|
|
||||||
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
|
|
||||||
} else {
|
|
||||||
fields["slow_queries"] = i
|
|
||||||
}
|
|
||||||
case "Connections":
|
|
||||||
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
|
|
||||||
} else {
|
|
||||||
fields["connections"] = i
|
|
||||||
}
|
|
||||||
case "Syncs":
|
|
||||||
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
|
|
||||||
} else {
|
|
||||||
fields["syncs"] = i
|
|
||||||
}
|
|
||||||
case "Uptime":
|
|
||||||
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
|
|
||||||
} else {
|
|
||||||
fields["uptime"] = i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Send any remaining fields
|
// Send any remaining fields
|
||||||
if len(fields) > 0 {
|
if len(fields) > 0 {
|
||||||
|
@ -1059,7 +820,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
|
||||||
for s, c := range stateCounts {
|
for s, c := range stateCounts {
|
||||||
fields[newNamespace("threads", s)] = c
|
fields[newNamespace("threads", s)] = c
|
||||||
}
|
}
|
||||||
acc.AddFields("mysql_info_schema", fields, tags)
|
acc.AddFields("mysql_process_list", fields, tags)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1272,7 +1033,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
|
||||||
fields["auto_increment_column"] = incValue
|
fields["auto_increment_column"] = incValue
|
||||||
fields["auto_increment_column_max"] = maxInt
|
fields["auto_increment_column_max"] = maxInt
|
||||||
|
|
||||||
acc.AddFields("mysql_info_schema", fields, tags)
|
acc.AddFields("mysql_table_schema", fields, tags)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1287,21 +1048,19 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
var key string
|
|
||||||
var val sql.RawBytes
|
|
||||||
|
|
||||||
// parse DSN and save server tag
|
// parse DSN and save server tag
|
||||||
servtag := getDSNTag(serv)
|
servtag := getDSNTag(serv)
|
||||||
tags := map[string]string{"server": servtag}
|
tags := map[string]string{"server": servtag}
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
|
var key string
|
||||||
|
var val sql.RawBytes
|
||||||
if err := rows.Scan(&key, &val); err != nil {
|
if err := rows.Scan(&key, &val); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key = strings.ToLower(key)
|
key = strings.ToLower(key)
|
||||||
// parse value, if it is numeric then save, otherwise ignore
|
if value, ok := parseValue(val); ok {
|
||||||
if floatVal, ok := parseValue(val); ok {
|
fields[key] = value
|
||||||
fields[key] = floatVal
|
|
||||||
}
|
}
|
||||||
// Send 20 fields at a time
|
// Send 20 fields at a time
|
||||||
if len(fields) >= 20 {
|
if len(fields) >= 20 {
|
||||||
|
@ -1671,23 +1430,17 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
|
||||||
tags["schema"] = tableSchema
|
tags["schema"] = tableSchema
|
||||||
tags["table"] = tableName
|
tags["table"] = tableName
|
||||||
|
|
||||||
acc.AddFields(newNamespace("info_schema", "table_rows"),
|
acc.AddFields("mysql_table_schema",
|
||||||
map[string]interface{}{"value": tableRows}, tags)
|
map[string]interface{}{"rows": tableRows}, tags)
|
||||||
|
|
||||||
dlTags := copyTags(tags)
|
acc.AddFields("mysql_table_schema",
|
||||||
dlTags["component"] = "data_length"
|
map[string]interface{}{"data_length": dataLength}, tags)
|
||||||
acc.AddFields(newNamespace("info_schema", "table_size", "data_length"),
|
|
||||||
map[string]interface{}{"value": dataLength}, dlTags)
|
|
||||||
|
|
||||||
ilTags := copyTags(tags)
|
acc.AddFields("mysql_table_schema",
|
||||||
ilTags["component"] = "index_length"
|
map[string]interface{}{"index_length": indexLength}, tags)
|
||||||
acc.AddFields(newNamespace("info_schema", "table_size", "index_length"),
|
|
||||||
map[string]interface{}{"value": indexLength}, ilTags)
|
|
||||||
|
|
||||||
dfTags := copyTags(tags)
|
acc.AddFields("mysql_table_schema",
|
||||||
dfTags["component"] = "data_free"
|
map[string]interface{}{"data_free": dataFree}, tags)
|
||||||
acc.AddFields(newNamespace("info_schema", "table_size", "data_free"),
|
|
||||||
map[string]interface{}{"value": dataFree}, dfTags)
|
|
||||||
|
|
||||||
versionTags := copyTags(tags)
|
versionTags := copyTags(tags)
|
||||||
versionTags["type"] = tableType
|
versionTags["type"] = tableType
|
||||||
|
@ -1695,24 +1448,34 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
|
||||||
versionTags["row_format"] = rowFormat
|
versionTags["row_format"] = rowFormat
|
||||||
versionTags["create_options"] = createOptions
|
versionTags["create_options"] = createOptions
|
||||||
|
|
||||||
acc.AddFields(newNamespace("info_schema", "table_version"),
|
acc.AddFields("mysql_table_schema_version",
|
||||||
map[string]interface{}{"value": version}, versionTags)
|
map[string]interface{}{"table_version": version}, versionTags)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
|
// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
|
||||||
func parseValue(value sql.RawBytes) (float64, bool) {
|
func parseValue(value sql.RawBytes) (interface{}, bool) {
|
||||||
if bytes.Compare(value, []byte("Yes")) == 0 || bytes.Compare(value, []byte("ON")) == 0 {
|
if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
|
||||||
return 1, true
|
return 1, true
|
||||||
}
|
}
|
||||||
|
|
||||||
if bytes.Compare(value, []byte("No")) == 0 || bytes.Compare(value, []byte("OFF")) == 0 {
|
if bytes.EqualFold(value, []byte("NO")) || bytes.Compare(value, []byte("OFF")) == 0 {
|
||||||
return 0, true
|
return 0, true
|
||||||
}
|
}
|
||||||
n, err := strconv.ParseFloat(string(value), 64)
|
|
||||||
return n, err == nil
|
if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
|
||||||
|
return val, true
|
||||||
|
}
|
||||||
|
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
|
||||||
|
return val, true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(string(value)) > 0 {
|
||||||
|
return string(value), true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// findThreadState can be used to find thread state by command and plain state
|
// findThreadState can be used to find thread state by command and plain state
|
||||||
|
|
|
@ -127,26 +127,29 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseValue(t *testing.T) {
|
func TestParseValue(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
rawByte sql.RawBytes
|
rawByte sql.RawBytes
|
||||||
value float64
|
output interface{}
|
||||||
boolValue bool
|
boolValue bool
|
||||||
}{
|
}{
|
||||||
{sql.RawBytes("Yes"), 1, true},
|
{sql.RawBytes("123"), int64(123), true},
|
||||||
{sql.RawBytes("No"), 0, false},
|
{sql.RawBytes("abc"), "abc", true},
|
||||||
|
{sql.RawBytes("10.1"), 10.1, true},
|
||||||
{sql.RawBytes("ON"), 1, true},
|
{sql.RawBytes("ON"), 1, true},
|
||||||
{sql.RawBytes("OFF"), 0, false},
|
{sql.RawBytes("OFF"), 0, true},
|
||||||
{sql.RawBytes("ABC"), 0, false},
|
{sql.RawBytes("NO"), 0, true},
|
||||||
|
{sql.RawBytes("YES"), 1, true},
|
||||||
|
{sql.RawBytes("No"), 0, true},
|
||||||
|
{sql.RawBytes("Yes"), 1, true},
|
||||||
|
{sql.RawBytes(""), nil, false},
|
||||||
}
|
}
|
||||||
for _, cases := range testCases {
|
for _, cases := range testCases {
|
||||||
if value, ok := parseValue(cases.rawByte); value != cases.value && ok != cases.boolValue {
|
if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue {
|
||||||
t.Errorf("want %d with %t, got %d with %t", int(cases.value), cases.boolValue, int(value), ok)
|
t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewNamespace(t *testing.T) {
|
func TestNewNamespace(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
words []string
|
words []string
|
||||||
|
|
Loading…
Reference in New Issue