dbstore.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // disk storage layer for the package bzz
  17. // DbStore implements the ChunkStore interface and is used by the DPA as
  18. // persistent storage of chunks
  19. // it implements purging based on access count allowing for external control of
  20. // max capacity
  21. package storage
  22. import (
  23. "bytes"
  24. "encoding/binary"
  25. "fmt"
  26. "sync"
  27. "github.com/ethereum/go-ethereum/logger"
  28. "github.com/ethereum/go-ethereum/logger/glog"
  29. "github.com/ethereum/go-ethereum/rlp"
  30. "github.com/syndtr/goleveldb/leveldb"
  31. "github.com/syndtr/goleveldb/leveldb/iterator"
  32. )
const (
	defaultDbCapacity = 5000000 // default maximum number of stored entries
	defaultRadius     = 0       // not yet used

	gcArraySize      = 10000 // max number of GC candidates scanned per collectGarbage run
	gcArrayFreeRatio = 0.1   // fraction of scanned candidates freed per GC run

	// key prefixes for leveldb storage
	kpIndex = 0 // index entries: prefix byte + chunk hash
	kpData  = 1 // data entries: prefix byte + big-endian data index
)
// Singleton leveldb keys under which the store's counters are persisted.
var (
	keyAccessCnt = []byte{2} // persisted access counter
	keyEntryCnt  = []byte{3} // persisted entry count
	keyDataIdx   = []byte{4} // persisted data index counter
	keyGCPos     = []byte{5} // persisted garbage-collection cursor position
)
// gcItem is one candidate considered during garbage collection: a
// snapshot of an index entry's position and its access-based GC value.
type gcItem struct {
	idx    uint64 // data index of the entry (see getDataKey)
	value  uint64 // GC priority (access stamp); smaller = more likely collected
	idxKey []byte // the leveldb index key the entry was read from
}
// DbStore implements the ChunkStore interface on top of leveldb and is
// used by the DPA as persistent storage of chunks. It purges the least
// recently accessed entries once capacity is reached (collectGarbage).
type DbStore struct {
	db *LDBDatabase

	// this should be stored in db, accessed transactionally
	entryCnt, accessCnt, dataIdx, capacity uint64

	gcPos, gcStartPos []byte     // current / initial GC cursor positions
	gcArray           []*gcItem  // scratch buffer of GC candidates, reused between runs
	hashfunc          Hasher     // hash constructor used by Get to validate chunk content
	lock              sync.Mutex // guards counters and store mutation
}
// NewDbStore opens (or creates) a DbStore at path. Persistent counters
// (entry count, access count, data index) and the GC cursor are restored
// from the database; missing values decode to zero via BytesToU64.
// The radius parameter is currently unused (cf. defaultRadius).
func NewDbStore(path string, hash Hasher, capacity uint64, radius int) (s *DbStore, err error) {
	s = new(DbStore)

	s.hashfunc = hash

	s.db, err = NewLDBDatabase(path)
	if err != nil {
		return
	}

	// NOTE(review): setCapacity is called before entryCnt is restored, so
	// it never triggers GC here (entryCnt is still 0) — confirm intended.
	s.setCapacity(capacity)

	s.gcStartPos = make([]byte, 1)
	s.gcStartPos[0] = kpIndex
	s.gcArray = make([]*gcItem, gcArraySize)

	// Restore persisted counters; Get errors are deliberately ignored —
	// a missing key yields short/nil data and BytesToU64 returns 0.
	data, _ := s.db.Get(keyEntryCnt)
	s.entryCnt = BytesToU64(data)
	data, _ = s.db.Get(keyAccessCnt)
	s.accessCnt = BytesToU64(data)
	data, _ = s.db.Get(keyDataIdx)
	s.dataIdx = BytesToU64(data)
	s.gcPos, _ = s.db.Get(keyGCPos)
	if s.gcPos == nil {
		s.gcPos = s.gcStartPos
	}
	return
}
// dpaDBIndex is the RLP-encoded value stored under an index key,
// mapping a chunk hash to its data position and last-access stamp.
type dpaDBIndex struct {
	Idx    uint64 // position of the chunk data in the data key space
	Access uint64 // access counter value at last access (GC priority)
}
  89. func BytesToU64(data []byte) uint64 {
  90. if len(data) < 8 {
  91. return 0
  92. }
  93. return binary.LittleEndian.Uint64(data)
  94. }
  95. func U64ToBytes(val uint64) []byte {
  96. data := make([]byte, 8)
  97. binary.LittleEndian.PutUint64(data, val)
  98. return data
  99. }
// getIndexGCValue returns the GC priority of an index entry: its access
// stamp. Entries with smaller values are collected first.
func getIndexGCValue(index *dpaDBIndex) uint64 {
	return index.Access
}
// updateIndexAccess stamps the index entry with the store's current
// access counter. Note: s.accessCnt is incremented by callers, not here.
func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
	index.Access = s.accessCnt
}
  106. func getIndexKey(hash Key) []byte {
  107. HashSize := len(hash)
  108. key := make([]byte, HashSize+1)
  109. key[0] = 0
  110. copy(key[1:], hash[:])
  111. return key
  112. }
  113. func getDataKey(idx uint64) []byte {
  114. key := make([]byte, 9)
  115. key[0] = 1
  116. binary.BigEndian.PutUint64(key[1:9], idx)
  117. return key
  118. }
// encodeIndex RLP-encodes an index entry for storage. The encode error
// is deliberately dropped: dpaDBIndex holds only uint64 fields, for
// which encoding is expected to succeed.
func encodeIndex(index *dpaDBIndex) []byte {
	data, _ := rlp.EncodeToBytes(index)
	return data
}
// encodeData returns the stored representation of a chunk: its raw SData
// payload (whose first 8 bytes carry the size — see decodeData).
func encodeData(chunk *Chunk) []byte {
	return chunk.SData
}
// decodeIndex RLP-decodes data into index. The decode error is silently
// ignored; on malformed input index may be left (partially) zero-valued.
func decodeIndex(data []byte, index *dpaDBIndex) {
	dec := rlp.NewStream(bytes.NewReader(data), 0)
	dec.Decode(index)
}
// decodeData fills chunk from its stored representation: the payload
// becomes SData and the chunk size is read from its first 8 bytes
// (little-endian). NOTE(review): panics if data is shorter than 8 bytes
// — callers must guarantee the stored payload is well-formed.
func decodeData(data []byte, chunk *Chunk) {
	chunk.SData = data
	chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
}
  134. func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
  135. pivotValue := list[pivotIndex].value
  136. dd := list[pivotIndex]
  137. list[pivotIndex] = list[right]
  138. list[right] = dd
  139. storeIndex := left
  140. for i := left; i < right; i++ {
  141. if list[i].value < pivotValue {
  142. dd = list[storeIndex]
  143. list[storeIndex] = list[i]
  144. list[i] = dd
  145. storeIndex++
  146. }
  147. }
  148. dd = list[storeIndex]
  149. list[storeIndex] = list[right]
  150. list[right] = dd
  151. return storeIndex
  152. }
  153. func gcListSelect(list []*gcItem, left int, right int, n int) int {
  154. if left == right {
  155. return left
  156. }
  157. pivotIndex := (left + right) / 2
  158. pivotIndex = gcListPartition(list, left, right, pivotIndex)
  159. if n == pivotIndex {
  160. return n
  161. } else {
  162. if n < pivotIndex {
  163. return gcListSelect(list, left, pivotIndex-1, n)
  164. } else {
  165. return gcListSelect(list, pivotIndex+1, right, n)
  166. }
  167. }
  168. }
// collectGarbage frees roughly ratio of the scanned entries with the
// lowest access stamps. It scans up to gcArraySize index entries from
// the persisted cursor gcPos (wrapping to gcStartPos when the cursor
// leaves the index key space), quickselects a cutoff access value, then
// deletes every scanned entry at or below that cutoff and persists the
// new cursor. Caller must hold s.lock.
func (s *DbStore) collectGarbage(ratio float32) {
	it := s.db.NewIterator()
	it.Seek(s.gcPos)
	if it.Valid() {
		s.gcPos = it.Key()
	} else {
		s.gcPos = nil
	}

	// phase 1: gather up to gcArraySize candidates into s.gcArray
	gcnt := 0
	for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {
		// wrap the cursor back to the start of the index range when it
		// ran off the end or into a non-index key space
		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
			it.Seek(s.gcStartPos)
			if it.Valid() {
				s.gcPos = it.Key()
			} else {
				s.gcPos = nil
			}
		}
		// no index entries at all: nothing to collect
		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
			break
		}
		gci := new(gcItem)
		gci.idxKey = s.gcPos
		var index dpaDBIndex
		decodeIndex(it.Value(), &index)
		gci.idx = index.Idx
		// the smaller, the more likely to be gc'd
		gci.value = getIndexGCValue(&index)
		s.gcArray[gcnt] = gci
		gcnt++
		it.Next()
		if it.Valid() {
			s.gcPos = it.Key()
		} else {
			s.gcPos = nil
		}
	}
	it.Release()

	// phase 2: quickselect the cutoff access value at the ratio quantile
	cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
	cutval := s.gcArray[cutidx].value

	// fmt.Print(gcnt, " ", s.entryCnt, " ")
	// phase 3: actual gc — delete index + data of every candidate at or
	// below the cutoff, persisting the decremented entry count each time
	for i := 0; i < gcnt; i++ {
		if s.gcArray[i].value <= cutval {
			batch := new(leveldb.Batch)
			batch.Delete(s.gcArray[i].idxKey)
			batch.Delete(getDataKey(s.gcArray[i].idx))
			s.entryCnt--
			batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
			s.db.Write(batch)
		}
	}

	// fmt.Println(s.entryCnt)
	s.db.Put(keyGCPos, s.gcPos)
}
// Counter returns the current data index counter, i.e. the total number
// of chunks ever written by this store (not the number currently held).
func (s *DbStore) Counter() uint64 {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.dataIdx
}
// Put stores a chunk under the next data index and writes its index
// entry. If the chunk already exists, only its access stamp is updated.
// When the store is at capacity a GC run is triggered first. The
// chunk.dbStored channel (if non-nil) is closed once the chunk is
// persisted (or found to already exist).
func (s *DbStore) Put(chunk *Chunk) {
	s.lock.Lock()
	defer s.lock.Unlock()

	ikey := getIndexKey(chunk.Key)
	var index dpaDBIndex

	if s.tryAccessIdx(ikey, &index) {
		if chunk.dbStored != nil {
			close(chunk.dbStored)
		}
		return // already exists, only update access
	}

	data := encodeData(chunk)
	//data := ethutil.Encode([]interface{}{entry})

	// make room before adding a new entry if at capacity
	if s.entryCnt >= s.capacity {
		s.collectGarbage(gcArrayFreeRatio)
	}

	batch := new(leveldb.Batch)
	batch.Put(getDataKey(s.dataIdx), data)

	index.Idx = s.dataIdx
	s.updateIndexAccess(&index)

	idata := encodeIndex(&index)
	batch.Put(ikey, idata)

	// NOTE(review): each counter is persisted *before* being incremented,
	// so the on-disk values lag the in-memory ones by one — confirm this
	// ordering is intended (it also matches tryAccessIdx).
	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	s.entryCnt++
	batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
	s.dataIdx++
	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++

	s.db.Write(batch)
	if chunk.dbStored != nil {
		close(chunk.dbStored)
	}
	glog.V(logger.Detail).Infof("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx)
}
// tryAccessIdx tries to find the index entry stored under ikey; if
// found it decodes it into index, refreshes the entry's access stamp
// (persisting the access counter) and returns true. Returns false when
// the key has no index entry. Caller must hold s.lock.
func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
	idata, err := s.db.Get(ikey)
	if err != nil {
		// treat any lookup failure as "not found"
		return false
	}
	decodeIndex(idata, index)

	batch := new(leveldb.Batch)
	// persist the current access counter, then advance it and stamp the
	// entry with the new value
	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++
	s.updateIndexAccess(index)
	idata = encodeIndex(index)
	batch.Put(ikey, idata)
	s.db.Write(batch)

	return true
}
  279. func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
  280. s.lock.Lock()
  281. defer s.lock.Unlock()
  282. var index dpaDBIndex
  283. if s.tryAccessIdx(getIndexKey(key), &index) {
  284. var data []byte
  285. data, err = s.db.Get(getDataKey(index.Idx))
  286. if err != nil {
  287. return
  288. }
  289. hasher := s.hashfunc()
  290. hasher.Write(data)
  291. hash := hasher.Sum(nil)
  292. if bytes.Compare(hash, key) != 0 {
  293. s.db.Delete(getDataKey(index.Idx))
  294. err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:])
  295. return
  296. }
  297. chunk = &Chunk{
  298. Key: key,
  299. }
  300. decodeData(data, chunk)
  301. } else {
  302. err = notFound
  303. }
  304. return
  305. }
// updateAccessCnt refreshes the access stamp of the entry under key
// without retrieving the chunk data; the lookup result is discarded.
func (s *DbStore) updateAccessCnt(key Key) {
	s.lock.Lock()
	defer s.lock.Unlock()
	var index dpaDBIndex
	s.tryAccessIdx(getIndexKey(key), &index) // result_chn == nil, only update access cnt
}
// setCapacity sets the maximum entry count. If the store already holds
// more than c entries, garbage collection is run repeatedly until the
// count fits; the per-run free ratio is scaled to the overshoot but
// clamped to [gcArrayFreeRatio, 1].
func (s *DbStore) setCapacity(c uint64) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.capacity = c

	if s.entryCnt > c {
		var ratio float32
		// 1.01 leaves a ~1% margin below the new capacity
		ratio = float32(1.01) - float32(c)/float32(s.entryCnt)
		if ratio < gcArrayFreeRatio {
			ratio = gcArrayFreeRatio
		}
		if ratio > 1 {
			ratio = 1
		}
		// NOTE(review): loops until entryCnt <= c; relies on every GC run
		// actually removing entries — confirm it cannot stall.
		for s.entryCnt > c {
			s.collectGarbage(ratio)
		}
	}
}
// getEntryCnt returns the number of entries currently stored.
// Note: reads the counter without taking s.lock.
func (s *DbStore) getEntryCnt() uint64 {
	return s.entryCnt
}
// close releases the underlying leveldb handle.
func (s *DbStore) close() {
	s.db.Close()
}
// DbSyncState describes a section of the DbStore representing the
// unsynced domain relevant to a peer.
// Start - Stop designate a continuous area of Keys in an address space,
// typically the addresses closer to us than to the peer but not closer
// to another peer in between.
// First - Last designates a time interval, typically from the last
// disconnect till the latest connection (real-time traffic is relayed).
type DbSyncState struct {
	Start, Stop Key    // address range (Start exclusive, Stop inclusive — see dbSyncIterator.Next)
	First, Last uint64 // storage-index interval [First, Last)
}
// dbSyncIterator implements the syncer iterator interface:
// it iterates by storage index (~ time of storage = first entry to db).
type dbSyncIterator struct {
	it iterator.Iterator // underlying leveldb iterator over index keys
	DbSyncState
}
  353. // initialises a sync iterator from a syncToken (passed in with the handshake)
  354. func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
  355. if state.First > state.Last {
  356. return nil, fmt.Errorf("no entries found")
  357. }
  358. si = &dbSyncIterator{
  359. it: self.db.NewIterator(),
  360. DbSyncState: state,
  361. }
  362. si.it.Seek(getIndexKey(state.Start))
  363. return si, nil
  364. }
// Next walks the area from Start to Stop and returns the next key whose
// storage index falls within [First, Last). The address lower bound is
// exclusive (keys equal to Start are skipped) and the upper bound Stop
// is inclusive. Returns nil and releases the underlying iterator when
// the range is exhausted; the iterator cannot be reused after that.
func (self *dbSyncIterator) Next() (key Key) {
	for self.it.Valid() {
		dbkey := self.it.Key()
		// stop once we leave the index-key space (prefix kpIndex == 0)
		if dbkey[0] != 0 {
			break
		}
		// strip the prefix byte to recover the chunk hash
		key = Key(make([]byte, len(dbkey)-1))
		copy(key[:], dbkey[1:])
		// skip keys at or before Start (exclusive lower bound)
		if bytes.Compare(key[:], self.Start) <= 0 {
			self.it.Next()
			continue
		}
		// past Stop: done (inclusive upper bound)
		if bytes.Compare(key[:], self.Stop) > 0 {
			break
		}
		var index dpaDBIndex
		decodeIndex(self.it.Value(), &index)
		self.it.Next()
		// only yield entries stored within the [First, Last) interval
		if (index.Idx >= self.First) && (index.Idx < self.Last) {
			return
		}
	}
	self.it.Release()
	return nil
}