benchmark.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "math/big"
  21. "math/rand"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/common/mclock"
  26. "github.com/ethereum/go-ethereum/core/rawdb"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/les/flowcontrol"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/enode"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
  36. // requestBenchmark is an interface for different randomized request generators
  37. type requestBenchmark interface {
  38. // init initializes the generator for generating the given number of randomized requests
  39. init(h *serverHandler, count int) error
  40. // request initiates sending a single request to the given peer
  41. request(peer *peer, index int) error
  42. }
  43. // benchmarkBlockHeaders implements requestBenchmark
  44. type benchmarkBlockHeaders struct {
  45. amount, skip int
  46. reverse, byHash bool
  47. offset, randMax int64
  48. hashes []common.Hash
  49. }
  50. func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {
  51. d := int64(b.amount-1) * int64(b.skip+1)
  52. b.offset = 0
  53. b.randMax = h.blockchain.CurrentHeader().Number.Int64() + 1 - d
  54. if b.randMax < 0 {
  55. return fmt.Errorf("chain is too short")
  56. }
  57. if b.reverse {
  58. b.offset = d
  59. }
  60. if b.byHash {
  61. b.hashes = make([]common.Hash, count)
  62. for i := range b.hashes {
  63. b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
  64. }
  65. }
  66. return nil
  67. }
  68. func (b *benchmarkBlockHeaders) request(peer *peer, index int) error {
  69. if b.byHash {
  70. return peer.RequestHeadersByHash(0, 0, b.hashes[index], b.amount, b.skip, b.reverse)
  71. } else {
  72. return peer.RequestHeadersByNumber(0, 0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
  73. }
  74. }
  75. // benchmarkBodiesOrReceipts implements requestBenchmark
  76. type benchmarkBodiesOrReceipts struct {
  77. receipts bool
  78. hashes []common.Hash
  79. }
  80. func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {
  81. randMax := h.blockchain.CurrentHeader().Number.Int64() + 1
  82. b.hashes = make([]common.Hash, count)
  83. for i := range b.hashes {
  84. b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(rand.Int63n(randMax)))
  85. }
  86. return nil
  87. }
  88. func (b *benchmarkBodiesOrReceipts) request(peer *peer, index int) error {
  89. if b.receipts {
  90. return peer.RequestReceipts(0, 0, []common.Hash{b.hashes[index]})
  91. } else {
  92. return peer.RequestBodies(0, 0, []common.Hash{b.hashes[index]})
  93. }
  94. }
  95. // benchmarkProofsOrCode implements requestBenchmark
  96. type benchmarkProofsOrCode struct {
  97. code bool
  98. headHash common.Hash
  99. }
  100. func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error {
  101. b.headHash = h.blockchain.CurrentHeader().Hash()
  102. return nil
  103. }
  104. func (b *benchmarkProofsOrCode) request(peer *peer, index int) error {
  105. key := make([]byte, 32)
  106. rand.Read(key)
  107. if b.code {
  108. return peer.RequestCode(0, 0, []CodeReq{{BHash: b.headHash, AccKey: key}})
  109. } else {
  110. return peer.RequestProofs(0, 0, []ProofReq{{BHash: b.headHash, Key: key}})
  111. }
  112. }
  113. // benchmarkHelperTrie implements requestBenchmark
  114. type benchmarkHelperTrie struct {
  115. bloom bool
  116. reqCount int
  117. sectionCount, headNum uint64
  118. }
  119. func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error {
  120. if b.bloom {
  121. b.sectionCount, b.headNum, _ = h.server.bloomTrieIndexer.Sections()
  122. } else {
  123. b.sectionCount, _, _ = h.server.chtIndexer.Sections()
  124. b.headNum = b.sectionCount*params.CHTFrequency - 1
  125. }
  126. if b.sectionCount == 0 {
  127. return fmt.Errorf("no processed sections available")
  128. }
  129. return nil
  130. }
  131. func (b *benchmarkHelperTrie) request(peer *peer, index int) error {
  132. reqs := make([]HelperTrieReq, b.reqCount)
  133. if b.bloom {
  134. bitIdx := uint16(rand.Intn(2048))
  135. for i := range reqs {
  136. key := make([]byte, 10)
  137. binary.BigEndian.PutUint16(key[:2], bitIdx)
  138. binary.BigEndian.PutUint64(key[2:], uint64(rand.Int63n(int64(b.sectionCount))))
  139. reqs[i] = HelperTrieReq{Type: htBloomBits, TrieIdx: b.sectionCount - 1, Key: key}
  140. }
  141. } else {
  142. for i := range reqs {
  143. key := make([]byte, 8)
  144. binary.BigEndian.PutUint64(key[:], uint64(rand.Int63n(int64(b.headNum))))
  145. reqs[i] = HelperTrieReq{Type: htCanonical, TrieIdx: b.sectionCount - 1, Key: key, AuxReq: auxHeader}
  146. }
  147. }
  148. return peer.RequestHelperTrieProofs(0, 0, reqs)
  149. }
  150. // benchmarkTxSend implements requestBenchmark
  151. type benchmarkTxSend struct {
  152. txs types.Transactions
  153. }
  154. func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
  155. key, _ := crypto.GenerateKey()
  156. addr := crypto.PubkeyToAddress(key.PublicKey)
  157. signer := types.NewEIP155Signer(big.NewInt(18))
  158. b.txs = make(types.Transactions, count)
  159. for i := range b.txs {
  160. data := make([]byte, txSizeCostLimit)
  161. rand.Read(data)
  162. tx, err := types.SignTx(types.NewTransaction(0, addr, new(big.Int), 0, new(big.Int), data), signer, key)
  163. if err != nil {
  164. panic(err)
  165. }
  166. b.txs[i] = tx
  167. }
  168. return nil
  169. }
  170. func (b *benchmarkTxSend) request(peer *peer, index int) error {
  171. enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
  172. return peer.SendTxs(0, 0, enc)
  173. }
  174. // benchmarkTxStatus implements requestBenchmark
  175. type benchmarkTxStatus struct{}
  176. func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
  177. return nil
  178. }
  179. func (b *benchmarkTxStatus) request(peer *peer, index int) error {
  180. var hash common.Hash
  181. rand.Read(hash[:])
  182. return peer.RequestTxStatus(0, 0, []common.Hash{hash})
  183. }
  184. // benchmarkSetup stores measurement data for a single benchmark type
  185. type benchmarkSetup struct {
  186. req requestBenchmark
  187. totalCount int
  188. totalTime, avgTime time.Duration
  189. maxInSize, maxOutSize uint32
  190. err error
  191. }
  192. // runBenchmark runs a benchmark cycle for all benchmark types in the specified
  193. // number of passes
  194. func (h *serverHandler) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
  195. setup := make([]*benchmarkSetup, len(benchmarks))
  196. for i, b := range benchmarks {
  197. setup[i] = &benchmarkSetup{req: b}
  198. }
  199. for i := 0; i < passCount; i++ {
  200. log.Info("Running benchmark", "pass", i+1, "total", passCount)
  201. todo := make([]*benchmarkSetup, len(benchmarks))
  202. copy(todo, setup)
  203. for len(todo) > 0 {
  204. // select a random element
  205. index := rand.Intn(len(todo))
  206. next := todo[index]
  207. todo[index] = todo[len(todo)-1]
  208. todo = todo[:len(todo)-1]
  209. if next.err == nil {
  210. // calculate request count
  211. count := 50
  212. if next.totalTime > 0 {
  213. count = int(uint64(next.totalCount) * uint64(targetTime) / uint64(next.totalTime))
  214. }
  215. if err := h.measure(next, count); err != nil {
  216. next.err = err
  217. }
  218. }
  219. }
  220. }
  221. log.Info("Benchmark completed")
  222. for _, s := range setup {
  223. if s.err == nil {
  224. s.avgTime = s.totalTime / time.Duration(s.totalCount)
  225. }
  226. }
  227. return setup
  228. }
  229. // meteredPipe implements p2p.MsgReadWriter and remembers the largest single
  230. // message size sent through the pipe
  231. type meteredPipe struct {
  232. rw p2p.MsgReadWriter
  233. maxSize uint32
  234. }
  235. func (m *meteredPipe) ReadMsg() (p2p.Msg, error) {
  236. return m.rw.ReadMsg()
  237. }
  238. func (m *meteredPipe) WriteMsg(msg p2p.Msg) error {
  239. if msg.Size > m.maxSize {
  240. m.maxSize = msg.Size
  241. }
  242. return m.rw.WriteMsg(msg)
  243. }
  244. // measure runs a benchmark for a single type in a single pass, with the given
  245. // number of requests
  246. func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
  247. clientPipe, serverPipe := p2p.MsgPipe()
  248. clientMeteredPipe := &meteredPipe{rw: clientPipe}
  249. serverMeteredPipe := &meteredPipe{rw: serverPipe}
  250. var id enode.ID
  251. rand.Read(id[:])
  252. clientPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
  253. serverPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
  254. serverPeer.sendQueue = newExecQueue(count)
  255. serverPeer.announceType = announceTypeNone
  256. serverPeer.fcCosts = make(requestCostTable)
  257. c := &requestCosts{}
  258. for code := range requests {
  259. serverPeer.fcCosts[code] = c
  260. }
  261. serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
  262. serverPeer.fcClient = flowcontrol.NewClientNode(h.server.fcManager, serverPeer.fcParams)
  263. defer serverPeer.fcClient.Disconnect()
  264. if err := setup.req.init(h, count); err != nil {
  265. return err
  266. }
  267. errCh := make(chan error, 10)
  268. start := mclock.Now()
  269. go func() {
  270. for i := 0; i < count; i++ {
  271. if err := setup.req.request(clientPeer, i); err != nil {
  272. errCh <- err
  273. return
  274. }
  275. }
  276. }()
  277. go func() {
  278. for i := 0; i < count; i++ {
  279. if err := h.handleMsg(serverPeer, &sync.WaitGroup{}); err != nil {
  280. errCh <- err
  281. return
  282. }
  283. }
  284. }()
  285. go func() {
  286. for i := 0; i < count; i++ {
  287. msg, err := clientPipe.ReadMsg()
  288. if err != nil {
  289. errCh <- err
  290. return
  291. }
  292. var i interface{}
  293. msg.Decode(&i)
  294. }
  295. // at this point we can be sure that the other two
  296. // goroutines finished successfully too
  297. close(errCh)
  298. }()
  299. select {
  300. case err := <-errCh:
  301. if err != nil {
  302. return err
  303. }
  304. case <-h.closeCh:
  305. clientPipe.Close()
  306. serverPipe.Close()
  307. return fmt.Errorf("Benchmark cancelled")
  308. }
  309. setup.totalTime += time.Duration(mclock.Now() - start)
  310. setup.totalCount += count
  311. setup.maxInSize = clientMeteredPipe.maxSize
  312. setup.maxOutSize = serverMeteredPipe.maxSize
  313. clientPipe.Close()
  314. serverPipe.Close()
  315. return nil
  316. }