queue_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "fmt"
  19. "math/big"
  20. "math/rand"
  21. "sync"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/consensus/ethash"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/params"
  30. )
  31. // makeChain creates a chain of n blocks starting at and including parent.
  32. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  33. // contains a transaction and every 5th an uncle to allow testing correct block
  34. // reassembly.
  35. func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
  36. blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
  37. block.SetCoinbase(common.Address{seed})
  38. // Add one tx to every secondblock
  39. if !empty && i%2 == 0 {
  40. signer := types.MakeSigner(params.TestChainConfig, block.Number())
  41. tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
  42. if err != nil {
  43. panic(err)
  44. }
  45. block.AddTx(tx)
  46. }
  47. })
  48. return blocks, receipts
  49. }
  50. type chainData struct {
  51. blocks []*types.Block
  52. offset int
  53. }
  54. var chain *chainData
  55. var emptyChain *chainData
  56. func init() {
  57. // Create a chain of blocks to import
  58. targetBlocks := 128
  59. blocks, _ := makeChain(targetBlocks, 0, testGenesis, false)
  60. chain = &chainData{blocks, 0}
  61. blocks, _ = makeChain(targetBlocks, 0, testGenesis, true)
  62. emptyChain = &chainData{blocks, 0}
  63. }
  64. func (chain *chainData) headers() []*types.Header {
  65. hdrs := make([]*types.Header, len(chain.blocks))
  66. for i, b := range chain.blocks {
  67. hdrs[i] = b.Header()
  68. }
  69. return hdrs
  70. }
  71. func (chain *chainData) Len() int {
  72. return len(chain.blocks)
  73. }
  74. func dummyPeer(id string) *peerConnection {
  75. p := &peerConnection{
  76. id: id,
  77. lacking: make(map[common.Hash]struct{}),
  78. }
  79. return p
  80. }
  81. func TestBasics(t *testing.T) {
  82. numOfBlocks := len(emptyChain.blocks)
  83. numOfReceipts := len(emptyChain.blocks) / 2
  84. q := newQueue(10, 10)
  85. if !q.Idle() {
  86. t.Errorf("new queue should be idle")
  87. }
  88. q.Prepare(1, FastSync)
  89. if res := q.Results(false); len(res) != 0 {
  90. t.Fatal("new queue should have 0 results")
  91. }
  92. // Schedule a batch of headers
  93. q.Schedule(chain.headers(), 1)
  94. if q.Idle() {
  95. t.Errorf("queue should not be idle")
  96. }
  97. if got, exp := q.PendingBlocks(), chain.Len(); got != exp {
  98. t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
  99. }
  100. // Only non-empty receipts get added to task-queue
  101. if got, exp := q.PendingReceipts(), 64; got != exp {
  102. t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
  103. }
  104. // Items are now queued for downloading, next step is that we tell the
  105. // queue that a certain peer will deliver them for us
  106. {
  107. peer := dummyPeer("peer-1")
  108. fetchReq, _, throttle := q.ReserveBodies(peer, 50)
  109. if !throttle {
  110. // queue size is only 10, so throttling should occur
  111. t.Fatal("should throttle")
  112. }
  113. // But we should still get the first things to fetch
  114. if got, exp := len(fetchReq.Headers), 5; got != exp {
  115. t.Fatalf("expected %d requests, got %d", exp, got)
  116. }
  117. if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
  118. t.Fatalf("expected header %d, got %d", exp, got)
  119. }
  120. }
  121. if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
  122. t.Errorf("expected block task queue to be %d, got %d", exp, got)
  123. }
  124. if exp, got := q.receiptTaskQueue.Size(), numOfReceipts; exp != got {
  125. t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
  126. }
  127. {
  128. peer := dummyPeer("peer-2")
  129. fetchReq, _, throttle := q.ReserveBodies(peer, 50)
  130. // The second peer should hit throttling
  131. if !throttle {
  132. t.Fatalf("should throttle")
  133. }
  134. // And not get any fetches at all, since it was throttled to begin with
  135. if fetchReq != nil {
  136. t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers))
  137. }
  138. }
  139. if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
  140. t.Errorf("expected block task queue to be %d, got %d", exp, got)
  141. }
  142. if exp, got := q.receiptTaskQueue.Size(), numOfReceipts; exp != got {
  143. t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
  144. }
  145. {
  146. // The receipt delivering peer should not be affected
  147. // by the throttling of body deliveries
  148. peer := dummyPeer("peer-3")
  149. fetchReq, _, throttle := q.ReserveReceipts(peer, 50)
  150. if !throttle {
  151. // queue size is only 10, so throttling should occur
  152. t.Fatal("should throttle")
  153. }
  154. // But we should still get the first things to fetch
  155. if got, exp := len(fetchReq.Headers), 5; got != exp {
  156. t.Fatalf("expected %d requests, got %d", exp, got)
  157. }
  158. if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
  159. t.Fatalf("expected header %d, got %d", exp, got)
  160. }
  161. }
  162. if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
  163. t.Errorf("expected block task queue to be %d, got %d", exp, got)
  164. }
  165. if exp, got := q.receiptTaskQueue.Size(), numOfReceipts-5; exp != got {
  166. t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
  167. }
  168. if got, exp := q.resultCache.countCompleted(), 0; got != exp {
  169. t.Errorf("wrong processable count, got %d, exp %d", got, exp)
  170. }
  171. }
  172. func TestEmptyBlocks(t *testing.T) {
  173. numOfBlocks := len(emptyChain.blocks)
  174. q := newQueue(10, 10)
  175. q.Prepare(1, FastSync)
  176. // Schedule a batch of headers
  177. q.Schedule(emptyChain.headers(), 1)
  178. if q.Idle() {
  179. t.Errorf("queue should not be idle")
  180. }
  181. if got, exp := q.PendingBlocks(), len(emptyChain.blocks); got != exp {
  182. t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
  183. }
  184. if got, exp := q.PendingReceipts(), 0; got != exp {
  185. t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
  186. }
  187. // They won't be processable, because the fetchresults haven't been
  188. // created yet
  189. if got, exp := q.resultCache.countCompleted(), 0; got != exp {
  190. t.Errorf("wrong processable count, got %d, exp %d", got, exp)
  191. }
  192. // Items are now queued for downloading, next step is that we tell the
  193. // queue that a certain peer will deliver them for us
  194. // That should trigger all of them to suddenly become 'done'
  195. {
  196. // Reserve blocks
  197. peer := dummyPeer("peer-1")
  198. fetchReq, _, _ := q.ReserveBodies(peer, 50)
  199. // there should be nothing to fetch, blocks are empty
  200. if fetchReq != nil {
  201. t.Fatal("there should be no body fetch tasks remaining")
  202. }
  203. }
  204. if q.blockTaskQueue.Size() != numOfBlocks-10 {
  205. t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
  206. }
  207. if q.receiptTaskQueue.Size() != 0 {
  208. t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
  209. }
  210. {
  211. peer := dummyPeer("peer-3")
  212. fetchReq, _, _ := q.ReserveReceipts(peer, 50)
  213. // there should be nothing to fetch, blocks are empty
  214. if fetchReq != nil {
  215. t.Fatal("there should be no receipt fetch tasks remaining")
  216. }
  217. }
  218. if q.blockTaskQueue.Size() != numOfBlocks-10 {
  219. t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
  220. }
  221. if q.receiptTaskQueue.Size() != 0 {
  222. t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
  223. }
  224. if got, exp := q.resultCache.countCompleted(), 10; got != exp {
  225. t.Errorf("wrong processable count, got %d, exp %d", got, exp)
  226. }
  227. }
  228. // XTestDelivery does some more extensive testing of events that happen,
  229. // blocks that become known and peers that make reservations and deliveries.
  230. // disabled since it's not really a unit-test, but can be executed to test
  231. // some more advanced scenarios
  232. func XTestDelivery(t *testing.T) {
  233. // the outside network, holding blocks
  234. blo, rec := makeChain(128, 0, testGenesis, false)
  235. world := newNetwork()
  236. world.receipts = rec
  237. world.chain = blo
  238. world.progress(10)
  239. if false {
  240. log.Root().SetHandler(log.StdoutHandler)
  241. }
  242. q := newQueue(10, 10)
  243. var wg sync.WaitGroup
  244. q.Prepare(1, FastSync)
  245. wg.Add(1)
  246. go func() {
  247. // deliver headers
  248. defer wg.Done()
  249. c := 1
  250. for {
  251. //fmt.Printf("getting headers from %d\n", c)
  252. hdrs := world.headers(c)
  253. l := len(hdrs)
  254. //fmt.Printf("scheduling %d headers, first %d last %d\n",
  255. // l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64())
  256. q.Schedule(hdrs, uint64(c))
  257. c += l
  258. }
  259. }()
  260. wg.Add(1)
  261. go func() {
  262. // collect results
  263. defer wg.Done()
  264. tot := 0
  265. for {
  266. res := q.Results(true)
  267. tot += len(res)
  268. fmt.Printf("got %d results, %d tot\n", len(res), tot)
  269. // Now we can forget about these
  270. world.forget(res[len(res)-1].Header.Number.Uint64())
  271. }
  272. }()
  273. wg.Add(1)
  274. go func() {
  275. defer wg.Done()
  276. // reserve body fetch
  277. i := 4
  278. for {
  279. peer := dummyPeer(fmt.Sprintf("peer-%d", i))
  280. f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
  281. if f != nil {
  282. var emptyList []*types.Header
  283. var txs [][]*types.Transaction
  284. var uncles [][]*types.Header
  285. numToSkip := rand.Intn(len(f.Headers))
  286. for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] {
  287. txs = append(txs, world.getTransactions(hdr.Number.Uint64()))
  288. uncles = append(uncles, emptyList)
  289. }
  290. time.Sleep(100 * time.Millisecond)
  291. _, err := q.DeliverBodies(peer.id, txs, uncles)
  292. if err != nil {
  293. fmt.Printf("delivered %d bodies %v\n", len(txs), err)
  294. }
  295. } else {
  296. i++
  297. time.Sleep(200 * time.Millisecond)
  298. }
  299. }
  300. }()
  301. go func() {
  302. defer wg.Done()
  303. // reserve receiptfetch
  304. peer := dummyPeer("peer-3")
  305. for {
  306. f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
  307. if f != nil {
  308. var rcs [][]*types.Receipt
  309. for _, hdr := range f.Headers {
  310. rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
  311. }
  312. _, err := q.DeliverReceipts(peer.id, rcs)
  313. if err != nil {
  314. fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
  315. }
  316. time.Sleep(100 * time.Millisecond)
  317. } else {
  318. time.Sleep(200 * time.Millisecond)
  319. }
  320. }
  321. }()
  322. wg.Add(1)
  323. go func() {
  324. defer wg.Done()
  325. for i := 0; i < 50; i++ {
  326. time.Sleep(300 * time.Millisecond)
  327. //world.tick()
  328. //fmt.Printf("trying to progress\n")
  329. world.progress(rand.Intn(100))
  330. }
  331. for i := 0; i < 50; i++ {
  332. time.Sleep(2990 * time.Millisecond)
  333. }
  334. }()
  335. wg.Add(1)
  336. go func() {
  337. defer wg.Done()
  338. for {
  339. time.Sleep(990 * time.Millisecond)
  340. fmt.Printf("world block tip is %d\n",
  341. world.chain[len(world.chain)-1].Header().Number.Uint64())
  342. fmt.Println(q.Stats())
  343. }
  344. }()
  345. wg.Wait()
  346. }
  347. func newNetwork() *network {
  348. var l sync.RWMutex
  349. return &network{
  350. cond: sync.NewCond(&l),
  351. offset: 1, // block 1 is at blocks[0]
  352. }
  353. }
  354. // represents the network
  355. type network struct {
  356. offset int
  357. chain []*types.Block
  358. receipts []types.Receipts
  359. lock sync.RWMutex
  360. cond *sync.Cond
  361. }
  362. func (n *network) getTransactions(blocknum uint64) types.Transactions {
  363. index := blocknum - uint64(n.offset)
  364. return n.chain[index].Transactions()
  365. }
  366. func (n *network) getReceipts(blocknum uint64) types.Receipts {
  367. index := blocknum - uint64(n.offset)
  368. if got := n.chain[index].Header().Number.Uint64(); got != blocknum {
  369. fmt.Printf("Err, got %d exp %d\n", got, blocknum)
  370. panic("sd")
  371. }
  372. return n.receipts[index]
  373. }
  374. func (n *network) forget(blocknum uint64) {
  375. index := blocknum - uint64(n.offset)
  376. n.chain = n.chain[index:]
  377. n.receipts = n.receipts[index:]
  378. n.offset = int(blocknum)
  379. }
  380. func (n *network) progress(numBlocks int) {
  381. n.lock.Lock()
  382. defer n.lock.Unlock()
  383. //fmt.Printf("progressing...\n")
  384. newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
  385. n.chain = append(n.chain, newBlocks...)
  386. n.receipts = append(n.receipts, newR...)
  387. n.cond.Broadcast()
  388. }
  389. func (n *network) headers(from int) []*types.Header {
  390. numHeaders := 128
  391. var hdrs []*types.Header
  392. index := from - n.offset
  393. for index >= len(n.chain) {
  394. // wait for progress
  395. n.cond.L.Lock()
  396. //fmt.Printf("header going into wait\n")
  397. n.cond.Wait()
  398. index = from - n.offset
  399. n.cond.L.Unlock()
  400. }
  401. n.lock.RLock()
  402. defer n.lock.RUnlock()
  403. for i, b := range n.chain[index:] {
  404. hdrs = append(hdrs, b.Header())
  405. if i >= numHeaders {
  406. break
  407. }
  408. }
  409. return hdrs
  410. }