ldbstore.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // disk storage layer for the package bzz
  17. // DbStore implements the ChunkStore interface and is used by the FileStore as
  18. // persistent storage of chunks
  19. // it implements purging based on access count allowing for external control of
  20. // max capacity
  21. package storage
  22. import (
  23. "archive/tar"
  24. "bytes"
  25. "context"
  26. "encoding/binary"
  27. "encoding/hex"
  28. "errors"
  29. "fmt"
  30. "io"
  31. "io/ioutil"
  32. "sort"
  33. "sync"
  34. "github.com/ethereum/go-ethereum/metrics"
  35. "github.com/ethereum/go-ethereum/rlp"
  36. ch "github.com/ethereum/go-ethereum/swarm/chunk"
  37. "github.com/ethereum/go-ethereum/swarm/log"
  38. "github.com/ethereum/go-ethereum/swarm/storage/mock"
  39. "github.com/syndtr/goleveldb/leveldb"
  40. "github.com/syndtr/goleveldb/leveldb/opt"
  41. )
const (
	// gcArrayFreeRatio is the fraction of collected candidates removed per
	// garbage-collection pass (see collectGarbage).
	gcArrayFreeRatio = 0.1
	maxGCitems       = 5000 // max number of items to be gc'd per call to collectGarbage()
)

var (
	// dbEntryCount mirrors entryCnt as a metrics counter.
	dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
)

// Single-byte key prefixes namespacing the records inside one LevelDB.
// NOTE(review): byte value 5 is unused here — presumably reserved or retired
// by an earlier schema; confirm before reusing it.
var (
	keyIndex       = byte(0)
	keyOldData     = byte(1)
	keyAccessCnt   = []byte{2}
	keyEntryCnt    = []byte{3}
	keyDataIdx     = []byte{4}
	keyData        = byte(6)
	keyDistanceCnt = byte(7)
)

var (
	// ErrDBClosed is returned by Put/Get after Close has been called.
	ErrDBClosed = errors.New("LDBStore closed")
)
// gcItem is one garbage-collection candidate: the index entry's data index,
// its access count (used as the eviction priority), the raw index key and
// the proximity order bin it lives in.
type gcItem struct {
	idx    uint64
	value  uint64 // access count; lower means more likely to be collected
	idxKey []byte
	po     uint8
}
// LDBStoreParams bundles the generic StoreParams with the database path and
// the proximity-order function used to bin chunks.
type LDBStoreParams struct {
	*StoreParams
	Path string
	Po   func(Address) uint8
}
  72. // NewLDBStoreParams constructs LDBStoreParams with the specified values.
  73. func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
  74. return &LDBStoreParams{
  75. StoreParams: storeparams,
  76. Path: path,
  77. Po: func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey, k[:])) },
  78. }
  79. }
// LDBStore is a LevelDB-backed chunk store with capacity-bounded storage:
// when entryCnt exceeds capacity, least-recently-accessed chunks are purged.
type LDBStore struct {
	db *LDBDatabase

	// this should be stored in db, accessed transactionally
	entryCnt  uint64 // number of items in the LevelDB
	accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
	dataIdx   uint64 // similar to entryCnt, but we only increment it
	capacity  uint64
	bucketCnt []uint64 // per proximity-order bin counters (256 entries)

	hashfunc SwarmHasher
	po       func(Address) uint8

	batchC   chan bool
	batchesC chan struct{} // signals the writeBatches goroutine (capacity 1)
	closed   bool
	batch    *dbBatch // current open batch; swapped out by writeCurrentBatch
	lock     sync.RWMutex
	quit     chan struct{}

	// Functions encodeDataFunc is used to bypass
	// the default functionality of DbStore with
	// mock.NodeStore for testing purposes.
	encodeDataFunc func(chunk Chunk) []byte
	// If getDataFunc is defined, it will be used for
	// retrieving the chunk data instead from the local
	// LevelDB database.
	getDataFunc func(key Address) (data []byte, err error)
}
// dbBatch wraps a LevelDB batch with a completion channel: c is closed once
// the batch has been written to disk, with err holding the write result.
type dbBatch struct {
	*leveldb.Batch
	err error
	c   chan struct{}
}
  110. func newBatch() *dbBatch {
  111. return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
  112. }
  113. // TODO: Instead of passing the distance function, just pass the address from which distances are calculated
  114. // to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
  115. // a function different from the one that is actually used.
  116. func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
  117. s = new(LDBStore)
  118. s.hashfunc = params.Hash
  119. s.quit = make(chan struct{})
  120. s.batchesC = make(chan struct{}, 1)
  121. go s.writeBatches()
  122. s.batch = newBatch()
  123. // associate encodeData with default functionality
  124. s.encodeDataFunc = encodeData
  125. s.db, err = NewLDBDatabase(params.Path)
  126. if err != nil {
  127. return nil, err
  128. }
  129. s.po = params.Po
  130. s.setCapacity(params.DbCapacity)
  131. s.bucketCnt = make([]uint64, 0x100)
  132. for i := 0; i < 0x100; i++ {
  133. k := make([]byte, 2)
  134. k[0] = keyDistanceCnt
  135. k[1] = uint8(i)
  136. cnt, _ := s.db.Get(k)
  137. s.bucketCnt[i] = BytesToU64(cnt)
  138. }
  139. data, _ := s.db.Get(keyEntryCnt)
  140. s.entryCnt = BytesToU64(data)
  141. data, _ = s.db.Get(keyAccessCnt)
  142. s.accessCnt = BytesToU64(data)
  143. data, _ = s.db.Get(keyDataIdx)
  144. s.dataIdx = BytesToU64(data)
  145. return s, nil
  146. }
  147. // NewMockDbStore creates a new instance of DbStore with
  148. // mockStore set to a provided value. If mockStore argument is nil,
  149. // this function behaves exactly as NewDbStore.
  150. func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
  151. s, err = NewLDBStore(params)
  152. if err != nil {
  153. return nil, err
  154. }
  155. // replace put and get with mock store functionality
  156. if mockStore != nil {
  157. s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
  158. s.getDataFunc = newMockGetDataFunc(mockStore)
  159. }
  160. return
  161. }
// dpaDBIndex is the RLP-encoded value stored under an index key:
// the chunk's data index and its last access count.
type dpaDBIndex struct {
	Idx    uint64
	Access uint64
}
  166. func BytesToU64(data []byte) uint64 {
  167. if len(data) < 8 {
  168. return 0
  169. }
  170. return binary.BigEndian.Uint64(data)
  171. }
  172. func U64ToBytes(val uint64) []byte {
  173. data := make([]byte, 8)
  174. binary.BigEndian.PutUint64(data, val)
  175. return data
  176. }
// updateIndexAccess stamps the index entry with the store's current access
// counter; the caller is responsible for incrementing s.accessCnt.
func (s *LDBStore) updateIndexAccess(index *dpaDBIndex) {
	index.Access = s.accessCnt
}
  180. func getIndexKey(hash Address) []byte {
  181. hashSize := len(hash)
  182. key := make([]byte, hashSize+1)
  183. key[0] = keyIndex
  184. copy(key[1:], hash[:])
  185. return key
  186. }
  187. func getDataKey(idx uint64, po uint8) []byte {
  188. key := make([]byte, 10)
  189. key[0] = keyData
  190. key[1] = po
  191. binary.BigEndian.PutUint64(key[2:], idx)
  192. return key
  193. }
  194. func encodeIndex(index *dpaDBIndex) []byte {
  195. data, _ := rlp.EncodeToBytes(index)
  196. return data
  197. }
  198. func encodeData(chunk Chunk) []byte {
  199. // Always create a new underlying array for the returned byte slice.
  200. // The chunk.Address array may be used in the returned slice which
  201. // may be changed later in the code or by the LevelDB, resulting
  202. // that the Address is changed as well.
  203. return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
  204. }
  205. func decodeIndex(data []byte, index *dpaDBIndex) error {
  206. dec := rlp.NewStream(bytes.NewReader(data), 0)
  207. return dec.Decode(index)
  208. }
  209. func decodeData(addr Address, data []byte) (*chunk, error) {
  210. return NewChunk(addr, data[32:]), nil
  211. }
// collectGarbage scans up to maxGCitems index entries, sorts them by access
// count and deletes the least recently accessed fraction given by ratio.
// NOTE(review): reads/writes s.entryCnt via s.delete without taking s.lock;
// presumably callers hold the lock or serialize access — confirm.
func (s *LDBStore) collectGarbage(ratio float32) {
	log.Trace("collectGarbage", "ratio", ratio)

	metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)

	it := s.db.NewIterator()
	defer it.Release()

	garbage := []*gcItem{}
	gcnt := 0

	// collect up to maxGCitems candidates (but never more than entryCnt)
	for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() {
		itkey := it.Key()

		// stop once we leave the keyIndex prefix range
		if (itkey == nil) || (itkey[0] != keyIndex) {
			break
		}

		// it.Key() contents change on next call to it.Next(), so we must copy it
		key := make([]byte, len(it.Key()))
		copy(key, it.Key())

		val := it.Value()

		var index dpaDBIndex

		hash := key[1:]
		decodeIndex(val, &index)
		po := s.po(hash)

		gci := &gcItem{
			idxKey: key,
			idx:    index.Idx,
			value:  index.Access, // the smaller, the more likely to be gc'd. see sort comparator below.
			po:     po,
		}

		garbage = append(garbage, gci)
		gcnt++
	}

	// least recently accessed first
	sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value })

	cutoff := int(float32(gcnt) * ratio)
	metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(cutoff))

	for i := 0; i < cutoff; i++ {
		s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po)
	}
}
// Export writes all chunks from the store to a tar archive, returning the
// number of chunks written. Each archive entry is named by the hex-encoded
// chunk address and contains the raw stored record (address||payload).
func (s *LDBStore) Export(out io.Writer) (int64, error) {
	tw := tar.NewWriter(out)
	defer tw.Close()

	it := s.db.NewIterator()
	defer it.Release()
	var count int64
	for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
		key := it.Key()
		// stop once we leave the index-key prefix range
		if (key == nil) || (key[0] != keyIndex) {
			break
		}

		var index dpaDBIndex

		hash := key[1:]
		decodeIndex(it.Value(), &index)
		po := s.po(hash)
		datakey := getDataKey(index.Idx, po)
		log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
		data, err := s.db.Get(datakey)
		if err != nil {
			// index entry without a data record: skip, don't abort the export
			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
			continue
		}

		hdr := &tar.Header{
			Name: hex.EncodeToString(hash),
			Mode: 0644,
			Size: int64(len(data)),
		}
		if err := tw.WriteHeader(hdr); err != nil {
			return count, err
		}
		if _, err := tw.Write(data); err != nil {
			return count, err
		}
		count++
	}

	return count, nil
}
  287. // of chunks read.
  288. func (s *LDBStore) Import(in io.Reader) (int64, error) {
  289. tr := tar.NewReader(in)
  290. ctx, cancel := context.WithCancel(context.Background())
  291. defer cancel()
  292. countC := make(chan int64)
  293. errC := make(chan error)
  294. var count int64
  295. go func() {
  296. for {
  297. hdr, err := tr.Next()
  298. if err == io.EOF {
  299. break
  300. } else if err != nil {
  301. select {
  302. case errC <- err:
  303. case <-ctx.Done():
  304. }
  305. }
  306. if len(hdr.Name) != 64 {
  307. log.Warn("ignoring non-chunk file", "name", hdr.Name)
  308. continue
  309. }
  310. keybytes, err := hex.DecodeString(hdr.Name)
  311. if err != nil {
  312. log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
  313. continue
  314. }
  315. data, err := ioutil.ReadAll(tr)
  316. if err != nil {
  317. select {
  318. case errC <- err:
  319. case <-ctx.Done():
  320. }
  321. }
  322. key := Address(keybytes)
  323. chunk := NewChunk(key, data[32:])
  324. go func() {
  325. select {
  326. case errC <- s.Put(ctx, chunk):
  327. case <-ctx.Done():
  328. }
  329. }()
  330. count++
  331. }
  332. countC <- count
  333. }()
  334. // wait for all chunks to be stored
  335. i := int64(0)
  336. var total int64
  337. for {
  338. select {
  339. case err := <-errC:
  340. if err != nil {
  341. return count, err
  342. }
  343. i++
  344. case total = <-countC:
  345. case <-ctx.Done():
  346. return i, ctx.Err()
  347. }
  348. if total > 0 && i == total {
  349. return total, nil
  350. }
  351. }
  352. }
  353. func (s *LDBStore) Cleanup() {
  354. //Iterates over the database and checks that there are no chunks bigger than 4kb
  355. var errorsFound, removed, total int
  356. it := s.db.NewIterator()
  357. defer it.Release()
  358. for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
  359. key := it.Key()
  360. if (key == nil) || (key[0] != keyIndex) {
  361. break
  362. }
  363. total++
  364. var index dpaDBIndex
  365. err := decodeIndex(it.Value(), &index)
  366. if err != nil {
  367. log.Warn("Cannot decode")
  368. errorsFound++
  369. continue
  370. }
  371. hash := key[1:]
  372. po := s.po(hash)
  373. datakey := getDataKey(index.Idx, po)
  374. data, err := s.db.Get(datakey)
  375. if err != nil {
  376. found := false
  377. // highest possible proximity is 255
  378. for po = 1; po <= 255; po++ {
  379. datakey = getDataKey(index.Idx, po)
  380. data, err = s.db.Get(datakey)
  381. if err == nil {
  382. found = true
  383. break
  384. }
  385. }
  386. if !found {
  387. log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
  388. errorsFound++
  389. continue
  390. }
  391. }
  392. ck := data[:32]
  393. c, err := decodeData(ck, data)
  394. if err != nil {
  395. log.Error("decodeData error", "err", err)
  396. continue
  397. }
  398. cs := int64(binary.LittleEndian.Uint64(c.sdata[:8]))
  399. log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
  400. if len(c.sdata) > ch.DefaultSize+8 {
  401. log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
  402. s.delete(index.Idx, getIndexKey(key[1:]), po)
  403. removed++
  404. errorsFound++
  405. }
  406. }
  407. log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
  408. }
// ReIndex migrates records stored under the legacy keyOldData prefix to the
// current keyData layout, rehashing the payload and updating the per-bin
// counters as it goes.
// NOTE(review): errorsFound/total are declared but never incremented, so the
// closing log line always reports zeros — presumably leftover scaffolding.
func (s *LDBStore) ReIndex() {
	//Iterates over the database and checks that there are no faulty chunks
	it := s.db.NewIterator()
	startPosition := []byte{keyOldData}
	it.Seek(startPosition)
	var key []byte
	var errorsFound, total int
	for it.Valid() {
		key = it.Key()
		// stop once we leave the old-data prefix range
		if (key == nil) || (key[0] != keyOldData) {
			break
		}
		data := it.Value()
		hasher := s.hashfunc()
		hasher.Write(data)
		hash := hasher.Sum(nil)

		// rebuild the key in the new (keyData, po, idx) layout; the old key
		// is reinterpreted in place before being deleted from the batch
		newKey := make([]byte, 10)
		oldCntKey := make([]byte, 2)
		newCntKey := make([]byte, 2)
		oldCntKey[0] = keyDistanceCnt
		newCntKey[0] = keyDistanceCnt
		key[0] = keyData
		key[1] = s.po(Address(key[1:]))
		oldCntKey[1] = key[1]
		newCntKey[1] = s.po(Address(newKey[1:]))
		copy(newKey[2:], key[1:])
		newValue := append(hash, data...)

		batch := new(leveldb.Batch)
		batch.Delete(key)
		s.bucketCnt[oldCntKey[1]]--
		batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
		batch.Put(newKey, newValue)
		s.bucketCnt[newCntKey[1]]++
		batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
		s.db.Write(batch)
		it.Next()
	}
	it.Release()
	log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
}
  449. func (s *LDBStore) Delete(addr Address) {
  450. s.lock.Lock()
  451. defer s.lock.Unlock()
  452. ikey := getIndexKey(addr)
  453. var indx dpaDBIndex
  454. s.tryAccessIdx(ikey, &indx)
  455. s.delete(indx.Idx, ikey, s.po(addr))
  456. }
// delete removes one index entry and its data record in a single batch,
// decrementing the persisted entry counter.
// NOTE(review): s.bucketCnt[po] is persisted but not decremented here,
// unlike entryCnt — verify whether that asymmetry is intentional (doPut
// sets bucketCnt to dataIdx rather than counting entries).
func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
	metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)

	batch := new(leveldb.Batch)
	batch.Delete(idxKey)
	batch.Delete(getDataKey(idx, po))
	s.entryCnt--
	dbEntryCount.Dec(1)
	cntKey := make([]byte, 2)
	cntKey[0] = keyDistanceCnt
	cntKey[1] = po
	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
	s.db.Write(batch)
}
// BinIndex returns the persisted counter for the proximity-order bin po.
func (s *LDBStore) BinIndex(po uint8) uint64 {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.bucketCnt[po]
}
// Size returns the number of entries currently in the store.
func (s *LDBStore) Size() uint64 {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.entryCnt
}
// CurrentStorageIndex returns the monotonically increasing data index.
func (s *LDBStore) CurrentStorageIndex() uint64 {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.dataIdx
}
  486. func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
  487. metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
  488. log.Trace("ldbstore.put", "key", chunk.Address())
  489. ikey := getIndexKey(chunk.Address())
  490. var index dpaDBIndex
  491. po := s.po(chunk.Address())
  492. s.lock.Lock()
  493. if s.closed {
  494. s.lock.Unlock()
  495. return ErrDBClosed
  496. }
  497. batch := s.batch
  498. log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
  499. idata, err := s.db.Get(ikey)
  500. if err != nil {
  501. s.doPut(chunk, &index, po)
  502. } else {
  503. log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address)
  504. decodeIndex(idata, &index)
  505. }
  506. index.Access = s.accessCnt
  507. s.accessCnt++
  508. idata = encodeIndex(&index)
  509. s.batch.Put(ikey, idata)
  510. s.lock.Unlock()
  511. select {
  512. case s.batchesC <- struct{}{}:
  513. default:
  514. }
  515. select {
  516. case <-batch.c:
  517. return batch.err
  518. case <-ctx.Done():
  519. return ctx.Err()
  520. }
  521. }
// force putting into db, does not check access index
// doPut writes the chunk's data record into the current batch and updates
// the in-memory counters; caller must hold s.lock.
func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
	data := s.encodeDataFunc(chunk)
	dkey := getDataKey(s.dataIdx, po)
	s.batch.Put(dkey, data)
	index.Idx = s.dataIdx
	// NOTE(review): the bin counter is assigned the global dataIdx rather
	// than incremented — consistent with SyncIterator keying records by
	// (po, dataIdx), but verify against BinIndex consumers.
	s.bucketCnt[po] = s.dataIdx
	s.entryCnt++
	dbEntryCount.Inc(1)
	s.dataIdx++

	cntKey := make([]byte, 2)
	cntKey[0] = keyDistanceCnt
	cntKey[1] = po
	s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
}
  537. func (s *LDBStore) writeBatches() {
  538. for {
  539. select {
  540. case <-s.quit:
  541. log.Debug("DbStore: quit batch write loop")
  542. return
  543. case <-s.batchesC:
  544. err := s.writeCurrentBatch()
  545. if err != nil {
  546. log.Debug("DbStore: quit batch write loop", "err", err.Error())
  547. return
  548. }
  549. }
  550. }
  551. }
// writeCurrentBatch swaps out the active batch, persists it together with
// the counters, signals waiters via the batch's channel, then garbage
// collects until the entry count is back under capacity.
func (s *LDBStore) writeCurrentBatch() error {
	s.lock.Lock()
	defer s.lock.Unlock()
	b := s.batch
	l := b.Len()
	if l == 0 {
		return nil
	}
	e := s.entryCnt
	d := s.dataIdx
	a := s.accessCnt
	s.batch = newBatch()
	b.err = s.writeBatch(b, e, d, a)
	// closing b.c releases every Put blocked on this batch
	close(b.c)
	for e > s.capacity {
		log.Trace("for >", "e", e, "s.capacity", s.capacity)
		// Collect garbage in a separate goroutine
		// to be able to interrupt this loop by s.quit.
		// NOTE(review): the goroutine mutates store state while this
		// function still holds s.lock; safe only because the lock blocks
		// all other writers — confirm collectGarbage relies on that.
		done := make(chan struct{})
		go func() {
			s.collectGarbage(gcArrayFreeRatio)
			log.Trace("collectGarbage closing done")
			close(done)
		}()

		select {
		case <-s.quit:
			return errors.New("CollectGarbage terminated due to quit")
		case <-done:
		}
		e = s.entryCnt
	}

	return nil
}
  585. // must be called non concurrently
  586. func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error {
  587. b.Put(keyEntryCnt, U64ToBytes(entryCnt))
  588. b.Put(keyDataIdx, U64ToBytes(dataIdx))
  589. b.Put(keyAccessCnt, U64ToBytes(accessCnt))
  590. l := b.Len()
  591. if err := s.db.Write(b.Batch); err != nil {
  592. return fmt.Errorf("unable to write batch: %v", err)
  593. }
  594. log.Trace(fmt.Sprintf("batch write (%d entries)", l))
  595. return nil
  596. }
  597. // newMockEncodeDataFunc returns a function that stores the chunk data
  598. // to a mock store to bypass the default functionality encodeData.
  599. // The constructed function always returns the nil data, as DbStore does
  600. // not need to store the data, but still need to create the index.
  601. func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
  602. return func(chunk Chunk) []byte {
  603. if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
  604. log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
  605. }
  606. return chunk.Address()[:]
  607. }
  608. }
// try to find index; if found, update access cnt and return true
// tryAccessIdx loads the index entry for ikey into index; on a hit it bumps
// the access counter, re-queues the entry in the current batch and signals
// the batch writer. Returns false when the key has no index entry.
// NOTE(review): keyAccessCnt is written with the pre-increment value of
// s.accessCnt, so the persisted counter trails by one — confirm intended.
func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
	idata, err := s.db.Get(ikey)
	if err != nil {
		return false
	}
	decodeIndex(idata, index)
	s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++
	index.Access = s.accessCnt
	idata = encodeIndex(index)
	s.batch.Put(ikey, idata)
	// non-blocking wake-up of the batch writer
	select {
	case s.batchesC <- struct{}{}:
	default:
	}
	return true
}
// Get retrieves the chunk stored at addr; the context is currently unused.
// Returns ErrChunkNotFound when no index entry exists for addr.
func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
	metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
	log.Trace("ldbstore.get", "key", addr)

	s.lock.Lock()
	defer s.lock.Unlock()
	return s.get(addr)
}
// get is the lock-free inner retrieval: it resolves the index entry, reads
// the data either via getDataFunc (mock mode) or from LevelDB, and decodes
// it into a chunk. An index entry with no reachable data record is treated
// as garbage and deleted. Caller must hold s.lock.
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
	var indx dpaDBIndex
	if s.closed {
		return nil, ErrDBClosed
	}
	if s.tryAccessIdx(getIndexKey(addr), &indx) {
		var data []byte
		if s.getDataFunc != nil {
			// if getDataFunc is defined, use it to retrieve the chunk data
			log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
			data, err = s.getDataFunc(addr)
			if err != nil {
				return
			}
		} else {
			// default DbStore functionality to retrieve chunk data
			proximity := s.po(addr)
			datakey := getDataKey(indx.Idx, proximity)
			data, err = s.db.Get(datakey)
			log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
			if err != nil {
				// dangling index entry: remove it so it is not found again
				log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
				s.delete(indx.Idx, getIndexKey(addr), s.po(addr))
				return
			}
		}
		return decodeData(addr, data)
	} else {
		err = ErrChunkNotFound
	}

	return
}
  666. // newMockGetFunc returns a function that reads chunk data from
  667. // the mock database, which is used as the value for DbStore.getFunc
  668. // to bypass the default functionality of DbStore with a mock store.
  669. func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
  670. return func(addr Address) (data []byte, err error) {
  671. data, err = mockStore.Get(addr)
  672. if err == mock.ErrNotFound {
  673. // preserve ErrChunkNotFound error
  674. err = ErrChunkNotFound
  675. }
  676. return data, err
  677. }
  678. }
  679. func (s *LDBStore) updateAccessCnt(addr Address) {
  680. s.lock.Lock()
  681. defer s.lock.Unlock()
  682. var index dpaDBIndex
  683. s.tryAccessIdx(getIndexKey(addr), &index) // result_chn == nil, only update access cnt
  684. }
// setCapacity sets the maximum entry count and, if the store is already
// over the new limit, garbage collects until it fits. The ratio removed
// per pass is clamped to [gcArrayFreeRatio, 1].
func (s *LDBStore) setCapacity(c uint64) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.capacity = c

	if s.entryCnt > c {
		// remove slightly more than the strict overshoot (1.01 factor)
		// so repeated passes converge
		ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
		if ratio < gcArrayFreeRatio {
			ratio = gcArrayFreeRatio
		}
		if ratio > 1 {
			ratio = 1
		}
		for s.entryCnt > c {
			s.collectGarbage(ratio)
		}
	}
}
// Close stops the batch writer, marks the store closed so Put/Get reject
// new requests, flushes the pending batch and closes the database.
func (s *LDBStore) Close() {
	close(s.quit)
	s.lock.Lock()
	s.closed = true
	s.lock.Unlock()
	// force writing out current batch
	s.writeCurrentBatch()
	close(s.batchesC)
	s.db.Close()
}
// SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
// Iteration stops early when f returns false, when the iterator leaves the
// (keyData, po) prefix range, or when the data index passes `until`.
func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
	metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
	sincekey := getDataKey(since, po)
	untilkey := getDataKey(until, po)
	it := s.db.NewIterator()
	defer it.Release()

	for ok := it.Seek(sincekey); ok; ok = it.Next() {
		metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
		dbkey := it.Key()
		if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
			break
		}
		// first 32 bytes of the data record are the chunk address
		key := make([]byte, 32)
		val := it.Value()
		copy(key, val[:32])
		if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
			break
		}
	}
	return it.Error()
}
  734. func databaseExists(path string) bool {
  735. o := &opt.Options{
  736. ErrorIfMissing: true,
  737. }
  738. tdb, err := leveldb.OpenFile(path, o)
  739. if err != nil {
  740. return false
  741. }
  742. defer tdb.Close()
  743. return true
  744. }