queue_test.go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"fmt"
	"math/big"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus/ethash"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/trie"
)

// makeChain creates a chain of n blocks starting at and including parent.
// The returned hash chain is ordered head->parent. In addition, every second
// block contains a transaction to allow testing correct block reassembly.
func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
	blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
		block.SetCoinbase(common.Address{seed})
		// Add one tx to every second block
		if !empty && i%2 == 0 {
			signer := types.MakeSigner(params.TestChainConfig, block.Number())
			tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
			if err != nil {
				panic(err)
			}
			block.AddTx(tx)
		}
	})
	return blocks, receipts
}
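
// chainData wraps a pre-generated chain of blocks together with an offset into it.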
type chainData struct {
	blocks []*types.Block
	offset int
}

var chain *chainData
var emptyChain *chainData

func init() {
	// Create a chain of blocks to import
	targetBlocks := 128
	blocks, _ := makeChain(targetBlocks, 0, testGenesis, false)
	chain = &chainData{blocks, 0}
	blocks, _ = makeChain(targetBlocks, 0, testGenesis, true)
	emptyChain = &chainData{blocks, 0}
}
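
// headers returns the headers of all blocks in the chain, in ascending order.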
func (chain *chainData) headers() []*types.Header {
	hdrs := make([]*types.Header, len(chain.blocks))
	for i, b := range chain.blocks {
		hdrs[i] = b.Header()
	}
	return hdrs
}
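
// Len returns the number of blocks in the chain.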
func (chain *chainData) Len() int {
	return len(chain.blocks)
}
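
// dummyPeer creates a bare peerConnection, carrying just enough state to
// reserve tasks from the queue without a real networked peer behind it.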
func dummyPeer(id string) *peerConnection {
	p := &peerConnection{
		id:      id,
		lacking: make(map[common.Hash]struct{}),
	}
	return p
}

func TestBasics(t *testing.T) {
	numOfBlocks := len(emptyChain.blocks)
	numOfReceipts := len(emptyChain.blocks) / 2
	q := newQueue(10, 10)
	if !q.Idle() {
		t.Errorf("new queue should be idle")
	}
	q.Prepare(1, SnapSync)
	if res := q.Results(false); len(res) != 0 {
		t.Fatal("new queue should have 0 results")
	}
	// Schedule a batch of headers
	headers := chain.headers()
	hashes := make([]common.Hash, len(headers))
	for i, header := range headers {
		hashes[i] = header.Hash()
	}
	q.Schedule(headers, hashes, 1)
	if q.Idle() {
		t.Errorf("queue should not be idle")
	}
	if got, exp := q.PendingBodies(), chain.Len(); got != exp {
		t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
	}
	// Only non-empty receipts get added to task-queue
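	// makeChain puts one transaction into every second of the 128 blocks, so
	// only those 64 blocks carry non-empty receipts that need fetching.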
	if got, exp := q.PendingReceipts(), 64; got != exp {
		t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
	}
	// Items are now queued for downloading; the next step is to tell the
	// queue that a certain peer will deliver them for us
	{
		peer := dummyPeer("peer-1")
		fetchReq, _, throttle := q.ReserveBodies(peer, 50)
		if !throttle {
			// queue size is only 10, so throttling should occur
			t.Fatal("should throttle")
		}
		// But we should still get the first things to fetch
		if got, exp := len(fetchReq.Headers), 5; got != exp {
			t.Fatalf("expected %d requests, got %d", exp, got)
		}
		if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
			t.Fatalf("expected header %d, got %d", exp, got)
		}
	}
	if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
		t.Errorf("expected block task queue to be %d, got %d", exp, got)
	}
	if exp, got := q.receiptTaskQueue.Size(), numOfReceipts; exp != got {
		t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
	}
	{
		peer := dummyPeer("peer-2")
		fetchReq, _, throttle := q.ReserveBodies(peer, 50)
		// The second peer should hit throttling
		if !throttle {
			t.Fatalf("should throttle")
		}
		// And not get any fetches at all, since it was throttled to begin with
		if fetchReq != nil {
			t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers))
		}
	}
	if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
		t.Errorf("expected block task queue to be %d, got %d", exp, got)
	}
	if exp, got := q.receiptTaskQueue.Size(), numOfReceipts; exp != got {
		t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
	}
	{
		// The receipt-delivering peer should not be affected
		// by the throttling of body deliveries
		peer := dummyPeer("peer-3")
		fetchReq, _, throttle := q.ReserveReceipts(peer, 50)
		if !throttle {
			// queue size is only 10, so throttling should occur
			t.Fatal("should throttle")
		}
		// But we should still get the first things to fetch
		if got, exp := len(fetchReq.Headers), 5; got != exp {
			t.Fatalf("expected %d requests, got %d", exp, got)
		}
		if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
			t.Fatalf("expected header %d, got %d", exp, got)
		}
	}
	if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got {
		t.Errorf("expected block task queue to be %d, got %d", exp, got)
	}
	if exp, got := q.receiptTaskQueue.Size(), numOfReceipts-5; exp != got {
		t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
	}
	if got, exp := q.resultCache.countCompleted(), 0; got != exp {
		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
	}
}

func TestEmptyBlocks(t *testing.T) {
	numOfBlocks := len(emptyChain.blocks)
	q := newQueue(10, 10)
	q.Prepare(1, SnapSync)
	// Schedule a batch of headers
	headers := emptyChain.headers()
	hashes := make([]common.Hash, len(headers))
	for i, header := range headers {
		hashes[i] = header.Hash()
	}
	q.Schedule(headers, hashes, 1)
	if q.Idle() {
		t.Errorf("queue should not be idle")
	}
	if got, exp := q.PendingBodies(), len(emptyChain.blocks); got != exp {
		t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
	}
	if got, exp := q.PendingReceipts(), 0; got != exp {
		t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
	}
	// They won't be processable, because the fetch results haven't been
	// created yet
	if got, exp := q.resultCache.countCompleted(), 0; got != exp {
		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
	}
	// Items are now queued for downloading; the next step is to tell the
	// queue that a certain peer will deliver them for us.
	// That should trigger all of them to suddenly become 'done'.
	{
		// Reserve blocks
		peer := dummyPeer("peer-1")
		fetchReq, _, _ := q.ReserveBodies(peer, 50)
		// there should be nothing to fetch, blocks are empty
		if fetchReq != nil {
			t.Fatal("there should be no body fetch tasks remaining")
		}
	}
	if q.blockTaskQueue.Size() != numOfBlocks-10 {
		t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
	}
	if q.receiptTaskQueue.Size() != 0 {
		t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
	}
	{
		peer := dummyPeer("peer-3")
		fetchReq, _, _ := q.ReserveReceipts(peer, 50)
		// there should be nothing to fetch, blocks are empty
		if fetchReq != nil {
			t.Fatal("there should be no receipt fetch tasks remaining")
		}
	}
	if q.blockTaskQueue.Size() != numOfBlocks-10 {
		t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
	}
	if q.receiptTaskQueue.Size() != 0 {
		t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
	}
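	// Every block is empty, so reserving the bodies already marks them as
	// done; the result cache created by newQueue(10, 10) holds only 10 items,
	// hence 10 completed results.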
	if got, exp := q.resultCache.countCompleted(), 10; got != exp {
		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
	}
}

// XTestDelivery does some more extensive testing of events that happen,
// blocks that become known and peers that make reservations and deliveries.
// Disabled since it's not really a unit test, but it can be executed manually
// to exercise some more advanced scenarios.
func XTestDelivery(t *testing.T) {
	// the outside network, holding blocks
	blo, rec := makeChain(128, 0, testGenesis, false)
	world := newNetwork()
	world.receipts = rec
	world.chain = blo
	world.progress(10)
	if false {
		log.Root().SetHandler(log.StdoutHandler)
	}
	q := newQueue(10, 10)
	var wg sync.WaitGroup
	q.Prepare(1, SnapSync)
	wg.Add(1)
	go func() {
		// deliver headers
		defer wg.Done()
		c := 1
		for {
			//fmt.Printf("getting headers from %d\n", c)
			headers := world.headers(c)
			hashes := make([]common.Hash, len(headers))
			for i, header := range headers {
				hashes[i] = header.Hash()
			}
			l := len(headers)
			//fmt.Printf("scheduling %d headers, first %d last %d\n",
			//	l, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64())
			q.Schedule(headers, hashes, uint64(c))
			c += l
		}
	}()
	wg.Add(1)
	go func() {
		// collect results
		defer wg.Done()
		tot := 0
		for {
			res := q.Results(true)
			tot += len(res)
			fmt.Printf("got %d results, %d tot\n", len(res), tot)
			// Now we can forget about these
			world.forget(res[len(res)-1].Header.Number.Uint64())
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		// reserve body fetch
		i := 4
		for {
			peer := dummyPeer(fmt.Sprintf("peer-%d", i))
			f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
			if f != nil {
				var (
					emptyList []*types.Header
					txset     [][]*types.Transaction
					uncleset  [][]*types.Header
				)
				numToSkip := rand.Intn(len(f.Headers))
				for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] {
					txset = append(txset, world.getTransactions(hdr.Number.Uint64()))
					uncleset = append(uncleset, emptyList)
				}
				var (
					txsHashes   = make([]common.Hash, len(txset))
					uncleHashes = make([]common.Hash, len(uncleset))
				)
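				// The queue verifies delivered bodies against the headers' TxHash and
				// UncleHash fields, so derive the matching commitment hashes up front.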
				hasher := trie.NewStackTrie(nil)
				for i, txs := range txset {
					txsHashes[i] = types.DeriveSha(types.Transactions(txs), hasher)
				}
				for i, uncles := range uncleset {
					uncleHashes[i] = types.CalcUncleHash(uncles)
				}
				time.Sleep(100 * time.Millisecond)
				_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes)
				if err != nil {
					fmt.Printf("delivered %d bodies %v\n", len(txset), err)
				}
			} else {
				i++
				time.Sleep(200 * time.Millisecond)
			}
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		// reserve receipt fetch
		peer := dummyPeer("peer-3")
		for {
			f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
			if f != nil {
				var rcs [][]*types.Receipt
				for _, hdr := range f.Headers {
					rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
				}
				hasher := trie.NewStackTrie(nil)
				hashes := make([]common.Hash, len(rcs))
				for i, receipt := range rcs {
					hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
				}
				_, err := q.DeliverReceipts(peer.id, rcs, hashes)
				if err != nil {
					fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
				}
				time.Sleep(100 * time.Millisecond)
			} else {
				time.Sleep(200 * time.Millisecond)
			}
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 50; i++ {
			time.Sleep(300 * time.Millisecond)
			//world.tick()
			//fmt.Printf("trying to progress\n")
			world.progress(rand.Intn(100))
		}
		for i := 0; i < 50; i++ {
			time.Sleep(2990 * time.Millisecond)
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			time.Sleep(990 * time.Millisecond)
			fmt.Printf("world block tip is %d\n",
				world.chain[len(world.chain)-1].Header().Number.Uint64())
			fmt.Println(q.Stats())
		}
	}()
	wg.Wait()
}
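
// newNetwork creates a fresh simulated network; block number 1 maps to index 0
// of its chain.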
func newNetwork() *network {
	var l sync.RWMutex
	return &network{
		cond:   sync.NewCond(&l),
		offset: 1, // block 1 is at blocks[0]
	}
}

// network represents the simulated outside network, holding the blocks and
// receipts that the test peers fetch and deliver to the queue.
type network struct {
	offset   int
	chain    []*types.Block
	receipts []types.Receipts
	lock     sync.RWMutex
	cond     *sync.Cond
}
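
// getTransactions returns the transactions of the block with the given number.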
func (n *network) getTransactions(blocknum uint64) types.Transactions {
	index := blocknum - uint64(n.offset)
	return n.chain[index].Transactions()
}
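
// getReceipts returns the receipts of the block with the given number,
// panicking if the block stored at that index carries an unexpected number.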
func (n *network) getReceipts(blocknum uint64) types.Receipts {
	index := blocknum - uint64(n.offset)
	if got := n.chain[index].Header().Number.Uint64(); got != blocknum {
		fmt.Printf("Err, got %d exp %d\n", got, blocknum)
		panic("sd")
	}
	return n.receipts[index]
}
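
// forget drops all blocks and receipts below the given block number and moves
// the offset forward accordingly.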
func (n *network) forget(blocknum uint64) {
	index := blocknum - uint64(n.offset)
	n.chain = n.chain[index:]
	n.receipts = n.receipts[index:]
	n.offset = int(blocknum)
}
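
// progress extends the chain by numBlocks freshly generated blocks and wakes
// up any goroutine waiting for new headers.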
func (n *network) progress(numBlocks int) {
	n.lock.Lock()
	defer n.lock.Unlock()
	//fmt.Printf("progressing...\n")
	newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
	n.chain = append(n.chain, newBlocks...)
	n.receipts = append(n.receipts, newR...)
	n.cond.Broadcast()
}
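
// headers returns a batch of headers starting at block number from, waiting on
// the condition variable until the chain has grown far enough to serve it.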
func (n *network) headers(from int) []*types.Header {
	numHeaders := 128
	var hdrs []*types.Header
	index := from - n.offset
	for index >= len(n.chain) {
		// wait for progress
		n.cond.L.Lock()
		//fmt.Printf("header going into wait\n")
		n.cond.Wait()
		index = from - n.offset
		n.cond.L.Unlock()
	}
	n.lock.RLock()
	defer n.lock.RUnlock()
	for i, b := range n.chain[index:] {
		hdrs = append(hdrs, b.Header())
		if i >= numHeaders {
			break
		}
	}
	return hdrs
}