Skip to content

Commit

Permalink
Support fetching multiple modules in one scrape (#945)
Browse files Browse the repository at this point in the history
* Implemented a feature to fetch from multiple modules
* Changed snmp_collection_duration_seconds from Summary to Histogram

---------

Signed-off-by: Kakuya Ando <[email protected]>
  • Loading branch information
servak authored Aug 23, 2023
1 parent 2ae8cfa commit f4d7c3f
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 57 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ Note that [URL encoding](https://en.wikipedia.org/wiki/URL_encoding) should be u
to the `:` and `/` characters. Prometheus encodes query parameters automatically and manual encoding
is not necessary within the Prometheus configuration file.

## Multi-Module Handling
The multi-module functionality allows you to specify multiple modules, enabling the retrieval of information from several modules in a single scrape.
The concurrency can be specified using the snmp-exporter option `--snmp.module-concurrency` (the default is 1).

Note: This implementation does not perform any de-duplication of walks between different modules.

There are two ways to specify multiple modules. You can either separate them with a comma or define multiple params_module.
The URLs would look like this:

For comma separation:
```
http://localhost:9116/snmp?module=if_mib,arista_sw&target=192.0.0.8
```

For multiple params_module:
```
http://localhost:9116/snmp?module=if_mib&module=arista_sw&target=192.0.0.8
```

## Configuration

The default configuration file name is `snmp.yml` and should not be edited
Expand Down
103 changes: 83 additions & 20 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/alecthomas/kingpin/v2"
Expand All @@ -42,6 +43,14 @@ var (
float64Mantissa uint64 = 9007199254740992
wrapCounters = kingpin.Flag("snmp.wrap-large-counters", "Wrap 64-bit counters to avoid floating point rounding.").Default("true").Bool()
srcAddress = kingpin.Flag("snmp.source-address", "Source address to send snmp from in the format 'address:port' to use when connecting targets. If the port parameter is empty or '0', as in '127.0.0.1:' or '[::1]:0', a source port number is automatically (random) chosen.").Default("").String()

snmpDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
},
[]string{"auth", "module"},
)
)

// Types preceded by an enum with their actual type.
Expand Down Expand Up @@ -80,6 +89,14 @@ func listToOid(l []int) string {
return strings.Join(result, ".")
}

func InitModuleMetrics(auths map[string]*config.Auth, modules map[string]*config.Module) {
for auth := range auths {
for module := range modules {
snmpDuration.WithLabelValues(auth, module)
}
}
}

type ScrapeResults struct {
pdus []gosnmp.SnmpPDU
packets uint64
Expand Down Expand Up @@ -355,13 +372,27 @@ type internalMetrics struct {
snmpRetries prometheus.Counter
}

type NamedModule struct {
*config.Module
name string
}

func NewNamedModule(name string, module *config.Module) *NamedModule {
return &NamedModule{
Module: module,
name: name,
}
}

type Collector struct {
ctx context.Context
target string
auth *config.Auth
module *config.Module
logger log.Logger
metrics internalMetrics
ctx context.Context
target string
auth *config.Auth
authName string
modules []*NamedModule
logger log.Logger
metrics internalMetrics
concurrency int
}

func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
Expand Down Expand Up @@ -403,47 +434,48 @@ func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
}
}

func New(ctx context.Context, target string, auth *config.Auth, module *config.Module, logger log.Logger, reg prometheus.Registerer) *Collector {
func New(ctx context.Context, target, authName string, auth *config.Auth, modules []*NamedModule, logger log.Logger, reg prometheus.Registerer, conc int) *Collector {
internalMetrics := newInternalMetrics(reg)
return &Collector{ctx: ctx, target: target, auth: auth, module: module, logger: logger, metrics: internalMetrics}
return &Collector{ctx: ctx, target: target, authName: authName, auth: auth, modules: modules, logger: logger, metrics: internalMetrics, concurrency: conc}
}

// Describe implements Prometheus.Collector.
func (c Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- prometheus.NewDesc("dummy", "dummy", nil, nil)
}

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
logger := log.With(c.logger, "module", module.name)
start := time.Now()
results, err := ScrapeTarget(c.ctx, c.target, c.auth, c.module, c.logger, c.metrics)
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module.Module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": module.name}
if err != nil {
level.Info(c.logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, nil), err)
level.Info(logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, moduleLabel), err)
return
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_walk_duration_seconds", "Time SNMP walk/bulkwalk took.", nil, nil),
prometheus.NewDesc("snmp_scrape_walk_duration_seconds", "Time SNMP walk/bulkwalk took.", nil, moduleLabel),
prometheus.GaugeValue,
time.Since(start).Seconds())
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, nil),
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.packets))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, nil),
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.retries))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, nil),
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(len(results.pdus)))
oidToPdu := make(map[string]gosnmp.SnmpPDU, len(results.pdus))
for _, pdu := range results.pdus {
oidToPdu[pdu.Name[1:]] = pdu
}

metricTree := buildMetricTree(c.module.Metrics)
metricTree := buildMetricTree(module.Metrics)
// Look for metrics that match each pdu.
PduLoop:
for oid, pdu := range oidToPdu {
Expand All @@ -457,7 +489,7 @@ PduLoop:
}
if head.metric != nil {
// Found a match.
samples := pduToSamples(oidList[i+1:], &pdu, head.metric, oidToPdu, c.logger, c.metrics)
samples := pduToSamples(oidList[i+1:], &pdu, head.metric, oidToPdu, logger, c.metrics)
for _, sample := range samples {
ch <- sample
}
Expand All @@ -466,11 +498,42 @@ PduLoop:
}
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_duration_seconds", "Total SNMP time scrape took (walk and processing).", nil, nil),
prometheus.NewDesc("snmp_scrape_duration_seconds", "Total SNMP time scrape took (walk and processing).", nil, moduleLabel),
prometheus.GaugeValue,
time.Since(start).Seconds())
}

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
workerCount := c.concurrency
if workerCount < 1 {
workerCount = 1
}
workerChan := make(chan *NamedModule)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for m := range workerChan {
logger := log.With(c.logger, "module", m.name)
level.Debug(logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, m)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
snmpDuration.WithLabelValues(c.authName, m.name).Observe(duration)
}
}()
}

for _, module := range c.modules {
workerChan <- module
}
close(workerChan)
wg.Wait()
}

func getPduValue(pdu *gosnmp.SnmpPDU) float64 {
switch pdu.Type {
case gosnmp.Counter64:
Expand Down
73 changes: 36 additions & 37 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
Expand All @@ -43,20 +43,14 @@ import (
var (
configFile = kingpin.Flag("config.file", "Path to configuration file.").Default("snmp.yml").String()
dryRun = kingpin.Flag("dry-run", "Only verify configuration is valid and exit.").Default("false").Bool()
concurrency = kingpin.Flag("snmp.module-concurrency", "The number of modules to fetch concurrently per scrape").Default("1").Int()
metricsPath = kingpin.Flag(
"web.telemetry-path",
"Path under which to expose metrics.",
).Default("/metrics").String()
toolkitFlags = webflag.AddFlags(kingpin.CommandLine, ":9116")

// Metrics about the SNMP exporter itself.
snmpDuration = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
},
[]string{"auth", "module"},
)
snmpRequestErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "snmp_request_errors_total",
Expand Down Expand Up @@ -94,44 +88,50 @@ func handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
authName = "public_v2"
}

moduleName := query.Get("module")
if len(query["module"]) > 1 {
http.Error(w, "'module' parameter must only be specified once", http.StatusBadRequest)
snmpRequestErrors.Inc()
return
queryModule := query["module"]
if len(queryModule) == 0 {
queryModule = append(queryModule, "if_mib")
}
if moduleName == "" {
moduleName = "if_mib"
uniqueM := make(map[string]bool)
var modules []string
for _, qm := range queryModule {
for _, m := range strings.Split(qm, ",") {
if m == "" {
continue
}
if _, ok := uniqueM[m]; !ok {
uniqueM[m] = true
modules = append(modules, m)
}
}
}

sc.RLock()
auth, authOk := sc.C.Auths[authName]
module, moduleOk := sc.C.Modules[moduleName]
sc.RUnlock()
if !authOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown auth '%s'", authName), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
if !moduleOk {
http.Error(w, fmt.Sprintf("Unknown module '%s'", moduleName), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
var nmodules []*collector.NamedModule
for _, m := range modules {
module, moduleOk := sc.C.Modules[m]
if !moduleOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown module '%s'", m), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
nmodules = append(nmodules, collector.NewNamedModule(m, module))
}

logger = log.With(logger, "auth", authName, "module", moduleName, "target", target)
level.Debug(logger).Log("msg", "Starting scrape")

start := time.Now()
sc.RUnlock()
logger = log.With(logger, "auth", authName, "target", target)
registry := prometheus.NewRegistry()
c := collector.New(r.Context(), target, auth, module, logger, registry)
c := collector.New(r.Context(), target, authName, auth, nmodules, logger, registry, *concurrency)
registry.MustRegister(c)
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
duration := time.Since(start).Seconds()
snmpDuration.WithLabelValues(authName, moduleName).Observe(duration)
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
}

func updateConfiguration(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -160,11 +160,7 @@ func (sc *SafeConfig) ReloadConfig(configFile string) (err error) {
sc.Lock()
sc.C = conf
// Initialize metrics.
for auth := range sc.C.Auths {
for module := range sc.C.Modules {
snmpDuration.WithLabelValues(auth, module)
}
}
collector.InitModuleMetrics(sc.C.Auths, sc.C.Modules)
sc.Unlock()
return nil
}
Expand All @@ -176,8 +172,11 @@ func main() {
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)
if *concurrency < 1 {
*concurrency = 1
}

level.Info(logger).Log("msg", "Starting snmp_exporter", "version", version.Info())
level.Info(logger).Log("msg", "Starting snmp_exporter", "version", version.Info(), "concurrency", concurrency)
level.Info(logger).Log("build_context", version.BuildContext())

prometheus.MustRegister(version.NewCollector("snmp_exporter"))
Expand Down

0 comments on commit f4d7c3f

Please sign in to comment.