https://github.com/statsd/statsd
Raw File
Tip revision: 7c07eec4e7cebbd376d8313b230cea96c6571423 authored by Elliot Blackburn on 22 August 2023, 12:52:19 UTC
Correct container image tagging
Tip revision: 7c07eec
stats.js
/*jshint node:true, laxcomma:true */

const util = require('util');
const config = require('./lib/config');
const helpers = require('./lib/helpers');
const fs = require('fs');
const events = require('events');
const logger = require('./lib/logger');
const set = require('./lib/set');
const pm = require('./lib/process_metrics');
const process_mgmt = require('./lib/process_mgmt');
const mgmt_server = require('./lib/mgmt_server');
const mgmt = require('./lib/mgmt_console');

// initialize data structures with defaults for statsd stats
let keyCounter = {};      // per-key hit counts for the optional keyFlush report
let counters = {};        // counter metrics, keyed by metric name
let timers = {};          // timer samples (arrays of ms values), keyed by name
let timer_counters = {};  // sample-rate-adjusted sample counts per timer key
let gauges = {};          // last (or +/- adjusted) value per gauge key
let gaugesTTL = {};       // flush cycles left before an idle gauge is dropped (only used when deleteGauges)
let sets = {};            // set.Set instances per set key
let counter_rates = {};   // handed to process_metrics on each flush alongside counters
let timer_data = {};      // handed to process_metrics on each flush alongside timers
let pctThreshold = null;  // percentile threshold(s); listified in the config callback
let flushInterval, keyFlushInt, serversLoaded, mgmtServer;
let startup_time = Math.round(new Date().getTime() / 1000);  // unix seconds
let backendEvents = new events.EventEmitter();  // 'packet'/'flush'/'status' bus shared with backends
let healthStatus = config.healthStatus || 'up'; // toggled via the mgmt "health" command
let old_timestamp = 0;        // previous flush timestamp, feeds the timestamp_lag gauge
let timestamp_lag_namespace;  // assigned once the stats prefix is known
let keyNameSanitize = true;   // may be overridden by config.keyNameSanitize

// Load and init the backend from the backends/ directory.
/**
 * @param {Object} config - parsed statsd config (reads .debug)
 * @param {string} name - module path of the backend to require()
 */
function loadBackend(config, name) {
  const backendmod = require(name);

  if (config.debug) {
    l.log("Loading backend: " + name, 'DEBUG');
  }

  // A falsy return from init() means the backend refused to start;
  // statsd cannot run usefully without its backends, so bail out.
  if (!backendmod.init(startup_time, config, backendEvents, l)) {
    l.log("Failed to load backend: " + name, "ERROR");
    process.exit(1);
  }
}

// Load and init the server from the servers/ directory.
// The callback mimics the dgram 'message' event parameters (msg, rinfo)
//   msg: the message received by the server. may contain more than one metric
//   rinfo: contains remote address information and message length
//      (attributes are .address, .port, .family, .size - you're welcome)
/**
 * @param {Object} config - per-server config (reads .debug)
 * @param {string} name - module path of the server to require()
 * @param {Function} callback - invoked as (msg, rinfo) per received packet
 */
function startServer(config, name, callback) {
  const servermod = require(name);

  if (config.debug) {
    l.log("Loading server: " + name, 'DEBUG');
  }

  // A falsy return from start() means the server could not bind/listen.
  if (!servermod.start(config, callback)) {
    l.log("Failed to load server: " + name, "ERROR");
    process.exit(1);
  }
}

// Global holding the parsed configuration; assigned in the configFile
// callback below and read by flushMetrics() on every flush cycle.
let conf;

// Flush metrics to each backend.
function flushMetrics() {
  const time_stamp = Math.round(new Date().getTime() / 1000);
  // Record how far this flush drifted from the configured cadence (in
  // seconds) as a gauge; skipped on the very first flush.
  if (old_timestamp > 0) {
    gauges[timestamp_lag_namespace] = (time_stamp - old_timestamp - (Number(conf.flushInterval)/1000));
  }
  old_timestamp = time_stamp;

  // Snapshot handed to process_metrics and then to every backend. Note:
  // these are the live objects, not copies — the clear_metrics listener
  // below resets them after all backends have seen the flush.
  const metrics_hash = {
    counters: counters,
    gauges: gauges,
    timers: timers,
    timer_counters: timer_counters,
    sets: sets,
    counter_rates: counter_rates,
    timer_data: timer_data,
    pctThreshold: pctThreshold,
    histogram: conf.histogram
  };

  // After all listeners, reset the stats
  // (once() appends, so this runs after backend listeners registered earlier)
  backendEvents.once('flush', function clear_metrics(ts, metrics) {

    // Clear the counters
    conf.deleteCounters = conf.deleteCounters || false;
    for (const counter_key in metrics.counters) {
      if (conf.deleteCounters) {
        // statsd's own internal counters are zeroed rather than deleted
        // so they keep being reported every flush.
        if ((counter_key.indexOf("packets_received") != -1) ||
            (counter_key.indexOf("metrics_received") != -1) ||
            (counter_key.indexOf("bad_lines_seen") != -1)) {
          metrics.counters[counter_key] = 0;
        } else {
         delete(metrics.counters[counter_key]);
        }
      } else {
        metrics.counters[counter_key] = 0;
      }
    }

    // Clear the timers
    conf.deleteTimers = conf.deleteTimers || false;
    for (const timer_key in metrics.timers) {
      if (conf.deleteTimers) {
        delete(metrics.timers[timer_key]);
        delete(metrics.timer_counters[timer_key]);
      } else {
        metrics.timers[timer_key] = [];
        metrics.timer_counters[timer_key] = 0;
     }
    }

    // Clear the sets
    conf.deleteSets = conf.deleteSets || false;
    for (const set_key in metrics.sets) {
      if (conf.deleteSets) {
        delete(metrics.sets[set_key]);
      } else {
        metrics.sets[set_key] = new set.Set();
      }
    }

    // Normally gauges are not reset.  so if we don't delete them, continue to persist previous value
    conf.deleteGauges = conf.deleteGauges || false;
    if (conf.deleteGauges) {
      for (const gauge_key in gaugesTTL) {
        gaugesTTL[gauge_key]--;

        // if the gauge has been idle for more than the allowed TTL cycles delete it
        // (gaugesTTL[key] is re-armed to conf.gaugesMaxTTL on every gauge packet)
        if (gaugesTTL[gauge_key] < 1) {
          delete(metrics.gauges[gauge_key]);
          delete(gaugesTTL[gauge_key]);
        }
      }
    }
  });

  // Compute derived stats (rates, percentiles, ...) then notify backends.
  pm.process_metrics(metrics_hash, conf.calculatedTimerMetrics, flushInterval, time_stamp, function emitFlush(metrics) {
    backendEvents.emit('flush', time_stamp, metrics);
  });

  // Performing this setTimeout at the end of this method rather than the beginning
  // helps ensure we adapt to negative clock skew by letting the method's latency
  // introduce a short delay that should more than compensate.
  setTimeout(flushMetrics, getFlushTimeout(flushInterval));
}

// statsd's own message bookkeeping, reported via the "stats" management
// command. last_msg_seen is a unix timestamp in seconds.
const stats = {
  messages: {
    last_msg_seen: startup_time,
    bad_lines_seen: 0
  }
};

/**
 * Normalize a metric key: whitespace becomes '_', '/' becomes '-', and
 * everything outside [a-zA-Z0-9_\-.;=] is stripped (';' and '=' survive
 * because tags are encoded into the key with them).
 * @param {string} key
 * @returns {string} the sanitized key, or the key untouched when
 *   sanitization has been disabled via config.keyNameSanitize
 */
function sanitizeKeyName(key) {
  if (!keyNameSanitize) {
    return key;
  }
  return key
    .replace(/\s+/g, '_')
    .replace(/\//g, '-')
    .replace(/[^a-zA-Z_\-0-9\.;=]/g, '');
}

/**
 * Milliseconds until the next flush, keeping flushes aligned to a fixed
 * grid anchored at process startup so handler latency does not let the
 * schedule drift.
 * @param {number} interval - flush interval in milliseconds
 * @returns {number} delay to pass to setTimeout
 */
function getFlushTimeout(interval) {
  const now = new Date().getTime();
  const elapsed = now - startup_time * 1000;
  // Index of the next grid slot after "now": round to the nearest slot,
  // then advance one.
  const nextSlot = Math.round(elapsed / interval) + 1;
  return (startup_time * 1000 + nextSlot * interval) - now;
}

// Global for the logger; created in the configFile callback once the log
// settings are known, but referenced by the load/start helpers above.
let l;

// Parse the config file, then bring the daemon up: logger, packet servers,
// management interface, backends, and the periodic flush loop.
config.configFile(process.argv[2], function (config) {
  conf = config;

  process_mgmt.init(config);

  l = new logger.Logger(config.log || {});

  // force conf.gaugesMaxTTL to 1 if it not a positive integer > 1
  // (previously written with a redundant self-assignment branch)
  if (!(helpers.isInteger(conf.gaugesMaxTTL) && conf.gaugesMaxTTL > 1)) {
    conf.gaugesMaxTTL = 1;
  }

  // allows us to flag all of these on with a single config but still override them individually
  conf.deleteIdleStats = conf.deleteIdleStats !== undefined ? conf.deleteIdleStats : false;
  if (conf.deleteIdleStats) {
    conf.deleteCounters = conf.deleteCounters !== undefined ? conf.deleteCounters : true;
    conf.deleteTimers = conf.deleteTimers !== undefined ? conf.deleteTimers : true;
    conf.deleteSets = conf.deleteSets !== undefined ? conf.deleteSets : true;
    conf.deleteGauges = conf.deleteGauges !== undefined ? conf.deleteGauges : true;
  }

  // if gauges are not being deleted, clear gaugesTTL counters to save memory
  if (! conf.deleteGauges) {
    gaugesTTL = {};
  }

  // setup config for stats prefix
  let prefixStats = config.prefixStats;
  prefixStats = prefixStats !== undefined ? prefixStats : "statsd";

  // Names for statsd's own internal counters. These used to be created as
  // implicit globals (which throws under strict mode); they are only used
  // inside this callback, so plain consts are safe.
  const bad_lines_seen   = prefixStats + ".bad_lines_seen";
  const packets_received = prefixStats + ".packets_received";
  const metrics_received = prefixStats + ".metrics_received";
  // Module-level on purpose: flushMetrics() reads this name.
  timestamp_lag_namespace = prefixStats + ".timestamp_lag";

  //now set to zero so we can increment them
  counters[bad_lines_seen]   = 0;
  counters[packets_received] = 0;
  counters[metrics_received] = 0;

  if (config.keyNameSanitize !== undefined) {
    keyNameSanitize = config.keyNameSanitize;
  }
  if (!serversLoaded) {

    // key counting
    const keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0);

    // Parse one datagram / stream chunk. It may carry several
    // newline-separated metric lines, each shaped like
    //   key:value|type[|@sampleRate][|#tag1,tag2]
    const handlePacket = function (msg, rinfo) {
      backendEvents.emit('packet', msg, rinfo);
      counters[packets_received]++;
      const packet_data = msg.toString();
      // split() returns [packet_data] when there is no "\n", so the
      // single- and multi-metric cases share one code path.
      const metrics = packet_data.split("\n");

      for (const metric_line of metrics) {
        if (metric_line.length === 0) {
          continue;
        }

        counters[metrics_received]++;
        if (config.dumpMessages) {
          l.log(metric_line);
        }

        // Separate dogstatsd-style tags ("|#a,b") from the metric body.
        let bits = metric_line.split('|#');
        let tags = [];
        if (bits.length > 1 && bits[1].length > 0) {
          tags = bits[1].split(',');
        }
        bits = bits[0].split(':');
        let key = bits.shift();
        if (tags.length > 0) {
          // Tags are folded into the key, ';'-separated, with ':' mapped
          // to '=' so the sanitizer keeps them.
          key += ';' + tags.map(function(tag) {
            return tag.replace(';', '_').replace(':', '=');
          }).join(';');
        }
        key = sanitizeKeyName(key);

        if (keyFlushInterval > 0) {
          if (! keyCounter[key]) {
            keyCounter[key] = 0;
          }
          keyCounter[key] += 1;
        }

        // A bare key with no ":value" counts as an increment of 1.
        if (bits.length === 0) {
          bits.push("1");
        }

        for (let i = 0; i < bits.length; i++) {
          let sampleRate = 1;
          const fields = bits[i].split("|");
          if (!helpers.is_valid_packet(fields)) {
              l.log('Bad line: ' + fields + ' in msg "' + metric_line +'"');
              counters[bad_lines_seen]++;
              stats.messages.bad_lines_seen++;
              continue;
          }
          if (fields[2]) {
            // NOTE(review): assumes is_valid_packet() guarantees the
            // "@rate" shape here; otherwise match() would return null.
            sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
          }

          const metric_type = fields[1].trim();
          if (metric_type === "ms") {
            if (! timers[key]) {
              timers[key] = [];
              timer_counters[key] = 0;
            }
            timers[key].push(Number(fields[0] || 0));
            timer_counters[key] += (1 / sampleRate);
          } else if (metric_type === "g") {
            // if deleteGauges is true reset the max TTL to its initial value
            if (conf.deleteGauges) {
              gaugesTTL[key] = conf.gaugesMaxTTL;
            }
            // A leading +/- is a delta against the existing gauge value.
            if (gauges[key] && fields[0].match(/^[-+]/)) {
              gauges[key] += Number(fields[0] || 0);
            } else {
              gauges[key] = Number(fields[0] || 0);
            }
          } else if (metric_type === "s") {
            if (! sets[key]) {
              sets[key] = new set.Set();
            }
            sets[key].insert(fields[0] || '0');
          } else {
            // Anything else is treated as a counter, scaled by sample rate.
            if (! counters[key]) {
              counters[key] = 0;
            }
            counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
          }
        }
      }

      stats.messages.last_msg_seen = Math.round(new Date().getTime() / 1000);
    };

    // If config.servers isn't specified, use the top-level config for backwards-compatibility
    const server_config = config.servers || [config];
    for (let i = 0; i < server_config.length; i++) {
      // The default server is UDP
      const server = server_config[i].server || './servers/udp';
      startServer(server_config[i], server, handlePacket);
    }

    // Management (admin) interface: plain-text commands over a stream.
    mgmt_server.start(
      config,
      function(cmd, parameters, stream) {
        switch(cmd) {
          case "help":
            stream.write("Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, health, config, quit\n\n");
            break;

          case "config":
            helpers.writeConfig(config, stream);
            break;

          case "health":
            if (parameters.length > 0) {
              const cmdaction = parameters[0].toLowerCase();
              if (cmdaction === 'up') {
                healthStatus = 'up';
              } else if (cmdaction === 'down') {
                healthStatus = 'down';
              }
            }
            stream.write("health: " + healthStatus + "\n");
            break;

          case "stats": {
            // Braces give these consts a proper block scope instead of
            // leaking lexical declarations across the whole switch.
            const now    = Math.round(new Date().getTime() / 1000);
            const uptime = now - startup_time;

            stream.write("uptime: " + uptime + "\n");

            // "last_*" stats are timestamps; report them as age in seconds.
            const stat_writer = function(group, metric, val) {
              let delta;

              if (metric.match("^last_")) {
                delta = now - val;
              }
              else {
                delta = val;
              }

              stream.write(group + "." + metric + ": " + delta + "\n");
            };

            // Loop through the base stats
            for (const group in stats) {
              for (const metric in stats[group]) {
                stat_writer(group, metric, stats[group][metric]);
              }
            }

            // once() appends this listener, so END is written after the
            // backends registered earlier have handled 'status' below.
            backendEvents.once('status', function() {
              stream.write("END\n\n");
            });

            // Let each backend contribute its status
            backendEvents.emit('status', function(err, name, stat, val) {
              if (err) {
                l.log("Failed to read stats for backend " +
                        name + ": " + err);
              } else {
                stat_writer(name, stat, val);
              }
            });

            break;
          }

          case "counters":
            stream.write(util.inspect(counters) + "\n");
            stream.write("END\n\n");
            break;

          case "timers":
            stream.write(util.inspect(timers) + "\n");
            stream.write("END\n\n");
            break;

          case "gauges":
            stream.write(util.inspect(gauges) + "\n");
            stream.write("END\n\n");
            break;

          case "delcounters":
            mgmt.delete_stats(counters, parameters, stream);
            break;

          case "deltimers":
            mgmt.delete_stats(timers, parameters, stream);
            break;

          case "delgauges":
            mgmt.delete_stats(gauges, parameters, stream);
            break;

          case "quit":
            stream.end();
            break;

          default:
            stream.write("ERROR\n");
            break;
        }
      },
      function(err, stream) {
        l.log('MGMT: Caught ' + err +', Moving on', 'WARNING');
      }
    );

    serversLoaded = true;
    // Use the configured logger: util.log is deprecated (DEP0059) and
    // silently ignored the "INFO" level argument anyway.
    l.log("server is up", "INFO");

    pctThreshold = config.percentThreshold || 90;
    if (!Array.isArray(pctThreshold)) {
      pctThreshold = [ pctThreshold ]; // listify percentiles so single values work the same
    }

    flushInterval = Number(config.flushInterval || 10000);
    config.flushInterval = flushInterval;

    if (config.backends) {
      for (let j = 0; j < config.backends.length; j++) {
        loadBackend(config, config.backends[j]);
      }
    } else {
      // The default backend is graphite
      loadBackend(config, './backends/graphite');
    }

    // Setup the flush timer. flushMetrics re-arms itself, so the handle
    // (previously stored in an unused local) is not needed.
    setTimeout(flushMetrics, getFlushTimeout(flushInterval));

    if (keyFlushInterval > 0) {
      const keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
      const keyFlushLog = config.keyFlush && config.keyFlush.log;

      // Periodically dump (and reset) the most frequently seen keys.
      keyFlushInt = setInterval(function () {
        const sortedKeys = [];

        for (const key in keyCounter) {
          sortedKeys.push([key, keyCounter[key]]);
        }

        // Sort by hit count, descending.
        sortedKeys.sort(function(a, b) { return b[1] - a[1]; });

        let logMessage = "";
        const timeString = (new Date()) + "";

        // only show the top "keyFlushPercent" keys
        for (let i = 0, e = sortedKeys.length * (keyFlushPercent / 100); i < e; i++) {
          logMessage += timeString + " count=" + sortedKeys[i][1] + " key=" + sortedKeys[i][0] + "\n";
        }

        if (keyFlushLog) {
          const logFile = fs.createWriteStream(keyFlushLog, {flags: 'a+'});
          logFile.write(logMessage);
          logFile.end();
        } else {
          process.stdout.write(logMessage);
        }

        // clear the counter
        keyCounter = {};
      }, keyFlushInterval);
    }
  }
});

process.on('exit', function () {
  // Final flush on normal exit. NOTE(review): 'exit' handlers must be
  // synchronous — the setTimeout re-armed inside flushMetrics will never
  // fire, and any backend that flushes asynchronously may not complete;
  // confirm the configured backends flush synchronously if this matters.
  flushMetrics();
});
back to top