syncdb_test.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "bytes"
  19. "io/ioutil"
  20. "os"
  21. "path/filepath"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/crypto"
  25. "github.com/ethereum/go-ethereum/logger"
  26. "github.com/ethereum/go-ethereum/logger/glog"
  27. "github.com/ethereum/go-ethereum/swarm/storage"
  28. )
  29. func init() {
  30. glog.SetV(0)
  31. glog.SetToStderr(true)
  32. }
  33. type testSyncDb struct {
  34. *syncDb
  35. c int
  36. t *testing.T
  37. fromDb chan bool
  38. delivered [][]byte
  39. sent []int
  40. dbdir string
  41. at int
  42. }
  43. func newTestSyncDb(priority, bufferSize, batchSize int, dbdir string, t *testing.T) *testSyncDb {
  44. if len(dbdir) == 0 {
  45. tmp, err := ioutil.TempDir(os.TempDir(), "syncdb-test")
  46. if err != nil {
  47. t.Fatalf("unable to create temporary direcory %v: %v", tmp, err)
  48. }
  49. dbdir = tmp
  50. }
  51. db, err := storage.NewLDBDatabase(filepath.Join(dbdir, "requestdb"))
  52. if err != nil {
  53. t.Fatalf("unable to create db: %v", err)
  54. }
  55. self := &testSyncDb{
  56. fromDb: make(chan bool),
  57. dbdir: dbdir,
  58. t: t,
  59. }
  60. h := crypto.Sha3Hash([]byte{0})
  61. key := storage.Key(h[:])
  62. self.syncDb = newSyncDb(db, key, uint(priority), uint(bufferSize), uint(batchSize), self.deliver)
  63. // kick off db iterator right away, if no items on db this will allow
  64. // reading from the buffer
  65. return self
  66. }
  67. func (self *testSyncDb) close() {
  68. self.db.Close()
  69. os.RemoveAll(self.dbdir)
  70. }
  71. func (self *testSyncDb) push(n int) {
  72. for i := 0; i < n; i++ {
  73. self.buffer <- storage.Key(crypto.Sha3([]byte{byte(self.c)}))
  74. self.sent = append(self.sent, self.c)
  75. self.c++
  76. }
  77. glog.V(logger.Debug).Infof("pushed %v requests", n)
  78. }
  79. func (self *testSyncDb) draindb() {
  80. it := self.db.NewIterator()
  81. defer it.Release()
  82. for {
  83. it.Seek(self.start)
  84. if !it.Valid() {
  85. return
  86. }
  87. k := it.Key()
  88. if len(k) == 0 || k[0] == 1 {
  89. return
  90. }
  91. it.Release()
  92. it = self.db.NewIterator()
  93. }
  94. }
  95. func (self *testSyncDb) deliver(req interface{}, quit chan bool) bool {
  96. _, db := req.(*syncDbEntry)
  97. key, _, _, _, err := parseRequest(req)
  98. if err != nil {
  99. self.t.Fatalf("unexpected error of key %v: %v", key, err)
  100. }
  101. self.delivered = append(self.delivered, key)
  102. select {
  103. case self.fromDb <- db:
  104. return true
  105. case <-quit:
  106. return false
  107. }
  108. }
  109. func (self *testSyncDb) expect(n int, db bool) {
  110. var ok bool
  111. // for n items
  112. for i := 0; i < n; i++ {
  113. ok = <-self.fromDb
  114. if self.at+1 > len(self.delivered) {
  115. self.t.Fatalf("expected %v, got %v", self.at+1, len(self.delivered))
  116. }
  117. if len(self.sent) > self.at && !bytes.Equal(crypto.Sha3([]byte{byte(self.sent[self.at])}), self.delivered[self.at]) {
  118. self.t.Fatalf("expected delivery %v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
  119. glog.V(logger.Debug).Infof("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
  120. }
  121. if !ok && db {
  122. self.t.Fatalf("expected delivery %v/%v/%v from db", i, n, self.at)
  123. }
  124. if ok && !db {
  125. self.t.Fatalf("expected delivery %v/%v/%v from cache", i, n, self.at)
  126. }
  127. self.at++
  128. }
  129. }
  130. func TestSyncDb(t *testing.T) {
  131. priority := High
  132. bufferSize := 5
  133. batchSize := 2 * bufferSize
  134. s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
  135. defer s.close()
  136. defer s.stop()
  137. s.dbRead(false, 0, s.deliver)
  138. s.draindb()
  139. s.push(4)
  140. s.expect(1, false)
  141. // 3 in buffer
  142. time.Sleep(100 * time.Millisecond)
  143. s.push(3)
  144. // push over limit
  145. s.expect(1, false)
  146. // one popped from the buffer, then contention detected
  147. s.expect(4, true)
  148. s.push(4)
  149. s.expect(5, true)
  150. // depleted db, switch back to buffer
  151. s.draindb()
  152. s.push(5)
  153. s.expect(4, false)
  154. s.push(3)
  155. s.expect(4, false)
  156. // buffer depleted
  157. time.Sleep(100 * time.Millisecond)
  158. s.push(6)
  159. s.expect(1, false)
  160. // push into buffer full, switch to db
  161. s.expect(5, true)
  162. s.draindb()
  163. s.push(1)
  164. s.expect(1, false)
  165. }
  166. func TestSaveSyncDb(t *testing.T) {
  167. amount := 30
  168. priority := High
  169. bufferSize := amount
  170. batchSize := 10
  171. s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
  172. go s.dbRead(false, 0, s.deliver)
  173. s.push(amount)
  174. s.stop()
  175. s.db.Close()
  176. s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
  177. go s.dbRead(false, 0, s.deliver)
  178. s.expect(amount, true)
  179. for i, key := range s.delivered {
  180. expKey := crypto.Sha3([]byte{byte(i)})
  181. if !bytes.Equal(key, expKey) {
  182. t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
  183. }
  184. }
  185. s.push(amount)
  186. s.expect(amount, false)
  187. for i := amount; i < 2*amount; i++ {
  188. key := s.delivered[i]
  189. expKey := crypto.Sha3([]byte{byte(i - amount)})
  190. if !bytes.Equal(key, expKey) {
  191. t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
  192. }
  193. }
  194. s.stop()
  195. s.db.Close()
  196. s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
  197. defer s.close()
  198. defer s.stop()
  199. go s.dbRead(false, 0, s.deliver)
  200. s.push(1)
  201. s.expect(1, false)
  202. }