
swarm/shed: add metrics to each shed db (#18277)

* swarm/shed: add metrics to each shed db

* swarm/shed: push metrics prefix up

* swarm/shed: rename prefix to metricsPrefix

* swarm/shed: unexport Meter, remove Mutex for quit channel
Anton Evangelatov committed 7 years ago
commit ebbf3dfafb
3 changed files with 206 additions and 7 deletions
  1. swarm/shed/db.go  + 202 - 3
  2. swarm/shed/db_test.go  + 3 - 3
  3. swarm/shed/example_store_test.go  + 1 - 1
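
For context, a minimal sketch of how a caller uses the new constructor signature. The path and prefix values are hypothetical, not taken from the commit; since configure concatenates the prefix directly onto the meter names, a trailing separator keeps them readable:

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/swarm/shed"
)

func main() {
	// NewDB now takes a metrics prefix; meters register under it,
	// e.g. "myapp/shed/compact/time" (values here are illustrative).
	db, err := shed.NewDB("/tmp/shed-example", "myapp/shed/")
	if err != nil {
		log.Fatal(err)
	}
	// Close also stops the periodic metrics collector via the quit channel.
	defer db.Close()
}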

+ 202 - 3
swarm/shed/db.go

@@ -23,14 +23,23 @@
 package shed
 
 import (
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
 	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/swarm/log"
 	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/iterator"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 )
 
-// The limit for LevelDB OpenFilesCacheCapacity.
-const openFileLimit = 128
+const (
+	openFileLimit              = 128 // The limit for LevelDB OpenFilesCacheCapacity.
+	writePauseWarningThrottler = 1 * time.Minute
+)
 
 // DB provides abstractions over LevelDB in order to
 // implement complex structures using fields and ordered indexes.
@@ -38,11 +47,22 @@ const openFileLimit = 128
 // information about naming and types.
 type DB struct {
 	ldb *leveldb.DB
+
+	compTimeMeter    metrics.Meter // Meter for measuring the total time spent in database compaction
+	compReadMeter    metrics.Meter // Meter for measuring the data read during compaction
+	compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
+	writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
+	writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
+	diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
+	diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written
+
+	quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
 }
 
 // NewDB constructs a new DB and validates the schema
 // if it exists in database on the given path.
-func NewDB(path string) (db *DB, err error) {
+// metricsPrefix is used for metrics collection for the given DB.
+func NewDB(path string, metricsPrefix string) (db *DB, err error) {
 	ldb, err := leveldb.OpenFile(path, &opt.Options{
 		OpenFilesCacheCapacity: openFileLimit,
 	})
@@ -66,6 +86,15 @@ func NewDB(path string) (db *DB, err error) {
 			return nil, err
 		}
 	}
+
+	// Configure meters for DB
+	db.configure(metricsPrefix)
+
+	// Create a quit channel for the periodic metrics collector and run it
+	db.quitChan = make(chan chan error)
+
+	go db.meter(10 * time.Second)
+
 	return db, nil
 }
 
@@ -126,5 +155,175 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) {
 
 // Close closes LevelDB database.
 func (db *DB) Close() (err error) {
+	close(db.quitChan)
 	return db.ldb.Close()
 }
+
+// Configure configures the database metrics collectors
+func (db *DB) configure(prefix string) {
+	// Initialize all the metrics collector at the requested prefix
+	db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
+	db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
+	db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
+	db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
+	db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
+	db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
+	db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
+}
+
+func (db *DB) meter(refresh time.Duration) {
+	// Create the counters to store current and previous compaction values
+	compactions := make([][]float64, 2)
+	for i := 0; i < 2; i++ {
+		compactions[i] = make([]float64, 3)
+	}
+	// Create storage for iostats.
+	var iostats [2]float64
+
+	// Create storage and warning log tracer for write delay.
+	var (
+		delaystats      [2]int64
+		lastWritePaused time.Time
+	)
+
+	var (
+		errc chan error
+		merr error
+	)
+
+	// Iterate ad infinitum and collect the stats
+	for i := 1; errc == nil && merr == nil; i++ {
+		// Retrieve the database stats
+		stats, err := db.ldb.GetProperty("leveldb.stats")
+		if err != nil {
+			log.Error("Failed to read database stats", "err", err)
+			merr = err
+			continue
+		}
+		// Find the compaction table, skip the header
+		lines := strings.Split(stats, "\n")
+		for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
+			lines = lines[1:]
+		}
+		if len(lines) <= 3 {
+			log.Error("Compaction table not found")
+			merr = errors.New("compaction table not found")
+			continue
+		}
+		lines = lines[3:]
+
+		// Iterate over all the table rows, and accumulate the entries
+		for j := 0; j < len(compactions[i%2]); j++ {
+			compactions[i%2][j] = 0
+		}
+		for _, line := range lines {
+			parts := strings.Split(line, "|")
+			if len(parts) != 6 {
+				break
+			}
+			for idx, counter := range parts[3:] {
+				value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
+				if err != nil {
+					log.Error("Compaction entry parsing failed", "err", err)
+					merr = err
+					continue
+				}
+				compactions[i%2][idx] += value
+			}
+		}
+		// Update all the requested meters
+		if db.compTimeMeter != nil {
+			db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
+		}
+		if db.compReadMeter != nil {
+			db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
+		}
+		if db.compWriteMeter != nil {
+			db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
+		}
+
+		// Retrieve the write delay statistic
+		writedelay, err := db.ldb.GetProperty("leveldb.writedelay")
+		if err != nil {
+			log.Error("Failed to read database write delay statistic", "err", err)
+			merr = err
+			continue
+		}
+		var (
+			delayN        int64
+			delayDuration string
+			duration      time.Duration
+			paused        bool
+		)
+		if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
+			log.Error("Write delay statistic not found")
+			merr = err
+			continue
+		}
+		duration, err = time.ParseDuration(delayDuration)
+		if err != nil {
+			log.Error("Failed to parse delay duration", "err", err)
+			merr = err
+			continue
+		}
+		if db.writeDelayNMeter != nil {
+			db.writeDelayNMeter.Mark(delayN - delaystats[0])
+		}
+		if db.writeDelayMeter != nil {
+			db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
+		}
+		// If a warning that db is performing compaction has been displayed, any subsequent
+		// warnings will be withheld for one minute not to overwhelm the user.
+		if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
+			time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) {
+			log.Warn("Database compacting, degraded performance")
+			lastWritePaused = time.Now()
+		}
+		delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()
+
+		// Retrieve the database iostats.
+		ioStats, err := db.ldb.GetProperty("leveldb.iostats")
+		if err != nil {
+			log.Error("Failed to read database iostats", "err", err)
+			merr = err
+			continue
+		}
+		var nRead, nWrite float64
+		parts := strings.Split(ioStats, " ")
+		if len(parts) < 2 {
+			log.Error("Bad syntax of ioStats", "ioStats", ioStats)
+			merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
+			continue
+		}
+		if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
+			log.Error("Bad syntax of read entry", "entry", parts[0])
+			merr = err
+			continue
+		}
+		if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
+			log.Error("Bad syntax of write entry", "entry", parts[1])
+			merr = err
+			continue
+		}
+		if db.diskReadMeter != nil {
+			db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
+		}
+		if db.diskWriteMeter != nil {
+			db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
+		}
+		iostats[0], iostats[1] = nRead, nWrite
+
+		// Sleep a bit, then repeat the stats collection
+		select {
+		case errc = <-db.quitChan:
+			// Quit requesting, stop hammering the database
+		case <-time.After(refresh):
+			// Timeout, gather a new set of stats
+		}
+	}
+
+	if errc == nil {
+		errc = <-db.quitChan
+	}
+	errc <- merr
+}
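
As a side note on the formats meter parses: the "leveldb.writedelay" property has the shape scanned above. A standalone sketch, using an illustrative sample value rather than output from a live database:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative sample of the "leveldb.writedelay" property format.
	writedelay := "DelayN:5 Delay:406.6ms Paused:false"

	var (
		delayN        int64
		delayDuration string
		paused        bool
	)
	// Same scan format meter uses; %s stops at the next space.
	n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused)
	if err != nil || n != 3 {
		panic("unexpected writedelay format")
	}
	duration, err := time.ParseDuration(delayDuration)
	if err != nil {
		panic(err)
	}
	fmt.Println(delayN, duration, paused) // 5 406.6ms false
}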

+ 3 - 3
swarm/shed/db_test.go

@@ -55,7 +55,7 @@ func TestDB_persistence(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := NewDB(dir)
+	db, err := NewDB(dir, "")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -73,7 +73,7 @@ func TestDB_persistence(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	db2, err := NewDB(dir)
+	db2, err := NewDB(dir, "")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -101,7 +101,7 @@ func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) {
 		t.Fatal(err)
 	}
 	cleanupFunc = func() { os.RemoveAll(dir) }
-	db, err = NewDB(dir)
+	db, err = NewDB(dir, "")
 	if err != nil {
 		cleanupFunc()
 		t.Fatal(err)

+ 1 - 1
swarm/shed/example_store_test.go

@@ -52,7 +52,7 @@ type Store struct {
 // and possible conflicts with schema from existing database is checked
 // automatically.
 func New(path string) (s *Store, err error) {
-	db, err := shed.NewDB(path)
+	db, err := shed.NewDB(path, "")
 	if err != nil {
 		return nil, err
 	}
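
Since the example store passes an empty prefix, its meters register under bare names such as "compact/time". A caller that wants per-database metrics would pass a distinct prefix and can read the meters back from go-ethereum's default metrics registry. A minimal sketch with a hypothetical prefix, assuming metrics collection is enabled (the meters are no-ops otherwise):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Hypothetical name: matches what a DB created with
	// shed.NewDB(path, "chunkdb/") would register.
	name := "chunkdb/compact/time"
	if m, ok := metrics.DefaultRegistry.Get(name).(metrics.Meter); ok {
		fmt.Println(name, "count:", m.Count())
	}
}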