syncdb.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "github.com/ethereum/go-ethereum/logger"
  21. "github.com/ethereum/go-ethereum/logger/glog"
  22. "github.com/ethereum/go-ethereum/swarm/storage"
  23. "github.com/syndtr/goleveldb/leveldb"
  24. "github.com/syndtr/goleveldb/leveldb/iterator"
  25. )
  // counterKeyPrefix is the first byte of the db key under which a syncDb
  // persists its counter (see newSyncDb, which writes it into counterKey[0]).
  const counterKeyPrefix = 0x01
  27. /*
  28. syncDb is a queueing service for outgoing deliveries.
  29. One instance per priority queue for each peer
  30. a syncDb instance maintains an in-memory buffer (of capacity bufferSize)
  31. once its in-memory buffer is full it switches to persisting in db
  32. and dbRead iterator iterates through the items keeping their order
  33. once the db read catches up (there is no more items in the db) then
  34. it switches back to in-memory buffer.
  35. when syncdb is stopped all items in the buffer are saved to the db
  36. */
  37. type syncDb struct {
  38. start []byte // this syncdb starting index in requestdb
  39. key storage.Key // remote peers address key
  40. counterKey []byte // db key to persist counter
  41. priority uint // priotity High|Medium|Low
  42. buffer chan interface{} // incoming request channel
  43. db *storage.LDBDatabase // underlying db (TODO should be interface)
  44. done chan bool // chan to signal goroutines finished quitting
  45. quit chan bool // chan to signal quitting to goroutines
  46. total, dbTotal int // counts for one session
  47. batch chan chan int // channel for batch requests
  48. dbBatchSize uint // number of items before batch is saved
  49. }
  50. // constructor needs a shared request db (leveldb)
  51. // priority is used in the index key
  52. // uses a buffer and a leveldb for persistent storage
  53. // bufferSize, dbBatchSize are config parameters
  54. func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
  55. start := make([]byte, 42)
  56. start[1] = byte(priorities - priority)
  57. copy(start[2:34], key)
  58. counterKey := make([]byte, 34)
  59. counterKey[0] = counterKeyPrefix
  60. copy(counterKey[1:], start[1:34])
  61. syncdb := &syncDb{
  62. start: start,
  63. key: key,
  64. counterKey: counterKey,
  65. priority: priority,
  66. buffer: make(chan interface{}, bufferSize),
  67. db: db,
  68. done: make(chan bool),
  69. quit: make(chan bool),
  70. batch: make(chan chan int),
  71. dbBatchSize: dbBatchSize,
  72. }
  73. glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)
  74. // starts the main forever loop reading from buffer
  75. go syncdb.bufferRead(deliver)
  76. return syncdb
  77. }
  78. /*
  79. bufferRead is a forever iterator loop that takes care of delivering
  80. outgoing store requests reads from incoming buffer
  81. its argument is the deliver function taking the item as first argument
  82. and a quit channel as second.
  83. Closing of this channel is supposed to abort all waiting for delivery
  84. (typically network write)
  85. The iteration switches between 2 modes,
  86. * buffer mode reads the in-memory buffer and delivers the items directly
  87. * db mode reads from the buffer and writes to the db, parallelly another
  88. routine is started that reads from the db and delivers items
  89. If there is buffer contention in buffer mode (slow network, high upload volume)
  90. syncdb switches to db mode and starts dbRead
  91. Once db backlog is delivered, it reverts back to in-memory buffer
  92. It is automatically started when syncdb is initialised.
  93. It saves the buffer to db upon receiving quit signal. syncDb#stop()
  94. */
  95. func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
  96. var buffer, db chan interface{} // channels representing the two read modes
  97. var more bool
  98. var req interface{}
  99. var entry *syncDbEntry
  100. var inBatch, inDb int
  101. batch := new(leveldb.Batch)
  102. var dbSize chan int
  103. quit := self.quit
  104. counterValue := make([]byte, 8)
  105. // counter is used for keeping the items in order, persisted to db
  106. // start counter where db was at, 0 if not found
  107. data, err := self.db.Get(self.counterKey)
  108. var counter uint64
  109. if err == nil {
  110. counter = binary.BigEndian.Uint64(data)
  111. glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
  112. } else {
  113. glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
  114. }
  115. LOOP:
  116. for {
  117. // waiting for item next in the buffer, or quit signal or batch request
  118. select {
  119. // buffer only closes when writing to db
  120. case req = <-buffer:
  121. // deliver request : this is blocking on network write so
  122. // it is passed the quit channel as argument, so that it returns
  123. // if syncdb is stopped. In this case we need to save the item to the db
  124. more = deliver(req, self.quit)
  125. if !more {
  126. glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
  127. // received quit signal, save request currently waiting delivery
  128. // by switching to db mode and closing the buffer
  129. buffer = nil
  130. db = self.buffer
  131. close(db)
  132. quit = nil // needs to block the quit case in select
  133. break // break from select, this item will be written to the db
  134. }
  135. self.total++
  136. glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
  137. // by the time deliver returns, there were new writes to the buffer
  138. // if buffer contention is detected, switch to db mode which drains
  139. // the buffer so no process will block on pushing store requests
  140. if len(buffer) == cap(buffer) {
  141. glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
  142. buffer = nil
  143. db = self.buffer
  144. }
  145. continue LOOP
  146. // incoming entry to put into db
  147. case req, more = <-db:
  148. if !more {
  149. // only if quit is called, saved all the buffer
  150. binary.BigEndian.PutUint64(counterValue, counter)
  151. batch.Put(self.counterKey, counterValue) // persist counter in batch
  152. self.writeSyncBatch(batch) // save batch
  153. glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
  154. break LOOP
  155. }
  156. self.dbTotal++
  157. self.total++
  158. // otherwise break after select
  159. case dbSize = <-self.batch:
  160. // explicit request for batch
  161. if inBatch == 0 && quit != nil {
  162. // there was no writes since the last batch so db depleted
  163. // switch to buffer mode
  164. glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
  165. db = nil
  166. buffer = self.buffer
  167. dbSize <- 0 // indicates to 'caller' that batch has been written
  168. inDb = 0
  169. continue LOOP
  170. }
  171. binary.BigEndian.PutUint64(counterValue, counter)
  172. batch.Put(self.counterKey, counterValue)
  173. glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
  174. batch = self.writeSyncBatch(batch)
  175. dbSize <- inBatch // indicates to 'caller' that batch has been written
  176. inBatch = 0
  177. continue LOOP
  178. // closing syncDb#quit channel is used to signal to all goroutines to quit
  179. case <-quit:
  180. // need to save backlog, so switch to db mode
  181. db = self.buffer
  182. buffer = nil
  183. quit = nil
  184. glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
  185. close(db)
  186. continue LOOP
  187. }
  188. // only get here if we put req into db
  189. entry, err = self.newSyncDbEntry(req, counter)
  190. if err != nil {
  191. glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
  192. continue LOOP
  193. }
  194. batch.Put(entry.key, entry.val)
  195. glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
  196. // if just switched to db mode and not quitting, then launch dbRead
  197. // in a parallel go routine to send deliveries from db
  198. if inDb == 0 && quit != nil {
  199. glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead")
  200. go self.dbRead(true, counter, deliver)
  201. }
  202. inDb++
  203. inBatch++
  204. counter++
  205. // need to save the batch if it gets too large (== dbBatchSize)
  206. if inBatch%int(self.dbBatchSize) == 0 {
  207. batch = self.writeSyncBatch(batch)
  208. }
  209. }
  210. glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
  211. close(self.done)
  212. }
  213. // writes the batch to the db and returns a new batch object
  214. func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
  215. err := self.db.Write(batch)
  216. if err != nil {
  217. glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
  218. return batch
  219. }
  220. return new(leveldb.Batch)
  221. }
  222. // abstract type for db entries (TODO could be a feature of Receipts)
  223. type syncDbEntry struct {
  224. key, val []byte
  225. }
  226. func (self syncDbEntry) String() string {
  227. return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
  228. }
  229. /*
  230. dbRead is iterating over store requests to be sent over to the peer
  231. this is mainly to prevent crashes due to network output buffer contention (???)
  232. as well as to make syncronisation resilient to disconnects
  233. the messages are supposed to be sent in the p2p priority queue.
  234. the request DB is shared between peers, but domains for each syncdb
  235. are disjoint. dbkeys (42 bytes) are structured:
  236. * 0: 0x00 (0x01 reserved for counter key)
  237. * 1: priorities - priority (so that high priority can be replayed first)
  238. * 2-33: peers address
  239. * 34-41: syncdb counter to preserve order (this field is missing for the counter key)
  240. values (40 bytes) are:
  241. * 0-31: key
  242. * 32-39: request id
  243. dbRead needs a boolean to indicate if on first round all the historical
  244. record is synced. Second argument to indicate current db counter
  245. The third is the function to apply
  246. */
  247. func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
  248. key := make([]byte, 42)
  249. copy(key, self.start)
  250. binary.BigEndian.PutUint64(key[34:], counter)
  251. var batches, n, cnt, total int
  252. var more bool
  253. var entry *syncDbEntry
  254. var it iterator.Iterator
  255. var del *leveldb.Batch
  256. batchSizes := make(chan int)
  257. for {
  258. // if useBatches is false, cnt is not set
  259. if useBatches {
  260. // this could be called before all cnt items sent out
  261. // so that loop is not blocking while delivering
  262. // only relevant if cnt is large
  263. select {
  264. case self.batch <- batchSizes:
  265. case <-self.quit:
  266. return
  267. }
  268. // wait for the write to finish and get the item count in the next batch
  269. cnt = <-batchSizes
  270. batches++
  271. if cnt == 0 {
  272. // empty
  273. return
  274. }
  275. }
  276. it = self.db.NewIterator()
  277. it.Seek(key)
  278. if !it.Valid() {
  279. copy(key, self.start)
  280. useBatches = true
  281. continue
  282. }
  283. del = new(leveldb.Batch)
  284. glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)
  285. for n = 0; !useBatches || n < cnt; it.Next() {
  286. copy(key, it.Key())
  287. if len(key) == 0 || key[0] != 0 {
  288. copy(key, self.start)
  289. useBatches = true
  290. break
  291. }
  292. val := make([]byte, 40)
  293. copy(val, it.Value())
  294. entry = &syncDbEntry{key, val}
  295. // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
  296. more = fun(entry, self.quit)
  297. if !more {
  298. // quit received when waiting to deliver entry, the entry will not be deleted
  299. glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
  300. break
  301. }
  302. // since subsequent batches of the same db session are indexed incrementally
  303. // deleting earlier batches can be delayed and parallelised
  304. // this could be batch delete when db is idle (but added complexity esp when quitting)
  305. del.Delete(key)
  306. n++
  307. total++
  308. }
  309. glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
  310. self.db.Write(del) // this could be async called only when db is idle
  311. it.Release()
  312. }
  313. }
  314. //
  315. func (self *syncDb) stop() {
  316. close(self.quit)
  317. <-self.done
  318. }
  319. // calculate a dbkey for the request, for the db to work
  320. // see syncdb for db key structure
  321. // polimorphic: accepted types, see syncer#addRequest
  322. func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
  323. var key storage.Key
  324. var chunk *storage.Chunk
  325. var id uint64
  326. var ok bool
  327. var sreq *storeRequestMsgData
  328. if key, ok = req.(storage.Key); ok {
  329. id = generateId()
  330. } else if chunk, ok = req.(*storage.Chunk); ok {
  331. key = chunk.Key
  332. id = generateId()
  333. } else if sreq, ok = req.(*storeRequestMsgData); ok {
  334. key = sreq.Key
  335. id = sreq.Id
  336. } else if entry, ok = req.(*syncDbEntry); !ok {
  337. return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
  338. }
  339. // order by peer > priority > seqid
  340. // value is request id if exists
  341. if entry == nil {
  342. dbkey := make([]byte, 42)
  343. dbval := make([]byte, 40)
  344. // encode key
  345. copy(dbkey[:], self.start[:34]) // db peer
  346. binary.BigEndian.PutUint64(dbkey[34:], counter)
  347. // encode value
  348. copy(dbval, key[:])
  349. binary.BigEndian.PutUint64(dbval[32:], id)
  350. entry = &syncDbEntry{dbkey, dbval}
  351. }
  352. return
  353. }