| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // disk storage layer for the package bzz
- // DbStore implements the ChunkStore interface and is used by the FileStore as
- // persistent storage of chunks
- // it implements purging based on access count allowing for external control of
- // max capacity
- package storage
- import (
- "archive/tar"
- "bytes"
- "context"
- "encoding/binary"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "sort"
- "sync"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/rlp"
- ch "github.com/ethereum/go-ethereum/swarm/chunk"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/storage/mock"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/opt"
- )
// Garbage collection tuning parameters.
const (
	// gcArrayFreeRatio is the fraction of the scanned candidates that the
	// batch writer asks collectGarbage to delete; setCapacity also uses it
	// as the lower bound for its own ratio.
	gcArrayFreeRatio = 0.1

	// maxGCitems bounds the number of index entries examined per call to
	// collectGarbage().
	maxGCitems = 5000
)
- var (
- dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
- )
// Key prefixes and fixed keys used to partition the LevelDB key space.
var (
	// keyIndex prefixes index entries (prefix byte followed by the chunk hash).
	keyIndex byte = 0
	// keyOldData prefixes the legacy data entries that ReIndex iterates over.
	keyOldData byte = 1
	// keyAccessCnt is the key under which the global access counter is stored.
	keyAccessCnt = []byte{2}
	// keyEntryCnt is the key under which the entry counter is stored.
	keyEntryCnt = []byte{3}
	// keyDataIdx is the key under which the next data index is stored.
	keyDataIdx = []byte{4}
	// keyData prefixes data entries (prefix byte, po byte, 8-byte data index).
	keyData byte = 6
	// keyDistanceCnt prefixes the per-proximity-order counters (prefix byte, po byte).
	keyDistanceCnt byte = 7
)
// ErrDBClosed is returned by Put and get once Close has marked the store
// as closed.
var ErrDBClosed = errors.New("LDBStore closed")
// gcItem describes one garbage collection candidate gathered by
// collectGarbage.
type gcItem struct {
	// idx is the data index of the chunk; value is its last access counter
	// (candidates are sorted ascending by value, lowest is deleted first).
	idx, value uint64
	// idxKey is a private copy of the index key of the chunk.
	idxKey []byte
	// po is the proximity order bin of the chunk.
	po uint8
}
- type LDBStoreParams struct {
- *StoreParams
- Path string
- Po func(Address) uint8
- }
- // NewLDBStoreParams constructs LDBStoreParams with the specified values.
- func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
- return &LDBStoreParams{
- StoreParams: storeparams,
- Path: path,
- Po: func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey, k[:])) },
- }
- }
- type LDBStore struct {
- db *LDBDatabase
- // this should be stored in db, accessed transactionally
- entryCnt uint64 // number of items in the LevelDB
- accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
- dataIdx uint64 // similar to entryCnt, but we only increment it
- capacity uint64
- bucketCnt []uint64
- hashfunc SwarmHasher
- po func(Address) uint8
- batchC chan bool
- batchesC chan struct{}
- closed bool
- batch *dbBatch
- lock sync.RWMutex
- quit chan struct{}
- // Functions encodeDataFunc is used to bypass
- // the default functionality of DbStore with
- // mock.NodeStore for testing purposes.
- encodeDataFunc func(chunk Chunk) []byte
- // If getDataFunc is defined, it will be used for
- // retrieving the chunk data instead from the local
- // LevelDB database.
- getDataFunc func(key Address) (data []byte, err error)
- }
- type dbBatch struct {
- *leveldb.Batch
- err error
- c chan struct{}
- }
- func newBatch() *dbBatch {
- return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
- }
- // TODO: Instead of passing the distance function, just pass the address from which distances are calculated
- // to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
- // a function different from the one that is actually used.
- func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
- s = new(LDBStore)
- s.hashfunc = params.Hash
- s.quit = make(chan struct{})
- s.batchesC = make(chan struct{}, 1)
- go s.writeBatches()
- s.batch = newBatch()
- // associate encodeData with default functionality
- s.encodeDataFunc = encodeData
- s.db, err = NewLDBDatabase(params.Path)
- if err != nil {
- return nil, err
- }
- s.po = params.Po
- s.setCapacity(params.DbCapacity)
- s.bucketCnt = make([]uint64, 0x100)
- for i := 0; i < 0x100; i++ {
- k := make([]byte, 2)
- k[0] = keyDistanceCnt
- k[1] = uint8(i)
- cnt, _ := s.db.Get(k)
- s.bucketCnt[i] = BytesToU64(cnt)
- }
- data, _ := s.db.Get(keyEntryCnt)
- s.entryCnt = BytesToU64(data)
- data, _ = s.db.Get(keyAccessCnt)
- s.accessCnt = BytesToU64(data)
- data, _ = s.db.Get(keyDataIdx)
- s.dataIdx = BytesToU64(data)
- return s, nil
- }
- // NewMockDbStore creates a new instance of DbStore with
- // mockStore set to a provided value. If mockStore argument is nil,
- // this function behaves exactly as NewDbStore.
- func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
- s, err = NewLDBStore(params)
- if err != nil {
- return nil, err
- }
- // replace put and get with mock store functionality
- if mockStore != nil {
- s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
- s.getDataFunc = newMockGetDataFunc(mockStore)
- }
- return
- }
// dpaDBIndex is the index record stored per chunk (RLP-encoded by
// encodeIndex). Field order is part of the encoding and must not change.
type dpaDBIndex struct {
	// Idx is the data index under which the chunk data is stored.
	Idx uint64
	// Access is the value of the store's access counter at the last access.
	Access uint64
}
// BytesToU64 decodes the first 8 bytes of data as a big-endian uint64.
// Inputs shorter than 8 bytes (including nil) decode to 0.
func BytesToU64(data []byte) uint64 {
	if len(data) >= 8 {
		return binary.BigEndian.Uint64(data)
	}
	return 0
}
// U64ToBytes encodes val as an 8-byte big-endian slice.
func U64ToBytes(val uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], val)
	return buf[:]
}
- func (s *LDBStore) updateIndexAccess(index *dpaDBIndex) {
- index.Access = s.accessCnt
- }
- func getIndexKey(hash Address) []byte {
- hashSize := len(hash)
- key := make([]byte, hashSize+1)
- key[0] = keyIndex
- copy(key[1:], hash[:])
- return key
- }
- func getDataKey(idx uint64, po uint8) []byte {
- key := make([]byte, 10)
- key[0] = keyData
- key[1] = po
- binary.BigEndian.PutUint64(key[2:], idx)
- return key
- }
- func encodeIndex(index *dpaDBIndex) []byte {
- data, _ := rlp.EncodeToBytes(index)
- return data
- }
- func encodeData(chunk Chunk) []byte {
- // Always create a new underlying array for the returned byte slice.
- // The chunk.Address array may be used in the returned slice which
- // may be changed later in the code or by the LevelDB, resulting
- // that the Address is changed as well.
- return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
- }
- func decodeIndex(data []byte, index *dpaDBIndex) error {
- dec := rlp.NewStream(bytes.NewReader(data), 0)
- return dec.Decode(index)
- }
- func decodeData(addr Address, data []byte) (*chunk, error) {
- return NewChunk(addr, data[32:]), nil
- }
- func (s *LDBStore) collectGarbage(ratio float32) {
- log.Trace("collectGarbage", "ratio", ratio)
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
- it := s.db.NewIterator()
- defer it.Release()
- garbage := []*gcItem{}
- gcnt := 0
- for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() {
- itkey := it.Key()
- if (itkey == nil) || (itkey[0] != keyIndex) {
- break
- }
- // it.Key() contents change on next call to it.Next(), so we must copy it
- key := make([]byte, len(it.Key()))
- copy(key, it.Key())
- val := it.Value()
- var index dpaDBIndex
- hash := key[1:]
- decodeIndex(val, &index)
- po := s.po(hash)
- gci := &gcItem{
- idxKey: key,
- idx: index.Idx,
- value: index.Access, // the smaller, the more likely to be gc'd. see sort comparator below.
- po: po,
- }
- garbage = append(garbage, gci)
- gcnt++
- }
- sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value })
- cutoff := int(float32(gcnt) * ratio)
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(cutoff))
- for i := 0; i < cutoff; i++ {
- s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po)
- }
- }
- // Export writes all chunks from the store to a tar archive, returning the
- // number of chunks written.
- func (s *LDBStore) Export(out io.Writer) (int64, error) {
- tw := tar.NewWriter(out)
- defer tw.Close()
- it := s.db.NewIterator()
- defer it.Release()
- var count int64
- for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
- key := it.Key()
- if (key == nil) || (key[0] != keyIndex) {
- break
- }
- var index dpaDBIndex
- hash := key[1:]
- decodeIndex(it.Value(), &index)
- po := s.po(hash)
- datakey := getDataKey(index.Idx, po)
- log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
- data, err := s.db.Get(datakey)
- if err != nil {
- log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
- continue
- }
- hdr := &tar.Header{
- Name: hex.EncodeToString(hash),
- Mode: 0644,
- Size: int64(len(data)),
- }
- if err := tw.WriteHeader(hdr); err != nil {
- return count, err
- }
- if _, err := tw.Write(data); err != nil {
- return count, err
- }
- count++
- }
- return count, nil
- }
- // of chunks read.
- func (s *LDBStore) Import(in io.Reader) (int64, error) {
- tr := tar.NewReader(in)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- countC := make(chan int64)
- errC := make(chan error)
- var count int64
- go func() {
- for {
- hdr, err := tr.Next()
- if err == io.EOF {
- break
- } else if err != nil {
- select {
- case errC <- err:
- case <-ctx.Done():
- }
- }
- if len(hdr.Name) != 64 {
- log.Warn("ignoring non-chunk file", "name", hdr.Name)
- continue
- }
- keybytes, err := hex.DecodeString(hdr.Name)
- if err != nil {
- log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
- continue
- }
- data, err := ioutil.ReadAll(tr)
- if err != nil {
- select {
- case errC <- err:
- case <-ctx.Done():
- }
- }
- key := Address(keybytes)
- chunk := NewChunk(key, data[32:])
- go func() {
- select {
- case errC <- s.Put(ctx, chunk):
- case <-ctx.Done():
- }
- }()
- count++
- }
- countC <- count
- }()
- // wait for all chunks to be stored
- i := int64(0)
- var total int64
- for {
- select {
- case err := <-errC:
- if err != nil {
- return count, err
- }
- i++
- case total = <-countC:
- case <-ctx.Done():
- return i, ctx.Err()
- }
- if total > 0 && i == total {
- return total, nil
- }
- }
- }
- func (s *LDBStore) Cleanup() {
- //Iterates over the database and checks that there are no chunks bigger than 4kb
- var errorsFound, removed, total int
- it := s.db.NewIterator()
- defer it.Release()
- for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
- key := it.Key()
- if (key == nil) || (key[0] != keyIndex) {
- break
- }
- total++
- var index dpaDBIndex
- err := decodeIndex(it.Value(), &index)
- if err != nil {
- log.Warn("Cannot decode")
- errorsFound++
- continue
- }
- hash := key[1:]
- po := s.po(hash)
- datakey := getDataKey(index.Idx, po)
- data, err := s.db.Get(datakey)
- if err != nil {
- found := false
- // highest possible proximity is 255
- for po = 1; po <= 255; po++ {
- datakey = getDataKey(index.Idx, po)
- data, err = s.db.Get(datakey)
- if err == nil {
- found = true
- break
- }
- }
- if !found {
- log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
- errorsFound++
- continue
- }
- }
- ck := data[:32]
- c, err := decodeData(ck, data)
- if err != nil {
- log.Error("decodeData error", "err", err)
- continue
- }
- cs := int64(binary.LittleEndian.Uint64(c.sdata[:8]))
- log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
- if len(c.sdata) > ch.DefaultSize+8 {
- log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
- s.delete(index.Idx, getIndexKey(key[1:]), po)
- removed++
- errorsFound++
- }
- }
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
- }
- func (s *LDBStore) ReIndex() {
- //Iterates over the database and checks that there are no faulty chunks
- it := s.db.NewIterator()
- startPosition := []byte{keyOldData}
- it.Seek(startPosition)
- var key []byte
- var errorsFound, total int
- for it.Valid() {
- key = it.Key()
- if (key == nil) || (key[0] != keyOldData) {
- break
- }
- data := it.Value()
- hasher := s.hashfunc()
- hasher.Write(data)
- hash := hasher.Sum(nil)
- newKey := make([]byte, 10)
- oldCntKey := make([]byte, 2)
- newCntKey := make([]byte, 2)
- oldCntKey[0] = keyDistanceCnt
- newCntKey[0] = keyDistanceCnt
- key[0] = keyData
- key[1] = s.po(Address(key[1:]))
- oldCntKey[1] = key[1]
- newCntKey[1] = s.po(Address(newKey[1:]))
- copy(newKey[2:], key[1:])
- newValue := append(hash, data...)
- batch := new(leveldb.Batch)
- batch.Delete(key)
- s.bucketCnt[oldCntKey[1]]--
- batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
- batch.Put(newKey, newValue)
- s.bucketCnt[newCntKey[1]]++
- batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
- s.db.Write(batch)
- it.Next()
- }
- it.Release()
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
- }
- func (s *LDBStore) Delete(addr Address) {
- s.lock.Lock()
- defer s.lock.Unlock()
- ikey := getIndexKey(addr)
- var indx dpaDBIndex
- s.tryAccessIdx(ikey, &indx)
- s.delete(indx.Idx, ikey, s.po(addr))
- }
- func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
- metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
- batch := new(leveldb.Batch)
- batch.Delete(idxKey)
- batch.Delete(getDataKey(idx, po))
- s.entryCnt--
- dbEntryCount.Dec(1)
- cntKey := make([]byte, 2)
- cntKey[0] = keyDistanceCnt
- cntKey[1] = po
- batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
- batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
- s.db.Write(batch)
- }
- func (s *LDBStore) BinIndex(po uint8) uint64 {
- s.lock.RLock()
- defer s.lock.RUnlock()
- return s.bucketCnt[po]
- }
- func (s *LDBStore) Size() uint64 {
- s.lock.RLock()
- defer s.lock.RUnlock()
- return s.entryCnt
- }
- func (s *LDBStore) CurrentStorageIndex() uint64 {
- s.lock.RLock()
- defer s.lock.RUnlock()
- return s.dataIdx
- }
- func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
- metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
- log.Trace("ldbstore.put", "key", chunk.Address())
- ikey := getIndexKey(chunk.Address())
- var index dpaDBIndex
- po := s.po(chunk.Address())
- s.lock.Lock()
- if s.closed {
- s.lock.Unlock()
- return ErrDBClosed
- }
- batch := s.batch
- log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
- idata, err := s.db.Get(ikey)
- if err != nil {
- s.doPut(chunk, &index, po)
- } else {
- log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address)
- decodeIndex(idata, &index)
- }
- index.Access = s.accessCnt
- s.accessCnt++
- idata = encodeIndex(&index)
- s.batch.Put(ikey, idata)
- s.lock.Unlock()
- select {
- case s.batchesC <- struct{}{}:
- default:
- }
- select {
- case <-batch.c:
- return batch.err
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- // force putting into db, does not check access index
- func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
- data := s.encodeDataFunc(chunk)
- dkey := getDataKey(s.dataIdx, po)
- s.batch.Put(dkey, data)
- index.Idx = s.dataIdx
- s.bucketCnt[po] = s.dataIdx
- s.entryCnt++
- dbEntryCount.Inc(1)
- s.dataIdx++
- cntKey := make([]byte, 2)
- cntKey[0] = keyDistanceCnt
- cntKey[1] = po
- s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
- }
- func (s *LDBStore) writeBatches() {
- for {
- select {
- case <-s.quit:
- log.Debug("DbStore: quit batch write loop")
- return
- case <-s.batchesC:
- err := s.writeCurrentBatch()
- if err != nil {
- log.Debug("DbStore: quit batch write loop", "err", err.Error())
- return
- }
- }
- }
- }
- func (s *LDBStore) writeCurrentBatch() error {
- s.lock.Lock()
- defer s.lock.Unlock()
- b := s.batch
- l := b.Len()
- if l == 0 {
- return nil
- }
- e := s.entryCnt
- d := s.dataIdx
- a := s.accessCnt
- s.batch = newBatch()
- b.err = s.writeBatch(b, e, d, a)
- close(b.c)
- for e > s.capacity {
- log.Trace("for >", "e", e, "s.capacity", s.capacity)
- // Collect garbage in a separate goroutine
- // to be able to interrupt this loop by s.quit.
- done := make(chan struct{})
- go func() {
- s.collectGarbage(gcArrayFreeRatio)
- log.Trace("collectGarbage closing done")
- close(done)
- }()
- select {
- case <-s.quit:
- return errors.New("CollectGarbage terminated due to quit")
- case <-done:
- }
- e = s.entryCnt
- }
- return nil
- }
- // must be called non concurrently
- func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error {
- b.Put(keyEntryCnt, U64ToBytes(entryCnt))
- b.Put(keyDataIdx, U64ToBytes(dataIdx))
- b.Put(keyAccessCnt, U64ToBytes(accessCnt))
- l := b.Len()
- if err := s.db.Write(b.Batch); err != nil {
- return fmt.Errorf("unable to write batch: %v", err)
- }
- log.Trace(fmt.Sprintf("batch write (%d entries)", l))
- return nil
- }
- // newMockEncodeDataFunc returns a function that stores the chunk data
- // to a mock store to bypass the default functionality encodeData.
- // The constructed function always returns the nil data, as DbStore does
- // not need to store the data, but still need to create the index.
- func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
- return func(chunk Chunk) []byte {
- if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
- log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
- }
- return chunk.Address()[:]
- }
- }
- // try to find index; if found, update access cnt and return true
- func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
- idata, err := s.db.Get(ikey)
- if err != nil {
- return false
- }
- decodeIndex(idata, index)
- s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
- s.accessCnt++
- index.Access = s.accessCnt
- idata = encodeIndex(index)
- s.batch.Put(ikey, idata)
- select {
- case s.batchesC <- struct{}{}:
- default:
- }
- return true
- }
- func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
- metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
- log.Trace("ldbstore.get", "key", addr)
- s.lock.Lock()
- defer s.lock.Unlock()
- return s.get(addr)
- }
- func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
- var indx dpaDBIndex
- if s.closed {
- return nil, ErrDBClosed
- }
- if s.tryAccessIdx(getIndexKey(addr), &indx) {
- var data []byte
- if s.getDataFunc != nil {
- // if getDataFunc is defined, use it to retrieve the chunk data
- log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
- data, err = s.getDataFunc(addr)
- if err != nil {
- return
- }
- } else {
- // default DbStore functionality to retrieve chunk data
- proximity := s.po(addr)
- datakey := getDataKey(indx.Idx, proximity)
- data, err = s.db.Get(datakey)
- log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
- if err != nil {
- log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
- s.delete(indx.Idx, getIndexKey(addr), s.po(addr))
- return
- }
- }
- return decodeData(addr, data)
- } else {
- err = ErrChunkNotFound
- }
- return
- }
- // newMockGetFunc returns a function that reads chunk data from
- // the mock database, which is used as the value for DbStore.getFunc
- // to bypass the default functionality of DbStore with a mock store.
- func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
- return func(addr Address) (data []byte, err error) {
- data, err = mockStore.Get(addr)
- if err == mock.ErrNotFound {
- // preserve ErrChunkNotFound error
- err = ErrChunkNotFound
- }
- return data, err
- }
- }
- func (s *LDBStore) updateAccessCnt(addr Address) {
- s.lock.Lock()
- defer s.lock.Unlock()
- var index dpaDBIndex
- s.tryAccessIdx(getIndexKey(addr), &index) // result_chn == nil, only update access cnt
- }
- func (s *LDBStore) setCapacity(c uint64) {
- s.lock.Lock()
- defer s.lock.Unlock()
- s.capacity = c
- if s.entryCnt > c {
- ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
- if ratio < gcArrayFreeRatio {
- ratio = gcArrayFreeRatio
- }
- if ratio > 1 {
- ratio = 1
- }
- for s.entryCnt > c {
- s.collectGarbage(ratio)
- }
- }
- }
- func (s *LDBStore) Close() {
- close(s.quit)
- s.lock.Lock()
- s.closed = true
- s.lock.Unlock()
- // force writing out current batch
- s.writeCurrentBatch()
- close(s.batchesC)
- s.db.Close()
- }
- // SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
- func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
- metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
- sincekey := getDataKey(since, po)
- untilkey := getDataKey(until, po)
- it := s.db.NewIterator()
- defer it.Release()
- for ok := it.Seek(sincekey); ok; ok = it.Next() {
- metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
- dbkey := it.Key()
- if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
- break
- }
- key := make([]byte, 32)
- val := it.Value()
- copy(key, val[:32])
- if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
- break
- }
- }
- return it.Error()
- }
- func databaseExists(path string) bool {
- o := &opt.Options{
- ErrorIfMissing: true,
- }
- tdb, err := leveldb.OpenFile(path, o)
- if err != nil {
- return false
- }
- defer tdb.Close()
- return true
- }
|