benchmark.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "math/big"
  21. "math/rand"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. "github.com/ethereum/go-ethereum/les/flowcontrol"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/p2p/enode"
  32. "github.com/ethereum/go-ethereum/params"
  33. "github.com/ethereum/go-ethereum/rlp"
  34. )
  35. // requestBenchmark is an interface for different randomized request generators
  36. type requestBenchmark interface {
  37. // init initializes the generator for generating the given number of randomized requests
  38. init(pm *ProtocolManager, count int) error
  39. // request initiates sending a single request to the given peer
  40. request(peer *peer, index int) error
  41. }
  42. // benchmarkBlockHeaders implements requestBenchmark
  43. type benchmarkBlockHeaders struct {
  44. amount, skip int
  45. reverse, byHash bool
  46. offset, randMax int64
  47. hashes []common.Hash
  48. }
  49. func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
  50. d := int64(b.amount-1) * int64(b.skip+1)
  51. b.offset = 0
  52. b.randMax = pm.blockchain.CurrentHeader().Number.Int64() + 1 - d
  53. if b.randMax < 0 {
  54. return fmt.Errorf("chain is too short")
  55. }
  56. if b.reverse {
  57. b.offset = d
  58. }
  59. if b.byHash {
  60. b.hashes = make([]common.Hash, count)
  61. for i := range b.hashes {
  62. b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
  63. }
  64. }
  65. return nil
  66. }
  67. func (b *benchmarkBlockHeaders) request(peer *peer, index int) error {
  68. if b.byHash {
  69. return peer.RequestHeadersByHash(0, 0, b.hashes[index], b.amount, b.skip, b.reverse)
  70. } else {
  71. return peer.RequestHeadersByNumber(0, 0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
  72. }
  73. }
  74. // benchmarkBodiesOrReceipts implements requestBenchmark
  75. type benchmarkBodiesOrReceipts struct {
  76. receipts bool
  77. hashes []common.Hash
  78. }
  79. func (b *benchmarkBodiesOrReceipts) init(pm *ProtocolManager, count int) error {
  80. randMax := pm.blockchain.CurrentHeader().Number.Int64() + 1
  81. b.hashes = make([]common.Hash, count)
  82. for i := range b.hashes {
  83. b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(rand.Int63n(randMax)))
  84. }
  85. return nil
  86. }
  87. func (b *benchmarkBodiesOrReceipts) request(peer *peer, index int) error {
  88. if b.receipts {
  89. return peer.RequestReceipts(0, 0, []common.Hash{b.hashes[index]})
  90. } else {
  91. return peer.RequestBodies(0, 0, []common.Hash{b.hashes[index]})
  92. }
  93. }
  94. // benchmarkProofsOrCode implements requestBenchmark
  95. type benchmarkProofsOrCode struct {
  96. code bool
  97. headHash common.Hash
  98. }
  99. func (b *benchmarkProofsOrCode) init(pm *ProtocolManager, count int) error {
  100. b.headHash = pm.blockchain.CurrentHeader().Hash()
  101. return nil
  102. }
  103. func (b *benchmarkProofsOrCode) request(peer *peer, index int) error {
  104. key := make([]byte, 32)
  105. rand.Read(key)
  106. if b.code {
  107. return peer.RequestCode(0, 0, []CodeReq{{BHash: b.headHash, AccKey: key}})
  108. } else {
  109. return peer.RequestProofs(0, 0, []ProofReq{{BHash: b.headHash, Key: key}})
  110. }
  111. }
  112. // benchmarkHelperTrie implements requestBenchmark
  113. type benchmarkHelperTrie struct {
  114. bloom bool
  115. reqCount int
  116. sectionCount, headNum uint64
  117. }
  118. func (b *benchmarkHelperTrie) init(pm *ProtocolManager, count int) error {
  119. if b.bloom {
  120. b.sectionCount, b.headNum, _ = pm.server.bloomTrieIndexer.Sections()
  121. } else {
  122. b.sectionCount, _, _ = pm.server.chtIndexer.Sections()
  123. b.headNum = b.sectionCount*params.CHTFrequency - 1
  124. }
  125. if b.sectionCount == 0 {
  126. return fmt.Errorf("no processed sections available")
  127. }
  128. return nil
  129. }
  130. func (b *benchmarkHelperTrie) request(peer *peer, index int) error {
  131. reqs := make([]HelperTrieReq, b.reqCount)
  132. if b.bloom {
  133. bitIdx := uint16(rand.Intn(2048))
  134. for i := range reqs {
  135. key := make([]byte, 10)
  136. binary.BigEndian.PutUint16(key[:2], bitIdx)
  137. binary.BigEndian.PutUint64(key[2:], uint64(rand.Int63n(int64(b.sectionCount))))
  138. reqs[i] = HelperTrieReq{Type: htBloomBits, TrieIdx: b.sectionCount - 1, Key: key}
  139. }
  140. } else {
  141. for i := range reqs {
  142. key := make([]byte, 8)
  143. binary.BigEndian.PutUint64(key[:], uint64(rand.Int63n(int64(b.headNum))))
  144. reqs[i] = HelperTrieReq{Type: htCanonical, TrieIdx: b.sectionCount - 1, Key: key, AuxReq: auxHeader}
  145. }
  146. }
  147. return peer.RequestHelperTrieProofs(0, 0, reqs)
  148. }
  149. // benchmarkTxSend implements requestBenchmark
  150. type benchmarkTxSend struct {
  151. txs types.Transactions
  152. }
  153. func (b *benchmarkTxSend) init(pm *ProtocolManager, count int) error {
  154. key, _ := crypto.GenerateKey()
  155. addr := crypto.PubkeyToAddress(key.PublicKey)
  156. signer := types.NewEIP155Signer(big.NewInt(18))
  157. b.txs = make(types.Transactions, count)
  158. for i := range b.txs {
  159. data := make([]byte, txSizeCostLimit)
  160. rand.Read(data)
  161. tx, err := types.SignTx(types.NewTransaction(0, addr, new(big.Int), 0, new(big.Int), data), signer, key)
  162. if err != nil {
  163. panic(err)
  164. }
  165. b.txs[i] = tx
  166. }
  167. return nil
  168. }
  169. func (b *benchmarkTxSend) request(peer *peer, index int) error {
  170. enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
  171. return peer.SendTxs(0, 0, enc)
  172. }
  173. // benchmarkTxStatus implements requestBenchmark
  174. type benchmarkTxStatus struct{}
  175. func (b *benchmarkTxStatus) init(pm *ProtocolManager, count int) error {
  176. return nil
  177. }
  178. func (b *benchmarkTxStatus) request(peer *peer, index int) error {
  179. var hash common.Hash
  180. rand.Read(hash[:])
  181. return peer.RequestTxStatus(0, 0, []common.Hash{hash})
  182. }
  183. // benchmarkSetup stores measurement data for a single benchmark type
  184. type benchmarkSetup struct {
  185. req requestBenchmark
  186. totalCount int
  187. totalTime, avgTime time.Duration
  188. maxInSize, maxOutSize uint32
  189. err error
  190. }
  191. // runBenchmark runs a benchmark cycle for all benchmark types in the specified
  192. // number of passes
  193. func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
  194. setup := make([]*benchmarkSetup, len(benchmarks))
  195. for i, b := range benchmarks {
  196. setup[i] = &benchmarkSetup{req: b}
  197. }
  198. for i := 0; i < passCount; i++ {
  199. log.Info("Running benchmark", "pass", i+1, "total", passCount)
  200. todo := make([]*benchmarkSetup, len(benchmarks))
  201. copy(todo, setup)
  202. for len(todo) > 0 {
  203. // select a random element
  204. index := rand.Intn(len(todo))
  205. next := todo[index]
  206. todo[index] = todo[len(todo)-1]
  207. todo = todo[:len(todo)-1]
  208. if next.err == nil {
  209. // calculate request count
  210. count := 50
  211. if next.totalTime > 0 {
  212. count = int(uint64(next.totalCount) * uint64(targetTime) / uint64(next.totalTime))
  213. }
  214. if err := pm.measure(next, count); err != nil {
  215. next.err = err
  216. }
  217. }
  218. }
  219. }
  220. log.Info("Benchmark completed")
  221. for _, s := range setup {
  222. if s.err == nil {
  223. s.avgTime = s.totalTime / time.Duration(s.totalCount)
  224. }
  225. }
  226. return setup
  227. }
  228. // meteredPipe implements p2p.MsgReadWriter and remembers the largest single
  229. // message size sent through the pipe
  230. type meteredPipe struct {
  231. rw p2p.MsgReadWriter
  232. maxSize uint32
  233. }
  234. func (m *meteredPipe) ReadMsg() (p2p.Msg, error) {
  235. return m.rw.ReadMsg()
  236. }
  237. func (m *meteredPipe) WriteMsg(msg p2p.Msg) error {
  238. if msg.Size > m.maxSize {
  239. m.maxSize = msg.Size
  240. }
  241. return m.rw.WriteMsg(msg)
  242. }
  243. // measure runs a benchmark for a single type in a single pass, with the given
  244. // number of requests
  245. func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
  246. clientPipe, serverPipe := p2p.MsgPipe()
  247. clientMeteredPipe := &meteredPipe{rw: clientPipe}
  248. serverMeteredPipe := &meteredPipe{rw: serverPipe}
  249. var id enode.ID
  250. rand.Read(id[:])
  251. clientPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
  252. serverPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
  253. serverPeer.sendQueue = newExecQueue(count)
  254. serverPeer.announceType = announceTypeNone
  255. serverPeer.fcCosts = make(requestCostTable)
  256. c := &requestCosts{}
  257. for code := range requests {
  258. serverPeer.fcCosts[code] = c
  259. }
  260. serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
  261. serverPeer.fcClient = flowcontrol.NewClientNode(pm.server.fcManager, serverPeer.fcParams)
  262. defer serverPeer.fcClient.Disconnect()
  263. if err := setup.req.init(pm, count); err != nil {
  264. return err
  265. }
  266. errCh := make(chan error, 10)
  267. start := mclock.Now()
  268. go func() {
  269. for i := 0; i < count; i++ {
  270. if err := setup.req.request(clientPeer, i); err != nil {
  271. errCh <- err
  272. return
  273. }
  274. }
  275. }()
  276. go func() {
  277. for i := 0; i < count; i++ {
  278. if err := pm.handleMsg(serverPeer); err != nil {
  279. errCh <- err
  280. return
  281. }
  282. }
  283. }()
  284. go func() {
  285. for i := 0; i < count; i++ {
  286. msg, err := clientPipe.ReadMsg()
  287. if err != nil {
  288. errCh <- err
  289. return
  290. }
  291. var i interface{}
  292. msg.Decode(&i)
  293. }
  294. // at this point we can be sure that the other two
  295. // goroutines finished successfully too
  296. close(errCh)
  297. }()
  298. select {
  299. case err := <-errCh:
  300. if err != nil {
  301. return err
  302. }
  303. case <-pm.quitSync:
  304. clientPipe.Close()
  305. serverPipe.Close()
  306. return fmt.Errorf("Benchmark cancelled")
  307. }
  308. setup.totalTime += time.Duration(mclock.Now() - start)
  309. setup.totalCount += count
  310. setup.maxInSize = clientMeteredPipe.maxSize
  311. setup.maxOutSize = serverMeteredPipe.maxSize
  312. clientPipe.Close()
  313. serverPipe.Close()
  314. return nil
  315. }