preventing tags from mutation by creating new tag for each metric

This commit is contained in:
Maksadbek 2016-03-31 01:42:10 +05:00
parent 9f36e194f4
commit 116c0e4f2f
1 changed files with 119 additions and 83 deletions

View File

@ -1157,101 +1157,123 @@ func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.A
"schema": objectSchema, "schema": objectSchema,
"table": objectName, "table": objectName,
} }
fields := make(map[string]interface{}) sqlLWFields := make(map[string]interface{})
tags["operation"] = "read_normal" rnTags := copyTags(tags)
fields["sql_lock_waits_total"] = countReadNormal rnTags["operation"] = "read_normal"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countReadNormal
acc.AddFields("mysql_perf_schema", sqlLWFields, rnTags)
tags["operation"] = "read_with_shared_locks" rwslTags := copyTags(tags)
fields["sql_lock_waits_total"] = countReadWithSharedLocks rwslTags["operation"] = "read_with_shared_locks"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countReadWithSharedLocks
acc.AddFields("mysql_perf_schema", sqlLWFields, rwslTags)
tags["operation"] = "read_high_priority" rhptTags := copyTags(tags)
fields["sql_lock_waits_total"] = countReadHighPriority rhptTags["operation"] = "read_high_priority"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countReadHighPriority
acc.AddFields("mysql_perf_schema", sqlLWFields, rhptTags)
tags["operation"] = "read_no_insert" rniTags := copyTags(tags)
fields["sql_lock_waits_total"] = countReadNoInsert rniTags["operation"] = "read_no_insert"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countReadNoInsert
acc.AddFields("mysql_perf_schema", sqlLWFields, tags)
tags["operation"] = "write_normal" wnTags := copyTags(tags)
fields["sql_lock_waits_total"] = countWriteNormal wnTags["operation"] = "write_normal"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countWriteNormal
acc.AddFields("mysql_perf_schema", sqlLWFields, wnTags)
tags["operation"] = "write_allow_write" wawTags := copyTags(tags)
fields["sql_lock_waits_total"] = countWriteAllowWrite wawTags["operation"] = "write_allow_write"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countWriteAllowWrite
acc.AddFields("mysql_perf_schema", sqlLWFields, wawTags)
tags["operation"] = "write_concurrent_insert" wciTags := copyTags(tags)
fields["sql_lock_waits_total"] = countWriteConcurrentInsert wciTags["operation"] = "write_concurrent_insert"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countWriteConcurrentInsert
acc.AddFields("mysql_perf_schema", sqlLWFields, wciTags)
tags["operation"] = "write_delayed" wdTags := copyTags(tags)
fields["sql_lock_waits_total"] = countWriteDelayed wdTags["operation"] = "write_delayed"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countWriteDelayed
acc.AddFields("mysql_perf_schema", sqlLWFields, wdTags)
tags["operation"] = "write_low_priority" wlpTags := copyTags(tags)
fields["sql_lock_waits_total"] = countWriteLowPriority wlpTags["operation"] = "write_low_priority"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWFields["sql_lock_waits_total"] = countWriteLowPriority
acc.AddFields("mysql_perf_schema", sqlLWFields, wlpTags)
delete(fields, "sql_lock_waits_total") externalLWFields := make(map[string]interface{})
tags["operation"] = "read" rTags := copyTags(tags)
fields["external_lock_waits_total"] = countReadExternal rTags["operation"] = "read"
acc.AddFields("mysql_perf_schema", fields, tags) externalLWFields["external_lock_waits_total"] = countReadExternal
acc.AddFields("mysql_perf_schema", externalLWFields, rTags)
tags["operation"] = "write" wTags := copyTags(tags)
fields["external_lock_waits_total"] = countWriteExternal wTags["operation"] = "write"
acc.AddFields("mysql_perf_schema", fields, tags) externalLWFields["external_lock_waits_total"] = countWriteExternal
acc.AddFields("mysql_perf_schema", externalLWFields, wTags)
delete(fields, "external_lock_waits_total") sqlLWSecTotalFields := make(map[string]interface{})
tags["operation"] = "read_normal" rnstTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeReadNormal / picoSeconds rnstTags["operation"] = "read_normal"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeReadNormal / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, rnstTags)
tags["operation"] = "read_with_shared_locks" rwslstTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeReadWithSharedLocks / picoSeconds rwslstTags["operation"] = "read_with_shared_locks"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeReadWithSharedLocks / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, rwslstTags)
tags["operation"] = "read_high_priority" rhpTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeReadHighPriority / picoSeconds rhpTags["operation"] = "read_high_priority"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeReadHighPriority / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, rhpTags)
tags["operation"] = "read_no_insert" rnistTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeReadNoInsert / picoSeconds rnistTags["operation"] = "read_no_insert"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeReadNoInsert / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, rnistTags)
tags["operation"] = "write_normal" wnstTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeWriteNormal / picoSeconds wnstTags["operation"] = "write_normal"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeWriteNormal / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, wnstTags)
tags["operation"] = "write_allow_write" wawstTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeWriteAllowWrite / picoSeconds wawstTags["operation"] = "write_allow_write"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeWriteAllowWrite / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, wawstTags)
tags["operation"] = "write_concurrent_insert" wcistTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeWriteConcurrentInsert / picoSeconds wcistTags["operation"] = "write_concurrent_insert"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeWriteConcurrentInsert / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, wcistTags)
tags["operation"] = "write_delayed" wdstTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeWriteDelayed / picoSeconds wdstTags["operation"] = "write_delayed"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeWriteDelayed / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, wdstTags)
tags["operation"] = "write_low_priority" wlpstTags := copyTags(tags)
fields["sql_lock_waits_seconds_total"] = timeWriteLowPriority / picoSeconds wlpstTags["operation"] = "write_low_priority"
acc.AddFields("mysql_perf_schema", fields, tags) sqlLWSecTotalFields["sql_lock_waits_seconds_total"] = timeWriteLowPriority / picoSeconds
acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, wlpstTags)
delete(fields, "sql_lock_waits_seconds_total") externalLWSecTotalFields := make(map[string]interface{})
tags["operation"] = "read" rstTags := copyTags(tags)
fields["external_lock_waits_seconds_total"] = timeReadExternal / picoSeconds rstTags["operation"] = "read"
acc.AddFields("mysql_perf_schema", fields, tags) externalLWSecTotalFields["external_lock_waits_seconds_total"] = timeReadExternal / picoSeconds
acc.AddFields("mysql_perf_schema", externalLWSecTotalFields, rstTags)
tags["operation"] = "write" wstTags := copyTags(tags)
fields["external_lock_waits_seconds_total"] = timeWriteExternal / picoSeconds wstTags["operation"] = "write"
acc.AddFields("mysql_perf_schema", fields, tags) externalLWSecTotalFields["external_lock_waits_seconds_total"] = timeWriteExternal / picoSeconds
acc.AddFields("mysql_perf_schema", externalLWSecTotalFields, wstTags)
} }
return nil return nil
} }
@ -1328,22 +1350,25 @@ func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, serv string, acc telegr
tags["event_name"] = eventName tags["event_name"] = eventName
fields := make(map[string]interface{}) fields := make(map[string]interface{})
tags["mode"] = "misc" miscTags := copyTags(tags)
miscTags["mode"] = "misc"
fields["file_events_total"] = countWrite fields["file_events_total"] = countWrite
fields["file_events_seconds_total"] = sumTimerMisc / picoSeconds fields["file_events_seconds_total"] = sumTimerMisc / picoSeconds
acc.AddFields("mysql_perf_schema", fields, tags) acc.AddFields("mysql_perf_schema", fields, miscTags)
tags["mode"] = "read" readTags := copyTags(tags)
readTags["mode"] = "read"
fields["file_events_total"] = countRead fields["file_events_total"] = countRead
fields["file_events_seconds_total"] = sumTimerRead / picoSeconds fields["file_events_seconds_total"] = sumTimerRead / picoSeconds
fields["file_events_bytes_totals"] = sumNumBytesRead fields["file_events_bytes_totals"] = sumNumBytesRead
acc.AddFields("mysql_perf_schema", fields, tags) acc.AddFields("mysql_perf_schema", fields, readTags)
tags["mode"] = "write" writeTags := copyTags(tags)
writeTags["mode"] = "write"
fields["file_events_total"] = countWrite fields["file_events_total"] = countWrite
fields["file_events_seconds_total"] = sumTimerWrite / picoSeconds fields["file_events_seconds_total"] = sumTimerWrite / picoSeconds
fields["file_events_bytes_totals"] = sumNumBytesWrite fields["file_events_bytes_totals"] = sumNumBytesWrite
acc.AddFields("mysql_perf_schema", fields, tags) acc.AddFields("mysql_perf_schema", fields, writeTags)
} }
return nil return nil
@ -1489,19 +1514,22 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
tags := map[string]string{"server": servtag} tags := map[string]string{"server": servtag}
tags["schema"] = tableSchema tags["schema"] = tableSchema
tags["table"] = tableName tags["table"] = tableName
versionTags := tags
acc.Add(newNamespace("info_schema", "table_rows"), tableRows, tags) acc.Add(newNamespace("info_schema", "table_rows"), tableRows, tags)
tags["component"] = "data_length" dlTags := copyTags(tags)
acc.Add(newNamespace("info_schema", "table_size", "data_length"), dataLength, tags) dlTags["component"] = "data_length"
acc.Add(newNamespace("info_schema", "table_size", "data_length"), dataLength, dlTags)
tags["component"] = "index_length" ilTags := copyTags(tags)
acc.Add(newNamespace("info_schema", "table_size", "index_length"), indexLength, tags) ilTags["component"] = "index_length"
acc.Add(newNamespace("info_schema", "table_size", "index_length"), indexLength, ilTags)
tags["component"] = "data_free" dfTags := copyTags(tags)
acc.Add(newNamespace("info_schema", "table_size", "data_free"), dataFree, tags) dfTags["component"] = "data_free"
acc.Add(newNamespace("info_schema", "table_size", "data_free"), dataFree, dfTags)
versionTags := copyTags(tags)
versionTags["type"] = tableType versionTags["type"] = tableType
versionTags["engine"] = engine versionTags["engine"] = engine
versionTags["row_format"] = rowFormat versionTags["row_format"] = rowFormat
@ -1567,6 +1595,14 @@ func newNamespace(words ...string) string {
return strings.Replace(strings.Join(words, "_"), " ", "_", -1) return strings.Replace(strings.Join(words, "_"), " ", "_", -1)
} }
func copyTags(in map[string]string) map[string]string {
out := make(map[string]string)
for k, v := range in {
out[k] = v
}
return out
}
func init() { func init() {
inputs.Add("mysql", func() telegraf.Input { inputs.Add("mysql", func() telegraf.Input {
return &Mysql{} return &Mysql{}