
swarm/network: fix chunk integrity checks (#3665)

* swarm/network: integrity on incoming known chunks
* swarm/network: fix integrity check for incoming chunks
* swarm/storage: improved integrity checking on chunks
* dbstore panics on corrupt chunk entry and prompts user to run cleandb
* memstore adds logging for garbage collection
* dbstore refactor item delete. correct partial deletes in Get
* cmd/swarm: added cleandb subcommand
Viktor Trón 8 years ago
parent
commit
e23e86921b
6 changed files with 125 additions and 21 deletions
  1. cmd/swarm/cleandb.go (+39 -0)
  2. cmd/swarm/main.go (+9 -0)
  3. swarm/network/depo.go (+15 -9)
  4. swarm/network/syncer.go (+3 -3)
  5. swarm/storage/dbstore.go (+52 -9)
  6. swarm/storage/memstore.go (+7 -0)

+ 39 - 0
cmd/swarm/cleandb.go

@@ -0,0 +1,39 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+	"log"
+
+	"github.com/ethereum/go-ethereum/swarm/storage"
+	"gopkg.in/urfave/cli.v1"
+)
+
+func cleandb(ctx *cli.Context) {
+	args := ctx.Args()
+	if len(args) != 1 {
+		log.Fatal("need path to chunks database as the first and only argument")
+	}
+
+	chunkDbPath := args[0]
+	hash := storage.MakeHashFunc("SHA3")
+	dbStore, err := storage.NewDbStore(chunkDbPath, hash, 10000000, 0)
+	if err != nil {
+		log.Fatalf("cannot initialise dbstore: %v", err)
+	}
+	dbStore.Cleanup()
+}
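
For reference, the new subcommand takes the path to the chunk database as its single positional argument. An invocation would look like the following (the path shown is hypothetical and depends on your datadir layout):

	swarm cleandb /path/to/datadir/swarm/chunks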

+ 9 - 0
cmd/swarm/main.go

@@ -191,6 +191,15 @@ Removes a path from the manifest
 				},
 			},
 		},
+		{
+			Action:    cleandb,
+			Name:      "cleandb",
+			Usage:     "Cleans database of corrupted entries",
+			ArgsUsage: " ",
+			Description: `
+Cleans database of corrupted entries.
+`,
+		},
 	}
 
 	app.Flags = []cli.Flag{

+ 15 - 9
swarm/network/depo.go

@@ -99,6 +99,7 @@ func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer)
 // if key found locally, return. otherwise
 // remote is untrusted, so hash is verified and chunk passed on to NetStore
 func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
+	var islocal bool
 	req.from = p
 	chunk, err := self.localStore.Get(req.Key)
 	switch {
@@ -110,27 +111,32 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
 
 	case chunk.SData == nil:
 		// found chunk in memory store, needs the data, validate now
-		hasher := self.hashfunc()
-		hasher.Write(req.SData)
-		if !bytes.Equal(hasher.Sum(nil), req.Key) {
-			// data does not validate, ignore
-			// TODO: peer should be penalised/dropped?
-			glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
-			return
-		}
 		glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req)
 
 	default:
 		// data is found, store request ignored
 		// this should update access count?
 		glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req)
+		islocal = true
+		//return
+	}
+
+	hasher := self.hashfunc()
+	hasher.Write(req.SData)
+	if !bytes.Equal(hasher.Sum(nil), req.Key) {
+		// data does not validate, ignore
+		// TODO: peer should be penalised/dropped?
+		glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
 		return
 	}
 
+	if islocal {
+		return
+	}
 	// update chunk with size and data
 	chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size  + at least one byte of data)
 	chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
-	glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p)
+	glog.V(logger.Detail).Infof("delivery of %v from %v", chunk, p)
 	chunk.Source = p
 	self.netStore.Put(chunk)
 }
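
The net effect of the hunk above: the hash check now runs for every incoming chunk, including ones already known locally, and only afterwards does the handler decide whether to short-circuit (islocal) or pass the chunk on to the NetStore. A minimal standalone sketch of the check itself, assuming only the standard library (verifyChunk is a hypothetical helper, not part of the patch; hashfunc stands in for Depo.hashfunc):

	import (
		"bytes"
		"hash"
	)

	// verifyChunk reports whether the payload hashes to the chunk's key,
	// which is the integrity invariant the handler enforces.
	func verifyChunk(hashfunc func() hash.Hash, key, sdata []byte) bool {
		h := hashfunc()
		h.Write(sdata)
		return bytes.Equal(h.Sum(nil), key)
	}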

+ 3 - 3
swarm/network/syncer.go

@@ -438,7 +438,7 @@ LOOP:
 			for priority = High; priority >= 0; priority-- {
 				// the first priority channel that is non-empty will be assigned to keys
 				if len(self.keys[priority]) > 0 {
-					glog.V(logger.Detail).Infof("syncer[%v]: reading request with	 priority %v", self.key.Log(), priority)
+					glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
 					keys = self.keys[priority]
 					break PRIORITIES
 				}
@@ -551,10 +551,10 @@ LOOP:
 		}
 		if sreq, err := self.newSyncRequest(req, priority); err == nil {
 			// extract key from req
-			glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
+			glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
 			unsynced = append(unsynced, sreq)
 		} else {
-			glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
+			glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v (synced = %v): %v", self.key.Log(), priority, req, state.Synced, err)
 		}
 
 	}

+ 52 - 9
swarm/storage/dbstore.go

@@ -252,12 +252,7 @@ func (s *DbStore) collectGarbage(ratio float32) {
 	// actual gc
 	for i := 0; i < gcnt; i++ {
 		if s.gcArray[i].value <= cutval {
-			batch := new(leveldb.Batch)
-			batch.Delete(s.gcArray[i].idxKey)
-			batch.Delete(getDataKey(s.gcArray[i].idx))
-			s.entryCnt--
-			batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
-			s.db.Write(batch)
+			s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
 		}
 	}
 
@@ -266,6 +261,52 @@ func (s *DbStore) collectGarbage(ratio float32) {
 	s.db.Put(keyGCPos, s.gcPos)
 }
 
+func (s *DbStore) Cleanup() {
+	//Iterates over the database and checks that there are no faulty chunks
+	it := s.db.NewIterator()
+	startPosition := []byte{kpIndex}
+	it.Seek(startPosition)
+	var key []byte
+	var errorsFound, total int
+	for it.Valid() {
+		key = it.Key()
+		if (key == nil) || (key[0] != kpIndex) {
+			break
+		}
+		total++
+		var index dpaDBIndex
+		decodeIndex(it.Value(), &index)
+
+		data, err := s.db.Get(getDataKey(index.Idx))
+		if err != nil {
+			glog.V(logger.Warn).Infof("Chunk %x found but could not be accessed: %v", key[:], err)
+			s.delete(index.Idx, getIndexKey(key[1:]))
+			errorsFound++
+		} else {
+			hasher := s.hashfunc()
+			hasher.Write(data)
+			hash := hasher.Sum(nil)
+			if !bytes.Equal(hash, key[1:]) {
+				glog.V(logger.Warn).Infof("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:])
+				s.delete(index.Idx, getIndexKey(key[1:]))
+				errorsFound++
+			}
+		}
+		it.Next()
+	}
+	it.Release()
+	glog.V(logger.Warn).Infof("Found %v errors out of %v entries", errorsFound, total)
+}
+
+func (s *DbStore) delete(idx uint64, idxKey []byte) {
+	batch := new(leveldb.Batch)
+	batch.Delete(idxKey)
+	batch.Delete(getDataKey(idx))
+	s.entryCnt--
+	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+	s.db.Write(batch)
+}
+
 func (s *DbStore) Counter() uint64 {
 	s.lock.Lock()
 	defer s.lock.Unlock()
@@ -283,6 +324,7 @@ func (s *DbStore) Put(chunk *Chunk) {
 		if chunk.dbStored != nil {
 			close(chunk.dbStored)
 		}
+		glog.V(logger.Detail).Infof("Storing to DB: chunk already exists, only update access")
 		return // already exists, only update access
 	}
 
@@ -348,6 +390,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
 		var data []byte
 		data, err = s.db.Get(getDataKey(index.Idx))
 		if err != nil {
+			glog.V(logger.Detail).Infof("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err)
+			s.delete(index.Idx, getIndexKey(key))
 			return
 		}
 
@@ -355,9 +399,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
 		hasher.Write(data)
 		hash := hasher.Sum(nil)
 		if !bytes.Equal(hash, key) {
-			s.db.Delete(getDataKey(index.Idx))
-			err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:])
-			return
+			s.delete(index.Idx, getIndexKey(key))
+			panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
 		}
 
 		chunk = &Chunk{
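
Two behavioral changes land in Get: a missing data record now triggers a full delete of the dangling index entry (the partial deletes named in the commit message), and a hash mismatch deletes the entry and panics with a prompt to run cleandb instead of returning an error. A caller-side sketch of converting that panic back into an error, e.g. for a probing tool (safeGet is hypothetical and not part of the patch):

	// safeGet wraps DbStore.Get and recovers the corruption panic into an
	// error, so a scanning tool can keep iterating over keys.
	func safeGet(s *storage.DbStore, key storage.Key) (chunk *storage.Chunk, err error) {
		defer func() {
			if r := recover(); r != nil {
				chunk, err = nil, fmt.Errorf("corrupt chunk: %v", r)
			}
		}()
		return s.Get(key)
	}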

+ 7 - 0
swarm/storage/memstore.go

@@ -20,6 +20,9 @@ package storage
 
 import (
 	"sync"
+
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
 const (
@@ -284,7 +287,11 @@ func (s *MemStore) removeOldest() {
 	}
 
 	if node.entry.dbStored != nil {
+		glog.V(logger.Detail).Infof("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())
 		<-node.entry.dbStored
+		glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())
+	} else {
+		glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log())
 	}
 
 	if node.entry.SData != nil {
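
The new logging documents the persistence handoff: eviction from the memstore blocks on the chunk's dbStored channel, which the persistent store closes once the chunk has actually been written. A minimal sketch of that pattern with hypothetical types (not the real Chunk/MemStore structs):

	// chunkEntry models just the synchronization: dbStored is closed by
	// the persistent store once the chunk is safely on disk.
	type chunkEntry struct {
		dbStored chan bool
	}

	// persist writes the chunk, then signals durability to any waiter.
	func persist(c *chunkEntry) {
		// ... write chunk to the database ...
		close(c.dbStored)
	}

	// evict blocks until the chunk is persisted before freeing memory,
	// mirroring the removeOldest path above.
	func evict(c *chunkEntry) {
		if c.dbStored != nil {
			<-c.dbStored
		}
		// now safe to drop the in-memory payload
	}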