sync.go 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package snap
  17. import (
  18. "bytes"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "math/rand"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/crypto"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/light"
  33. "github.com/ethereum/go-ethereum/log"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. "github.com/ethereum/go-ethereum/trie"
  36. "golang.org/x/crypto/sha3"
  37. )
  38. var (
  39. // emptyRoot is the known root hash of an empty trie.
  40. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
  41. // emptyCode is the known hash of the empty EVM bytecode.
  42. emptyCode = crypto.Keccak256Hash(nil)
  43. )
  const (
	// maxRequestSize is the maximum number of bytes to request from a remote peer.
	maxRequestSize = 512 * 1024

	// maxStorageSetRequestCount is the maximum number of contracts to request the
	// storage of in a single query. If this number is too low, we're not filling
	// responses fully and waste round trip times. If it's too high, we're capping
	// responses and waste bandwidth.
	maxStorageSetRequestCount = maxRequestSize / 1024

	// maxCodeRequestCount is the maximum number of bytecode blobs to request in a
	// single query. If this number is too low, we're not filling responses fully
	// and waste round trip times. If it's too high, we're capping responses and
	// waste bandwidth.
	//
	// Deployed bytecodes are currently capped at 24KB, so a full response holds at
	// least maxRequestSize / 24KB of them. Assuming that most contracts do not come
	// close to that cap, requesting 4x that many should be a good approximation.
	// Note that the integer division happens first: 512KB / 24KB = 21, times 4 = 84.
	maxCodeRequestCount = maxRequestSize / (24 * 1024) * 4

	// maxTrieRequestCount is the maximum number of trie node blobs to request in
	// a single query. If this number is too low, we're not filling responses fully
	// and waste round trip times. If it's too high, we're capping responses and
	// waste bandwidth.
	maxTrieRequestCount = 512

	// requestTimeout is the maximum time a peer is allowed to spend on serving
	// a single network request.
	requestTimeout = 10 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?

	// accountConcurrency is the number of chunks to split the account trie into
	// to allow concurrent retrievals.
	accountConcurrency = 16

	// storageConcurrency is the number of chunks to split a large contract's
	// storage trie into to allow concurrent retrievals.
	storageConcurrency = 16
)
  76. // accountRequest tracks a pending account range request to ensure responses are
  77. // to actual requests and to validate any security constraints.
  78. //
  79. // Concurrency note: account requests and responses are handled concurrently from
  80. // the main runloop to allow Merkle proof verifications on the peer's thread and
  81. // to drop on invalid response. The request struct must contain all the data to
  82. // construct the response without accessing runloop internals (i.e. task). That
  83. // is only included to allow the runloop to match a response to the task being
  84. // synced without having yet another set of maps.
  85. type accountRequest struct {
  86. peer string // Peer to which this request is assigned
  87. id uint64 // Request ID of this request
  88. cancel chan struct{} // Channel to track sync cancellation
  89. timeout *time.Timer // Timer to track delivery timeout
  90. stale chan struct{} // Channel to signal the request was dropped
  91. origin common.Hash // First account requested to allow continuation checks
  92. limit common.Hash // Last account requested to allow non-overlapping chunking
  93. task *accountTask // Task which this request is filling (only access fields through the runloop!!)
  94. }
  95. // accountResponse is an already Merkle-verified remote response to an account
  96. // range request. It contains the subtrie for the requested account range and
  97. // the database that's going to be filled with the internal nodes on commit.
  98. type accountResponse struct {
  99. task *accountTask // Task which this request is filling
  100. hashes []common.Hash // Account hashes in the returned range
  101. accounts []*state.Account // Expanded accounts in the returned range
  102. nodes ethdb.KeyValueStore // Database containing the reconstructed trie nodes
  103. trie *trie.Trie // Reconstructed trie to reject incomplete account paths
  104. bounds map[common.Hash]struct{} // Boundary nodes to avoid persisting incomplete accounts
  105. overflow *light.NodeSet // Overflow nodes to avoid persisting across chunk boundaries
  106. cont bool // Whether the account range has a continuation
  107. }
  108. // bytecodeRequest tracks a pending bytecode request to ensure responses are to
  109. // actual requests and to validate any security constraints.
  110. //
  111. // Concurrency note: bytecode requests and responses are handled concurrently from
  112. // the main runloop to allow Keccak256 hash verifications on the peer's thread and
  113. // to drop on invalid response. The request struct must contain all the data to
  114. // construct the response without accessing runloop internals (i.e. task). That
  115. // is only included to allow the runloop to match a response to the task being
  116. // synced without having yet another set of maps.
  117. type bytecodeRequest struct {
  118. peer string // Peer to which this request is assigned
  119. id uint64 // Request ID of this request
  120. cancel chan struct{} // Channel to track sync cancellation
  121. timeout *time.Timer // Timer to track delivery timeout
  122. stale chan struct{} // Channel to signal the request was dropped
  123. hashes []common.Hash // Bytecode hashes to validate responses
  124. task *accountTask // Task which this request is filling (only access fields through the runloop!!)
  125. }
  126. // bytecodeResponse is an already verified remote response to a bytecode request.
  127. type bytecodeResponse struct {
  128. task *accountTask // Task which this request is filling
  129. hashes []common.Hash // Hashes of the bytecode to avoid double hashing
  130. codes [][]byte // Actual bytecodes to store into the database (nil = missing)
  131. }
  132. // storageRequest tracks a pending storage ranges request to ensure responses are
  133. // to actual requests and to validate any security constraints.
  134. //
  135. // Concurrency note: storage requests and responses are handled concurrently from
  136. // the main runloop to allow Merkel proof verifications on the peer's thread and
  137. // to drop on invalid response. The request struct must contain all the data to
  138. // construct the response without accessing runloop internals (i.e. tasks). That
  139. // is only included to allow the runloop to match a response to the task being
  140. // synced without having yet another set of maps.
  141. type storageRequest struct {
  142. peer string // Peer to which this request is assigned
  143. id uint64 // Request ID of this request
  144. cancel chan struct{} // Channel to track sync cancellation
  145. timeout *time.Timer // Timer to track delivery timeout
  146. stale chan struct{} // Channel to signal the request was dropped
  147. accounts []common.Hash // Account hashes to validate responses
  148. roots []common.Hash // Storage roots to validate responses
  149. origin common.Hash // First storage slot requested to allow continuation checks
  150. limit common.Hash // Last storage slot requested to allow non-overlapping chunking
  151. mainTask *accountTask // Task which this response belongs to (only access fields through the runloop!!)
  152. subTask *storageTask // Task which this response is filling (only access fields through the runloop!!)
  153. }
  154. // storageResponse is an already Merkle-verified remote response to a storage
  155. // range request. It contains the subtries for the requested storage ranges and
  156. // the databases that's going to be filled with the internal nodes on commit.
  157. type storageResponse struct {
  158. mainTask *accountTask // Task which this response belongs to
  159. subTask *storageTask // Task which this response is filling
  160. accounts []common.Hash // Account hashes requested, may be only partially filled
  161. roots []common.Hash // Storage roots requested, may be only partially filled
  162. hashes [][]common.Hash // Storage slot hashes in the returned range
  163. slots [][][]byte // Storage slot values in the returned range
  164. nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes
  165. tries []*trie.Trie // Reconstructed tries to reject overflown slots
  166. // Fields relevant for the last account only
  167. bounds map[common.Hash]struct{} // Boundary nodes to avoid persisting (incomplete)
  168. overflow *light.NodeSet // Overflow nodes to avoid persisting across chunk boundaries
  169. cont bool // Whether the last storage range has a continuation
  170. }
  171. // trienodeHealRequest tracks a pending state trie request to ensure responses
  172. // are to actual requests and to validate any security constraints.
  173. //
  174. // Concurrency note: trie node requests and responses are handled concurrently from
  175. // the main runloop to allow Keccak256 hash verifications on the peer's thread and
  176. // to drop on invalid response. The request struct must contain all the data to
  177. // construct the response without accessing runloop internals (i.e. task). That
  178. // is only included to allow the runloop to match a response to the task being
  179. // synced without having yet another set of maps.
  180. type trienodeHealRequest struct {
  181. peer string // Peer to which this request is assigned
  182. id uint64 // Request ID of this request
  183. cancel chan struct{} // Channel to track sync cancellation
  184. timeout *time.Timer // Timer to track delivery timeout
  185. stale chan struct{} // Channel to signal the request was dropped
  186. hashes []common.Hash // Trie node hashes to validate responses
  187. paths []trie.SyncPath // Trie node paths requested for rescheduling
  188. task *healTask // Task which this request is filling (only access fields through the runloop!!)
  189. }
  190. // trienodeHealResponse is an already verified remote response to a trie node request.
  191. type trienodeHealResponse struct {
  192. task *healTask // Task which this request is filling
  193. hashes []common.Hash // Hashes of the trie nodes to avoid double hashing
  194. paths []trie.SyncPath // Trie node paths requested for rescheduling missing ones
  195. nodes [][]byte // Actual trie nodes to store into the database (nil = missing)
  196. }
  197. // bytecodeHealRequest tracks a pending bytecode request to ensure responses are to
  198. // actual requests and to validate any security constraints.
  199. //
  200. // Concurrency note: bytecode requests and responses are handled concurrently from
  201. // the main runloop to allow Keccak256 hash verifications on the peer's thread and
  202. // to drop on invalid response. The request struct must contain all the data to
  203. // construct the response without accessing runloop internals (i.e. task). That
  204. // is only included to allow the runloop to match a response to the task being
  205. // synced without having yet another set of maps.
  206. type bytecodeHealRequest struct {
  207. peer string // Peer to which this request is assigned
  208. id uint64 // Request ID of this request
  209. cancel chan struct{} // Channel to track sync cancellation
  210. timeout *time.Timer // Timer to track delivery timeout
  211. stale chan struct{} // Channel to signal the request was dropped
  212. hashes []common.Hash // Bytecode hashes to validate responses
  213. task *healTask // Task which this request is filling (only access fields through the runloop!!)
  214. }
  215. // bytecodeHealResponse is an already verified remote response to a bytecode request.
  216. type bytecodeHealResponse struct {
  217. task *healTask // Task which this request is filling
  218. hashes []common.Hash // Hashes of the bytecode to avoid double hashing
  219. codes [][]byte // Actual bytecodes to store into the database (nil = missing)
  220. }
  221. // accountTask represents the sync task for a chunk of the account snapshot.
  222. type accountTask struct {
  223. // These fields get serialized to leveldb on shutdown
  224. Next common.Hash // Next account to sync in this interval
  225. Last common.Hash // Last account to sync in this interval
  226. SubTasks map[common.Hash][]*storageTask // Storage intervals needing fetching for large contracts
  227. // These fields are internals used during runtime
  228. req *accountRequest // Pending request to fill this task
  229. res *accountResponse // Validate response filling this task
  230. pend int // Number of pending subtasks for this round
  231. needCode []bool // Flags whether the filling accounts need code retrieval
  232. needState []bool // Flags whether the filling accounts need storage retrieval
  233. needHeal []bool // Flags whether the filling accounts's state was chunked and need healing
  234. codeTasks map[common.Hash]struct{} // Code hashes that need retrieval
  235. stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval
  236. done bool // Flag whether the task can be removed
  237. }
  238. // storageTask represents the sync task for a chunk of the storage snapshot.
  239. type storageTask struct {
  240. Next common.Hash // Next account to sync in this interval
  241. Last common.Hash // Last account to sync in this interval
  242. // These fields are internals used during runtime
  243. root common.Hash // Storage root hash for this instance
  244. req *storageRequest // Pending request to fill this task
  245. done bool // Flag whether the task can be removed
  246. }
  247. // healTask represents the sync task for healing the snap-synced chunk boundaries.
  248. type healTask struct {
  249. scheduler *trie.Sync // State trie sync scheduler defining the tasks
  250. trieTasks map[common.Hash]trie.SyncPath // Set of trie node tasks currently queued for retrieval
  251. codeTasks map[common.Hash]struct{} // Set of byte code tasks currently queued for retrieval
  252. }
  253. // syncProgress is a database entry to allow suspending and resuming a snapshot state
  254. // sync. Opposed to full and fast sync, there is no way to restart a suspended
  255. // snap sync without prior knowledge of the suspension point.
  256. type syncProgress struct {
  257. Tasks []*accountTask // The suspended account tasks (contract tasks within)
  258. // Status report during syncing phase
  259. AccountSynced uint64 // Number of accounts downloaded
  260. AccountBytes common.StorageSize // Number of account trie bytes persisted to disk
  261. BytecodeSynced uint64 // Number of bytecodes downloaded
  262. BytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
  263. StorageSynced uint64 // Number of storage slots downloaded
  264. StorageBytes common.StorageSize // Number of storage trie bytes persisted to disk
  265. // Status report during healing phase
  266. TrienodeHealSynced uint64 // Number of state trie nodes downloaded
  267. TrienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
  268. TrienodeHealDups uint64 // Number of state trie nodes already processed
  269. TrienodeHealNops uint64 // Number of state trie nodes not requested
  270. BytecodeHealSynced uint64 // Number of bytecodes downloaded
  271. BytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
  272. BytecodeHealDups uint64 // Number of bytecodes already processed
  273. BytecodeHealNops uint64 // Number of bytecodes not requested
  274. }
  275. // Syncer is an Ethereum account and storage trie syncer based on snapshots and
  276. // the snap protocol. It's purpose is to download all the accounts and storage
  277. // slots from remote peers and reassemble chunks of the state trie, on top of
  278. // which a state sync can be run to fix any gaps / overlaps.
  279. //
  280. // Every network request has a variety of failure events:
  281. // - The peer disconnects after task assignment, failing to send the request
  282. // - The peer disconnects after sending the request, before delivering on it
  283. // - The peer remains connected, but does not deliver a response in time
  284. // - The peer delivers a stale response after a previous timeout
  285. // - The peer delivers a refusal to serve the requested state
  286. type Syncer struct {
  287. db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
  288. bloom *trie.SyncBloom // Bloom filter to deduplicate nodes for state fixup
  289. root common.Hash // Current state trie root being synced
  290. tasks []*accountTask // Current account task set being synced
  291. healer *healTask // Current state healing task being executed
  292. update chan struct{} // Notification channel for possible sync progression
  293. peers map[string]*Peer // Currently active peers to download from
  294. peerJoin *event.Feed // Event feed to react to peers joining
  295. peerDrop *event.Feed // Event feed to react to peers dropping
  296. // Request tracking during syncing phase
  297. statelessPeers map[string]struct{} // Peers that failed to deliver state data
  298. accountIdlers map[string]struct{} // Peers that aren't serving account requests
  299. bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests
  300. storageIdlers map[string]struct{} // Peers that aren't serving storage requests
  301. accountReqs map[uint64]*accountRequest // Account requests currently running
  302. bytecodeReqs map[uint64]*bytecodeRequest // Bytecode requests currently running
  303. storageReqs map[uint64]*storageRequest // Storage requests currently running
  304. accountReqFails chan *accountRequest // Failed account range requests to revert
  305. bytecodeReqFails chan *bytecodeRequest // Failed bytecode requests to revert
  306. storageReqFails chan *storageRequest // Failed storage requests to revert
  307. accountResps chan *accountResponse // Account sub-tries to integrate into the database
  308. bytecodeResps chan *bytecodeResponse // Bytecodes to integrate into the database
  309. storageResps chan *storageResponse // Storage sub-tries to integrate into the database
  310. accountSynced uint64 // Number of accounts downloaded
  311. accountBytes common.StorageSize // Number of account trie bytes persisted to disk
  312. bytecodeSynced uint64 // Number of bytecodes downloaded
  313. bytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
  314. storageSynced uint64 // Number of storage slots downloaded
  315. storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
  316. // Request tracking during healing phase
  317. trienodeHealIdlers map[string]struct{} // Peers that aren't serving trie node requests
  318. bytecodeHealIdlers map[string]struct{} // Peers that aren't serving bytecode requests
  319. trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
  320. bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
  321. trienodeHealReqFails chan *trienodeHealRequest // Failed trienode requests to revert
  322. bytecodeHealReqFails chan *bytecodeHealRequest // Failed bytecode requests to revert
  323. trienodeHealResps chan *trienodeHealResponse // Trie nodes to integrate into the database
  324. bytecodeHealResps chan *bytecodeHealResponse // Bytecodes to integrate into the database
  325. trienodeHealSynced uint64 // Number of state trie nodes downloaded
  326. trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
  327. trienodeHealDups uint64 // Number of state trie nodes already processed
  328. trienodeHealNops uint64 // Number of state trie nodes not requested
  329. bytecodeHealSynced uint64 // Number of bytecodes downloaded
  330. bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
  331. bytecodeHealDups uint64 // Number of bytecodes already processed
  332. bytecodeHealNops uint64 // Number of bytecodes not requested
  333. startTime time.Time // Time instance when snapshot sync started
  334. startAcc common.Hash // Account hash where sync started from
  335. logTime time.Time // Time instance when status was last reported
  336. pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
  337. lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
  338. }
  339. func NewSyncer(db ethdb.KeyValueStore, bloom *trie.SyncBloom) *Syncer {
  340. return &Syncer{
  341. db: db,
  342. bloom: bloom,
  343. peers: make(map[string]*Peer),
  344. peerJoin: new(event.Feed),
  345. peerDrop: new(event.Feed),
  346. update: make(chan struct{}, 1),
  347. accountIdlers: make(map[string]struct{}),
  348. storageIdlers: make(map[string]struct{}),
  349. bytecodeIdlers: make(map[string]struct{}),
  350. accountReqs: make(map[uint64]*accountRequest),
  351. storageReqs: make(map[uint64]*storageRequest),
  352. bytecodeReqs: make(map[uint64]*bytecodeRequest),
  353. accountReqFails: make(chan *accountRequest),
  354. storageReqFails: make(chan *storageRequest),
  355. bytecodeReqFails: make(chan *bytecodeRequest),
  356. accountResps: make(chan *accountResponse),
  357. storageResps: make(chan *storageResponse),
  358. bytecodeResps: make(chan *bytecodeResponse),
  359. trienodeHealIdlers: make(map[string]struct{}),
  360. bytecodeHealIdlers: make(map[string]struct{}),
  361. trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
  362. bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
  363. trienodeHealReqFails: make(chan *trienodeHealRequest),
  364. bytecodeHealReqFails: make(chan *bytecodeHealRequest),
  365. trienodeHealResps: make(chan *trienodeHealResponse),
  366. bytecodeHealResps: make(chan *bytecodeHealResponse),
  367. }
  368. }
  369. // Register injects a new data source into the syncer's peerset.
  370. func (s *Syncer) Register(peer *Peer) error {
  371. // Make sure the peer is not registered yet
  372. s.lock.Lock()
  373. if _, ok := s.peers[peer.id]; ok {
  374. log.Error("Snap peer already registered", "id", peer.id)
  375. s.lock.Unlock()
  376. return errors.New("already registered")
  377. }
  378. s.peers[peer.id] = peer
  379. // Mark the peer as idle, even if no sync is running
  380. s.accountIdlers[peer.id] = struct{}{}
  381. s.storageIdlers[peer.id] = struct{}{}
  382. s.bytecodeIdlers[peer.id] = struct{}{}
  383. s.trienodeHealIdlers[peer.id] = struct{}{}
  384. s.bytecodeHealIdlers[peer.id] = struct{}{}
  385. s.lock.Unlock()
  386. // Notify any active syncs that a new peer can be assigned data
  387. s.peerJoin.Send(peer.id)
  388. return nil
  389. }
  390. // Unregister injects a new data source into the syncer's peerset.
  391. func (s *Syncer) Unregister(id string) error {
  392. // Remove all traces of the peer from the registry
  393. s.lock.Lock()
  394. if _, ok := s.peers[id]; !ok {
  395. log.Error("Snap peer not registered", "id", id)
  396. s.lock.Unlock()
  397. return errors.New("not registered")
  398. }
  399. delete(s.peers, id)
  400. // Remove status markers, even if no sync is running
  401. delete(s.statelessPeers, id)
  402. delete(s.accountIdlers, id)
  403. delete(s.storageIdlers, id)
  404. delete(s.bytecodeIdlers, id)
  405. delete(s.trienodeHealIdlers, id)
  406. delete(s.bytecodeHealIdlers, id)
  407. s.lock.Unlock()
  408. // Notify any active syncs that pending requests need to be reverted
  409. s.peerDrop.Send(id)
  410. return nil
  411. }
// Sync starts (or resumes a previous) sync cycle to iterate over a state trie
// with the given root and reconstruct the nodes based on the snapshot leaves.
// Previously downloaded segments will not be redownloaded or fixed, rather any
// errors will be healed after the leaves are fully accumulated.
//
// The cycle runs until every account task is done and the heal scheduler has
// nothing pending, or until cancel fires; every return path visible here
// returns nil. All request/response handling happens inline on the calling
// goroutine, inside the select loop below.
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
	// Move the trie root from any previous value, revert stateless markers for
	// any peers and initialize the syncer if it was not yet run. The healer is
	// rebuilt on every cycle since it is bound to the root.
	s.lock.Lock()
	s.root = root
	s.healer = &healTask{
		scheduler: state.NewStateSync(root, s.db, s.bloom),
		trieTasks: make(map[common.Hash]trie.SyncPath),
		codeTasks: make(map[common.Hash]struct{}),
	}
	s.statelessPeers = make(map[string]struct{})
	s.lock.Unlock()

	// Only stamp the start time if this Syncer has not run a cycle before
	if s.startTime == (time.Time{}) {
		s.startTime = time.Now()
	}
	// Retrieve the previous sync status from the database and abort if already synced
	s.loadSyncStatus()
	if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
		log.Debug("Snapshot sync already completed")
		return nil
	}
	// Deferred calls run last-in-first-out: on exit the request maps are reset
	// first, then the final report is printed, and progress is persisted last.
	defer func() { // Persist any progress, independent of failure
		for _, task := range s.tasks {
			s.forwardAccountTask(task)
		}
		s.cleanAccountTasks()
		s.saveSyncStatus()
	}()

	log.Debug("Starting snapshot sync cycle", "root", root)
	defer s.report(true)

	// Whether sync completed or not, disregard any future packets
	defer func() {
		log.Debug("Terminating snapshot sync cycle", "root", root)
		s.lock.Lock()
		s.accountReqs = make(map[uint64]*accountRequest)
		s.storageReqs = make(map[uint64]*storageRequest)
		s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
		s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
		s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
		s.lock.Unlock()
	}()
	// Keep scheduling sync tasks. Peer membership changes arrive through
	// buffered subscriptions to the join/drop feeds.
	peerJoin := make(chan string, 16)
	peerJoinSub := s.peerJoin.Subscribe(peerJoin)
	defer peerJoinSub.Unsubscribe()

	peerDrop := make(chan string, 16)
	peerDropSub := s.peerDrop.Subscribe(peerDrop)
	defer peerDropSub.Unsubscribe()

	for {
		// Remove all completed tasks and terminate sync if everything's done
		s.cleanStorageTasks()
		s.cleanAccountTasks()
		if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
			return nil
		}
		// Assign all the data retrieval tasks to any free peers
		s.assignAccountTasks(cancel)
		s.assignBytecodeTasks(cancel)
		s.assignStorageTasks(cancel)

		if len(s.tasks) == 0 {
			// Sync phase done, run heal phase
			s.assignTrienodeHealTasks(cancel)
			s.assignBytecodeHealTasks(cancel)
		}
		// Wait for something to happen
		select {
		case <-s.update:
			// Something happened (new peer, delivery, timeout), recheck tasks
		case <-peerJoin:
			// A new peer joined, try to schedule it new tasks
		case id := <-peerDrop:
			// A peer left, return its in-flight work to the task pools
			s.revertRequests(id)
		case <-cancel:
			return nil

		// Failed requests (timeouts, send errors) are reverted for reassignment
		case req := <-s.accountReqFails:
			s.revertAccountRequest(req)
		case req := <-s.bytecodeReqFails:
			s.revertBytecodeRequest(req)
		case req := <-s.storageReqFails:
			s.revertStorageRequest(req)
		case req := <-s.trienodeHealReqFails:
			s.revertTrienodeHealRequest(req)
		case req := <-s.bytecodeHealReqFails:
			s.revertBytecodeHealRequest(req)

		// Delivered responses are processed inline on this goroutine
		case res := <-s.accountResps:
			s.processAccountResponse(res)
		case res := <-s.bytecodeResps:
			s.processBytecodeResponse(res)
		case res := <-s.storageResps:
			s.processStorageResponse(res)
		case res := <-s.trienodeHealResps:
			s.processTrienodeHealResponse(res)
		case res := <-s.bytecodeHealResps:
			s.processBytecodeHealResponse(res)
		}
		// Report stats if something meaningful happened
		s.report(false)
	}
}
  515. // loadSyncStatus retrieves a previously aborted sync status from the database,
  516. // or generates a fresh one if none is available.
  517. func (s *Syncer) loadSyncStatus() {
  518. var progress syncProgress
  519. if status := rawdb.ReadSanpshotSyncStatus(s.db); status != nil {
  520. if err := json.Unmarshal(status, &progress); err != nil {
  521. log.Error("Failed to decode snap sync status", "err", err)
  522. } else {
  523. for _, task := range progress.Tasks {
  524. log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
  525. }
  526. s.tasks = progress.Tasks
  527. s.accountSynced = progress.AccountSynced
  528. s.accountBytes = progress.AccountBytes
  529. s.bytecodeSynced = progress.BytecodeSynced
  530. s.bytecodeBytes = progress.BytecodeBytes
  531. s.storageSynced = progress.StorageSynced
  532. s.storageBytes = progress.StorageBytes
  533. s.trienodeHealSynced = progress.TrienodeHealSynced
  534. s.trienodeHealBytes = progress.TrienodeHealBytes
  535. s.bytecodeHealSynced = progress.BytecodeHealSynced
  536. s.bytecodeHealBytes = progress.BytecodeHealBytes
  537. return
  538. }
  539. }
  540. // Either we've failed to decode the previus state, or there was none.
  541. // Start a fresh sync by chunking up the account range and scheduling
  542. // them for retrieval.
  543. s.tasks = nil
  544. s.accountSynced, s.accountBytes = 0, 0
  545. s.bytecodeSynced, s.bytecodeBytes = 0, 0
  546. s.storageSynced, s.storageBytes = 0, 0
  547. s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
  548. s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0
  549. var next common.Hash
  550. step := new(big.Int).Sub(
  551. new(big.Int).Div(
  552. new(big.Int).Exp(common.Big2, common.Big256, nil),
  553. big.NewInt(accountConcurrency),
  554. ), common.Big1,
  555. )
  556. for i := 0; i < accountConcurrency; i++ {
  557. last := common.BigToHash(new(big.Int).Add(next.Big(), step))
  558. if i == accountConcurrency-1 {
  559. // Make sure we don't overflow if the step is not a proper divisor
  560. last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
  561. }
  562. s.tasks = append(s.tasks, &accountTask{
  563. Next: next,
  564. Last: last,
  565. SubTasks: make(map[common.Hash][]*storageTask),
  566. })
  567. log.Debug("Created account sync task", "from", next, "last", last)
  568. next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
  569. }
  570. }
  571. // saveSyncStatus marshals the remaining sync tasks into leveldb.
  572. func (s *Syncer) saveSyncStatus() {
  573. progress := &syncProgress{
  574. Tasks: s.tasks,
  575. AccountSynced: s.accountSynced,
  576. AccountBytes: s.accountBytes,
  577. BytecodeSynced: s.bytecodeSynced,
  578. BytecodeBytes: s.bytecodeBytes,
  579. StorageSynced: s.storageSynced,
  580. StorageBytes: s.storageBytes,
  581. TrienodeHealSynced: s.trienodeHealSynced,
  582. TrienodeHealBytes: s.trienodeHealBytes,
  583. BytecodeHealSynced: s.bytecodeHealSynced,
  584. BytecodeHealBytes: s.bytecodeHealBytes,
  585. }
  586. status, err := json.Marshal(progress)
  587. if err != nil {
  588. panic(err) // This can only fail during implementation
  589. }
  590. rawdb.WriteSnapshotSyncStatus(s.db, status)
  591. }
  592. // cleanAccountTasks removes account range retrieval tasks that have already been
  593. // completed.
  594. func (s *Syncer) cleanAccountTasks() {
  595. for i := 0; i < len(s.tasks); i++ {
  596. if s.tasks[i].done {
  597. s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
  598. i--
  599. }
  600. }
  601. }
  602. // cleanStorageTasks iterates over all the account tasks and storage sub-tasks
  603. // within, cleaning any that have been completed.
  604. func (s *Syncer) cleanStorageTasks() {
  605. for _, task := range s.tasks {
  606. for account, subtasks := range task.SubTasks {
  607. // Remove storage range retrieval tasks that completed
  608. for j := 0; j < len(subtasks); j++ {
  609. if subtasks[j].done {
  610. subtasks = append(subtasks[:j], subtasks[j+1:]...)
  611. j--
  612. }
  613. }
  614. if len(subtasks) > 0 {
  615. task.SubTasks[account] = subtasks
  616. continue
  617. }
  618. // If all storage chunks are done, mark the account as done too
  619. for j, hash := range task.res.hashes {
  620. if hash == account {
  621. task.needState[j] = false
  622. }
  623. }
  624. delete(task.SubTasks, account)
  625. task.pend--
  626. // If this was the last pending task, forward the account task
  627. if task.pend == 0 {
  628. s.forwardAccountTask(task)
  629. }
  630. }
  631. }
  632. }
  633. // assignAccountTasks attempts to match idle peers to pending account range
  634. // retrievals.
  635. func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
  636. s.lock.Lock()
  637. defer s.lock.Unlock()
  638. // If there are no idle peers, short circuit assignment
  639. if len(s.accountIdlers) == 0 {
  640. return
  641. }
  642. // Iterate over all the tasks and try to find a pending one
  643. for _, task := range s.tasks {
  644. // Skip any tasks already filling
  645. if task.req != nil || task.res != nil {
  646. continue
  647. }
  648. // Task pending retrieval, try to find an idle peer. If no such peer
  649. // exists, we probably assigned tasks for all (or they are stateless).
  650. // Abort the entire assignment mechanism.
  651. var idle string
  652. for id := range s.accountIdlers {
  653. // If the peer rejected a query in this sync cycle, don't bother asking
  654. // again for anything, it's either out of sync or already pruned
  655. if _, ok := s.statelessPeers[id]; ok {
  656. continue
  657. }
  658. idle = id
  659. break
  660. }
  661. if idle == "" {
  662. return
  663. }
  664. // Matched a pending task to an idle peer, allocate a unique request id
  665. var reqid uint64
  666. for {
  667. reqid = uint64(rand.Int63())
  668. if reqid == 0 {
  669. continue
  670. }
  671. if _, ok := s.accountReqs[reqid]; ok {
  672. continue
  673. }
  674. break
  675. }
  676. // Generate the network query and send it to the peer
  677. req := &accountRequest{
  678. peer: idle,
  679. id: reqid,
  680. cancel: cancel,
  681. stale: make(chan struct{}),
  682. origin: task.Next,
  683. limit: task.Last,
  684. task: task,
  685. }
  686. req.timeout = time.AfterFunc(requestTimeout, func() {
  687. log.Debug("Account range request timed out")
  688. select {
  689. case s.accountReqFails <- req:
  690. default:
  691. }
  692. })
  693. s.accountReqs[reqid] = req
  694. delete(s.accountIdlers, idle)
  695. s.pend.Add(1)
  696. go func(peer *Peer, root common.Hash) {
  697. defer s.pend.Done()
  698. // Attempt to send the remote request and revert if it fails
  699. if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil {
  700. peer.Log().Debug("Failed to request account range", "err", err)
  701. select {
  702. case s.accountReqFails <- req:
  703. default:
  704. }
  705. }
  706. // Request successfully sent, start a
  707. }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
  708. // Inject the request into the task to block further assignments
  709. task.req = req
  710. }
  711. }
  712. // assignBytecodeTasks attempts to match idle peers to pending code retrievals.
  713. func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
  714. s.lock.Lock()
  715. defer s.lock.Unlock()
  716. // If there are no idle peers, short circuit assignment
  717. if len(s.bytecodeIdlers) == 0 {
  718. return
  719. }
  720. // Iterate over all the tasks and try to find a pending one
  721. for _, task := range s.tasks {
  722. // Skip any tasks not in the bytecode retrieval phase
  723. if task.res == nil {
  724. continue
  725. }
  726. // Skip tasks that are already retrieving (or done with) all codes
  727. if len(task.codeTasks) == 0 {
  728. continue
  729. }
  730. // Task pending retrieval, try to find an idle peer. If no such peer
  731. // exists, we probably assigned tasks for all (or they are stateless).
  732. // Abort the entire assignment mechanism.
  733. var idle string
  734. for id := range s.bytecodeIdlers {
  735. // If the peer rejected a query in this sync cycle, don't bother asking
  736. // again for anything, it's either out of sync or already pruned
  737. if _, ok := s.statelessPeers[id]; ok {
  738. continue
  739. }
  740. idle = id
  741. break
  742. }
  743. if idle == "" {
  744. return
  745. }
  746. // Matched a pending task to an idle peer, allocate a unique request id
  747. var reqid uint64
  748. for {
  749. reqid = uint64(rand.Int63())
  750. if reqid == 0 {
  751. continue
  752. }
  753. if _, ok := s.bytecodeReqs[reqid]; ok {
  754. continue
  755. }
  756. break
  757. }
  758. // Generate the network query and send it to the peer
  759. hashes := make([]common.Hash, 0, maxCodeRequestCount)
  760. for hash := range task.codeTasks {
  761. delete(task.codeTasks, hash)
  762. hashes = append(hashes, hash)
  763. if len(hashes) >= maxCodeRequestCount {
  764. break
  765. }
  766. }
  767. req := &bytecodeRequest{
  768. peer: idle,
  769. id: reqid,
  770. cancel: cancel,
  771. stale: make(chan struct{}),
  772. hashes: hashes,
  773. task: task,
  774. }
  775. req.timeout = time.AfterFunc(requestTimeout, func() {
  776. log.Debug("Bytecode request timed out")
  777. select {
  778. case s.bytecodeReqFails <- req:
  779. default:
  780. }
  781. })
  782. s.bytecodeReqs[reqid] = req
  783. delete(s.bytecodeIdlers, idle)
  784. s.pend.Add(1)
  785. go func(peer *Peer) {
  786. defer s.pend.Done()
  787. // Attempt to send the remote request and revert if it fails
  788. if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
  789. log.Debug("Failed to request bytecodes", "err", err)
  790. select {
  791. case s.bytecodeReqFails <- req:
  792. default:
  793. }
  794. }
  795. // Request successfully sent, start a
  796. }(s.peers[idle]) // We're in the lock, peers[id] surely exists
  797. }
  798. }
  799. // assignStorageTasks attempts to match idle peers to pending storage range
  800. // retrievals.
  801. func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
  802. s.lock.Lock()
  803. defer s.lock.Unlock()
  804. // If there are no idle peers, short circuit assignment
  805. if len(s.storageIdlers) == 0 {
  806. return
  807. }
  808. // Iterate over all the tasks and try to find a pending one
  809. for _, task := range s.tasks {
  810. // Skip any tasks not in the storage retrieval phase
  811. if task.res == nil {
  812. continue
  813. }
  814. // Skip tasks that are already retrieving (or done with) all small states
  815. if len(task.SubTasks) == 0 && len(task.stateTasks) == 0 {
  816. continue
  817. }
  818. // Task pending retrieval, try to find an idle peer. If no such peer
  819. // exists, we probably assigned tasks for all (or they are stateless).
  820. // Abort the entire assignment mechanism.
  821. var idle string
  822. for id := range s.storageIdlers {
  823. // If the peer rejected a query in this sync cycle, don't bother asking
  824. // again for anything, it's either out of sync or already pruned
  825. if _, ok := s.statelessPeers[id]; ok {
  826. continue
  827. }
  828. idle = id
  829. break
  830. }
  831. if idle == "" {
  832. return
  833. }
  834. // Matched a pending task to an idle peer, allocate a unique request id
  835. var reqid uint64
  836. for {
  837. reqid = uint64(rand.Int63())
  838. if reqid == 0 {
  839. continue
  840. }
  841. if _, ok := s.storageReqs[reqid]; ok {
  842. continue
  843. }
  844. break
  845. }
  846. // Generate the network query and send it to the peer. If there are
  847. // large contract tasks pending, complete those before diving into
  848. // even more new contracts.
  849. var (
  850. accounts = make([]common.Hash, 0, maxStorageSetRequestCount)
  851. roots = make([]common.Hash, 0, maxStorageSetRequestCount)
  852. subtask *storageTask
  853. )
  854. for account, subtasks := range task.SubTasks {
  855. for _, st := range subtasks {
  856. // Skip any subtasks already filling
  857. if st.req != nil {
  858. continue
  859. }
  860. // Found an incomplete storage chunk, schedule it
  861. accounts = append(accounts, account)
  862. roots = append(roots, st.root)
  863. subtask = st
  864. break // Large contract chunks are downloaded individually
  865. }
  866. if subtask != nil {
  867. break // Large contract chunks are downloaded individually
  868. }
  869. }
  870. if subtask == nil {
  871. // No large contract required retrieval, but small ones available
  872. for acccount, root := range task.stateTasks {
  873. delete(task.stateTasks, acccount)
  874. accounts = append(accounts, acccount)
  875. roots = append(roots, root)
  876. if len(accounts) >= maxStorageSetRequestCount {
  877. break
  878. }
  879. }
  880. }
  881. // If nothing was found, it means this task is actually already fully
  882. // retrieving, but large contracts are hard to detect. Skip to the next.
  883. if len(accounts) == 0 {
  884. continue
  885. }
  886. req := &storageRequest{
  887. peer: idle,
  888. id: reqid,
  889. cancel: cancel,
  890. stale: make(chan struct{}),
  891. accounts: accounts,
  892. roots: roots,
  893. mainTask: task,
  894. subTask: subtask,
  895. }
  896. if subtask != nil {
  897. req.origin = subtask.Next
  898. req.limit = subtask.Last
  899. }
  900. req.timeout = time.AfterFunc(requestTimeout, func() {
  901. log.Debug("Storage request timed out")
  902. select {
  903. case s.storageReqFails <- req:
  904. default:
  905. }
  906. })
  907. s.storageReqs[reqid] = req
  908. delete(s.storageIdlers, idle)
  909. s.pend.Add(1)
  910. go func(peer *Peer, root common.Hash) {
  911. defer s.pend.Done()
  912. // Attempt to send the remote request and revert if it fails
  913. var origin, limit []byte
  914. if subtask != nil {
  915. origin, limit = req.origin[:], req.limit[:]
  916. }
  917. if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil {
  918. log.Debug("Failed to request storage", "err", err)
  919. select {
  920. case s.storageReqFails <- req:
  921. default:
  922. }
  923. }
  924. // Request successfully sent, start a
  925. }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
  926. // Inject the request into the subtask to block further assignments
  927. if subtask != nil {
  928. subtask.req = req
  929. }
  930. }
  931. }
  932. // assignTrienodeHealTasks attempts to match idle peers to trie node requests to
  933. // heal any trie errors caused by the snap sync's chunked retrieval model.
  934. func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
  935. s.lock.Lock()
  936. defer s.lock.Unlock()
  937. // If there are no idle peers, short circuit assignment
  938. if len(s.trienodeHealIdlers) == 0 {
  939. return
  940. }
  941. // Iterate over pending tasks and try to find a peer to retrieve with
  942. for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
  943. // If there are not enough trie tasks queued to fully assign, fill the
  944. // queue from the state sync scheduler. The trie synced schedules these
  945. // together with bytecodes, so we need to queue them combined.
  946. var (
  947. have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
  948. want = maxTrieRequestCount + maxCodeRequestCount
  949. )
  950. if have < want {
  951. nodes, paths, codes := s.healer.scheduler.Missing(want - have)
  952. for i, hash := range nodes {
  953. s.healer.trieTasks[hash] = paths[i]
  954. }
  955. for _, hash := range codes {
  956. s.healer.codeTasks[hash] = struct{}{}
  957. }
  958. }
  959. // If all the heal tasks are bytecodes or already downloading, bail
  960. if len(s.healer.trieTasks) == 0 {
  961. return
  962. }
  963. // Task pending retrieval, try to find an idle peer. If no such peer
  964. // exists, we probably assigned tasks for all (or they are stateless).
  965. // Abort the entire assignment mechanism.
  966. var idle string
  967. for id := range s.trienodeHealIdlers {
  968. // If the peer rejected a query in this sync cycle, don't bother asking
  969. // again for anything, it's either out of sync or already pruned
  970. if _, ok := s.statelessPeers[id]; ok {
  971. continue
  972. }
  973. idle = id
  974. break
  975. }
  976. if idle == "" {
  977. return
  978. }
  979. // Matched a pending task to an idle peer, allocate a unique request id
  980. var reqid uint64
  981. for {
  982. reqid = uint64(rand.Int63())
  983. if reqid == 0 {
  984. continue
  985. }
  986. if _, ok := s.trienodeHealReqs[reqid]; ok {
  987. continue
  988. }
  989. break
  990. }
  991. // Generate the network query and send it to the peer
  992. var (
  993. hashes = make([]common.Hash, 0, maxTrieRequestCount)
  994. paths = make([]trie.SyncPath, 0, maxTrieRequestCount)
  995. pathsets = make([]TrieNodePathSet, 0, maxTrieRequestCount)
  996. )
  997. for hash, pathset := range s.healer.trieTasks {
  998. delete(s.healer.trieTasks, hash)
  999. hashes = append(hashes, hash)
  1000. paths = append(paths, pathset)
  1001. pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
  1002. if len(hashes) >= maxTrieRequestCount {
  1003. break
  1004. }
  1005. }
  1006. req := &trienodeHealRequest{
  1007. peer: idle,
  1008. id: reqid,
  1009. cancel: cancel,
  1010. stale: make(chan struct{}),
  1011. hashes: hashes,
  1012. paths: paths,
  1013. task: s.healer,
  1014. }
  1015. req.timeout = time.AfterFunc(requestTimeout, func() {
  1016. log.Debug("Trienode heal request timed out")
  1017. select {
  1018. case s.trienodeHealReqFails <- req:
  1019. default:
  1020. }
  1021. })
  1022. s.trienodeHealReqs[reqid] = req
  1023. delete(s.trienodeHealIdlers, idle)
  1024. s.pend.Add(1)
  1025. go func(peer *Peer, root common.Hash) {
  1026. defer s.pend.Done()
  1027. // Attempt to send the remote request and revert if it fails
  1028. if err := peer.RequestTrieNodes(reqid, root, pathsets, maxRequestSize); err != nil {
  1029. log.Debug("Failed to request trienode healers", "err", err)
  1030. select {
  1031. case s.trienodeHealReqFails <- req:
  1032. default:
  1033. }
  1034. }
  1035. // Request successfully sent, start a
  1036. }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
  1037. }
  1038. }
  1039. // assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
  1040. // heal any trie errors caused by the snap sync's chunked retrieval model.
  1041. func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
  1042. s.lock.Lock()
  1043. defer s.lock.Unlock()
  1044. // If there are no idle peers, short circuit assignment
  1045. if len(s.bytecodeHealIdlers) == 0 {
  1046. return
  1047. }
  1048. // Iterate over pending tasks and try to find a peer to retrieve with
  1049. for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
  1050. // If there are not enough trie tasks queued to fully assign, fill the
  1051. // queue from the state sync scheduler. The trie synced schedules these
  1052. // together with trie nodes, so we need to queue them combined.
  1053. var (
  1054. have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
  1055. want = maxTrieRequestCount + maxCodeRequestCount
  1056. )
  1057. if have < want {
  1058. nodes, paths, codes := s.healer.scheduler.Missing(want - have)
  1059. for i, hash := range nodes {
  1060. s.healer.trieTasks[hash] = paths[i]
  1061. }
  1062. for _, hash := range codes {
  1063. s.healer.codeTasks[hash] = struct{}{}
  1064. }
  1065. }
  1066. // If all the heal tasks are trienodes or already downloading, bail
  1067. if len(s.healer.codeTasks) == 0 {
  1068. return
  1069. }
  1070. // Task pending retrieval, try to find an idle peer. If no such peer
  1071. // exists, we probably assigned tasks for all (or they are stateless).
  1072. // Abort the entire assignment mechanism.
  1073. var idle string
  1074. for id := range s.bytecodeHealIdlers {
  1075. // If the peer rejected a query in this sync cycle, don't bother asking
  1076. // again for anything, it's either out of sync or already pruned
  1077. if _, ok := s.statelessPeers[id]; ok {
  1078. continue
  1079. }
  1080. idle = id
  1081. break
  1082. }
  1083. if idle == "" {
  1084. return
  1085. }
  1086. // Matched a pending task to an idle peer, allocate a unique request id
  1087. var reqid uint64
  1088. for {
  1089. reqid = uint64(rand.Int63())
  1090. if reqid == 0 {
  1091. continue
  1092. }
  1093. if _, ok := s.bytecodeHealReqs[reqid]; ok {
  1094. continue
  1095. }
  1096. break
  1097. }
  1098. // Generate the network query and send it to the peer
  1099. hashes := make([]common.Hash, 0, maxCodeRequestCount)
  1100. for hash := range s.healer.codeTasks {
  1101. delete(s.healer.codeTasks, hash)
  1102. hashes = append(hashes, hash)
  1103. if len(hashes) >= maxCodeRequestCount {
  1104. break
  1105. }
  1106. }
  1107. req := &bytecodeHealRequest{
  1108. peer: idle,
  1109. id: reqid,
  1110. cancel: cancel,
  1111. stale: make(chan struct{}),
  1112. hashes: hashes,
  1113. task: s.healer,
  1114. }
  1115. req.timeout = time.AfterFunc(requestTimeout, func() {
  1116. log.Debug("Bytecode heal request timed out")
  1117. select {
  1118. case s.bytecodeHealReqFails <- req:
  1119. default:
  1120. }
  1121. })
  1122. s.bytecodeHealReqs[reqid] = req
  1123. delete(s.bytecodeHealIdlers, idle)
  1124. s.pend.Add(1)
  1125. go func(peer *Peer) {
  1126. defer s.pend.Done()
  1127. // Attempt to send the remote request and revert if it fails
  1128. if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
  1129. log.Debug("Failed to request bytecode healers", "err", err)
  1130. select {
  1131. case s.bytecodeHealReqFails <- req:
  1132. default:
  1133. }
  1134. }
  1135. // Request successfully sent, start a
  1136. }(s.peers[idle]) // We're in the lock, peers[id] surely exists
  1137. }
  1138. }
  1139. // revertRequests locates all the currently pending reuqests from a particular
  1140. // peer and reverts them, rescheduling for others to fulfill.
  1141. func (s *Syncer) revertRequests(peer string) {
  1142. // Gather the requests first, revertals need the lock too
  1143. s.lock.Lock()
  1144. var accountReqs []*accountRequest
  1145. for _, req := range s.accountReqs {
  1146. if req.peer == peer {
  1147. accountReqs = append(accountReqs, req)
  1148. }
  1149. }
  1150. var bytecodeReqs []*bytecodeRequest
  1151. for _, req := range s.bytecodeReqs {
  1152. if req.peer == peer {
  1153. bytecodeReqs = append(bytecodeReqs, req)
  1154. }
  1155. }
  1156. var storageReqs []*storageRequest
  1157. for _, req := range s.storageReqs {
  1158. if req.peer == peer {
  1159. storageReqs = append(storageReqs, req)
  1160. }
  1161. }
  1162. var trienodeHealReqs []*trienodeHealRequest
  1163. for _, req := range s.trienodeHealReqs {
  1164. if req.peer == peer {
  1165. trienodeHealReqs = append(trienodeHealReqs, req)
  1166. }
  1167. }
  1168. var bytecodeHealReqs []*bytecodeHealRequest
  1169. for _, req := range s.bytecodeHealReqs {
  1170. if req.peer == peer {
  1171. bytecodeHealReqs = append(bytecodeHealReqs, req)
  1172. }
  1173. }
  1174. s.lock.Unlock()
  1175. // Revert all the requests matching the peer
  1176. for _, req := range accountReqs {
  1177. s.revertAccountRequest(req)
  1178. }
  1179. for _, req := range bytecodeReqs {
  1180. s.revertBytecodeRequest(req)
  1181. }
  1182. for _, req := range storageReqs {
  1183. s.revertStorageRequest(req)
  1184. }
  1185. for _, req := range trienodeHealReqs {
  1186. s.revertTrienodeHealRequest(req)
  1187. }
  1188. for _, req := range bytecodeHealReqs {
  1189. s.revertBytecodeHealRequest(req)
  1190. }
  1191. }
  1192. // revertAccountRequest cleans up an account range request and returns all failed
  1193. // retrieval tasks to the scheduler for reassignment.
  1194. func (s *Syncer) revertAccountRequest(req *accountRequest) {
  1195. log.Trace("Reverting account request", "peer", req.peer, "reqid", req.id)
  1196. select {
  1197. case <-req.stale:
  1198. log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id)
  1199. return
  1200. default:
  1201. }
  1202. close(req.stale)
  1203. // Remove the request from the tracked set
  1204. s.lock.Lock()
  1205. delete(s.accountReqs, req.id)
  1206. s.lock.Unlock()
  1207. // If there's a timeout timer still running, abort it and mark the account
  1208. // task as not-pending, ready for resheduling
  1209. req.timeout.Stop()
  1210. if req.task.req == req {
  1211. req.task.req = nil
  1212. }
  1213. }
  1214. // revertBytecodeRequest cleans up an bytecode request and returns all failed
  1215. // retrieval tasks to the scheduler for reassignment.
  1216. func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
  1217. log.Trace("Reverting bytecode request", "peer", req.peer)
  1218. select {
  1219. case <-req.stale:
  1220. log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id)
  1221. return
  1222. default:
  1223. }
  1224. close(req.stale)
  1225. // Remove the request from the tracked set
  1226. s.lock.Lock()
  1227. delete(s.bytecodeReqs, req.id)
  1228. s.lock.Unlock()
  1229. // If there's a timeout timer still running, abort it and mark the code
  1230. // retrievals as not-pending, ready for resheduling
  1231. req.timeout.Stop()
  1232. for _, hash := range req.hashes {
  1233. req.task.codeTasks[hash] = struct{}{}
  1234. }
  1235. }
  1236. // revertStorageRequest cleans up a storage range request and returns all failed
  1237. // retrieval tasks to the scheduler for reassignment.
  1238. func (s *Syncer) revertStorageRequest(req *storageRequest) {
  1239. log.Trace("Reverting storage request", "peer", req.peer)
  1240. select {
  1241. case <-req.stale:
  1242. log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id)
  1243. return
  1244. default:
  1245. }
  1246. close(req.stale)
  1247. // Remove the request from the tracked set
  1248. s.lock.Lock()
  1249. delete(s.storageReqs, req.id)
  1250. s.lock.Unlock()
  1251. // If there's a timeout timer still running, abort it and mark the storage
  1252. // task as not-pending, ready for resheduling
  1253. req.timeout.Stop()
  1254. if req.subTask != nil {
  1255. req.subTask.req = nil
  1256. } else {
  1257. for i, account := range req.accounts {
  1258. req.mainTask.stateTasks[account] = req.roots[i]
  1259. }
  1260. }
  1261. }
  1262. // revertTrienodeHealRequest cleans up an trienode heal request and returns all
  1263. // failed retrieval tasks to the scheduler for reassignment.
  1264. func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
  1265. log.Trace("Reverting trienode heal request", "peer", req.peer)
  1266. select {
  1267. case <-req.stale:
  1268. log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id)
  1269. return
  1270. default:
  1271. }
  1272. close(req.stale)
  1273. // Remove the request from the tracked set
  1274. s.lock.Lock()
  1275. delete(s.trienodeHealReqs, req.id)
  1276. s.lock.Unlock()
  1277. // If there's a timeout timer still running, abort it and mark the trie node
  1278. // retrievals as not-pending, ready for resheduling
  1279. req.timeout.Stop()
  1280. for i, hash := range req.hashes {
  1281. req.task.trieTasks[hash] = [][]byte(req.paths[i])
  1282. }
  1283. }
  1284. // revertBytecodeHealRequest cleans up an bytecode request and returns all failed
  1285. // retrieval tasks to the scheduler for reassignment.
  1286. func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
  1287. log.Trace("Reverting bytecode heal request", "peer", req.peer)
  1288. select {
  1289. case <-req.stale:
  1290. log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id)
  1291. return
  1292. default:
  1293. }
  1294. close(req.stale)
  1295. // Remove the request from the tracked set
  1296. s.lock.Lock()
  1297. delete(s.bytecodeHealReqs, req.id)
  1298. s.lock.Unlock()
  1299. // If there's a timeout timer still running, abort it and mark the code
  1300. // retrievals as not-pending, ready for resheduling
  1301. req.timeout.Stop()
  1302. for _, hash := range req.hashes {
  1303. req.task.codeTasks[hash] = struct{}{}
  1304. }
  1305. }
  1306. // processAccountResponse integrates an already validated account range response
  1307. // into the account tasks.
  1308. func (s *Syncer) processAccountResponse(res *accountResponse) {
  1309. // Switch the task from pending to filling
  1310. res.task.req = nil
  1311. res.task.res = res
  1312. // Ensure that the response doesn't overflow into the subsequent task
  1313. last := res.task.Last.Big()
  1314. for i, hash := range res.hashes {
  1315. if hash.Big().Cmp(last) > 0 {
  1316. // Chunk overflown, cut off excess, but also update the boundary nodes
  1317. for j := i; j < len(res.hashes); j++ {
  1318. if err := res.trie.Prove(res.hashes[j][:], 0, res.overflow); err != nil {
  1319. panic(err) // Account range was already proven, what happened
  1320. }
  1321. }
  1322. res.hashes = res.hashes[:i]
  1323. res.accounts = res.accounts[:i]
  1324. res.cont = false // Mark range completed
  1325. break
  1326. }
  1327. }
  1328. // Itereate over all the accounts and assemble which ones need further sub-
  1329. // filling before the entire account range can be persisted.
  1330. res.task.needCode = make([]bool, len(res.accounts))
  1331. res.task.needState = make([]bool, len(res.accounts))
  1332. res.task.needHeal = make([]bool, len(res.accounts))
  1333. res.task.codeTasks = make(map[common.Hash]struct{})
  1334. res.task.stateTasks = make(map[common.Hash]common.Hash)
  1335. resumed := make(map[common.Hash]struct{})
  1336. res.task.pend = 0
  1337. for i, account := range res.accounts {
  1338. // Check if the account is a contract with an unknown code
  1339. if !bytes.Equal(account.CodeHash, emptyCode[:]) {
  1340. if code := rawdb.ReadCodeWithPrefix(s.db, common.BytesToHash(account.CodeHash)); code == nil {
  1341. res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
  1342. res.task.needCode[i] = true
  1343. res.task.pend++
  1344. }
  1345. }
  1346. // Check if the account is a contract with an unknown storage trie
  1347. if account.Root != emptyRoot {
  1348. if node, err := s.db.Get(account.Root[:]); err != nil || node == nil {
  1349. // If there was a previous large state retrieval in progress,
  1350. // don't restart it from scratch. This happens if a sync cycle
  1351. // is interrupted and resumed later. However, *do* update the
  1352. // previous root hash.
  1353. if subtasks, ok := res.task.SubTasks[res.hashes[i]]; ok {
  1354. log.Error("Resuming large storage retrieval", "account", res.hashes[i], "root", account.Root)
  1355. for _, subtask := range subtasks {
  1356. subtask.root = account.Root
  1357. }
  1358. res.task.needHeal[i] = true
  1359. resumed[res.hashes[i]] = struct{}{}
  1360. } else {
  1361. res.task.stateTasks[res.hashes[i]] = account.Root
  1362. }
  1363. res.task.needState[i] = true
  1364. res.task.pend++
  1365. }
  1366. }
  1367. }
  1368. // Delete any subtasks that have been aborted but not resumed. This may undo
  1369. // some progress if a newpeer gives us less accounts than an old one, but for
  1370. // now we have to live with that.
  1371. for hash := range res.task.SubTasks {
  1372. if _, ok := resumed[hash]; !ok {
  1373. log.Error("Aborting suspended storage retrieval", "account", hash)
  1374. delete(res.task.SubTasks, hash)
  1375. }
  1376. }
  1377. // If the account range contained no contracts, or all have been fully filled
  1378. // beforehand, short circuit storage filling and forward to the next task
  1379. if res.task.pend == 0 {
  1380. s.forwardAccountTask(res.task)
  1381. return
  1382. }
  1383. // Some accounts are incomplete, leave as is for the storage and contract
  1384. // task assigners to pick up and fill.
  1385. }
  1386. // processBytecodeResponse integrates an already validated bytecode response
  1387. // into the account tasks.
  1388. func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
  1389. batch := s.db.NewBatch()
  1390. var (
  1391. codes uint64
  1392. bytes common.StorageSize
  1393. )
  1394. for i, hash := range res.hashes {
  1395. code := res.codes[i]
  1396. // If the bytecode was not delivered, reschedule it
  1397. if code == nil {
  1398. res.task.codeTasks[hash] = struct{}{}
  1399. continue
  1400. }
  1401. // Code was delivered, mark it not needed any more
  1402. for j, account := range res.task.res.accounts {
  1403. if res.task.needCode[j] && hash == common.BytesToHash(account.CodeHash) {
  1404. res.task.needCode[j] = false
  1405. res.task.pend--
  1406. }
  1407. }
  1408. // Push the bytecode into a database batch
  1409. s.bytecodeSynced++
  1410. s.bytecodeBytes += common.StorageSize(len(code))
  1411. codes++
  1412. bytes += common.StorageSize(len(code))
  1413. rawdb.WriteCode(batch, hash, code)
  1414. s.bloom.Add(hash[:])
  1415. }
  1416. if err := batch.Write(); err != nil {
  1417. log.Crit("Failed to persist bytecodes", "err", err)
  1418. }
  1419. log.Debug("Persisted set of bytecodes", "count", codes, "bytes", bytes)
  1420. // If this delivery completed the last pending task, forward the account task
  1421. // to the next chunk
  1422. if res.task.pend == 0 {
  1423. s.forwardAccountTask(res.task)
  1424. return
  1425. }
  1426. // Some accounts are still incomplete, leave as is for the storage and contract
  1427. // task assigners to pick up and fill.
  1428. }
// processStorageResponse integrates an already validated storage response
// into the account tasks.
//
// For every delivered account, the reconstructed storage trie nodes are written
// to disk, and the matching accounts in res.mainTask are marked as filled or as
// needing healing. Proof boundary nodes are skipped only for the last delivered
// account. Requested accounts that were not delivered are put back into
// res.mainTask.stateTasks. If the last account's storage was chunked
// (res.cont), retrieval switches to large-contract mode. Its key space is then
// split into storageConcurrency subtasks stored in res.mainTask.SubTasks.
func (s *Syncer) processStorageResponse(res *storageResponse) {
	// Switch the subtask from pending to idle
	if res.subTask != nil {
		res.subTask.req = nil
	}
	batch := s.db.NewBatch()
	var (
		slots   int
		nodes   int
		skipped int
		bytes   common.StorageSize
	)
	// Iterate over all the accounts and reconstruct their storage tries from the
	// delivered slots.
	//
	// First record which storage roots were delivered, so that an undelivered
	// account sharing its root with a delivered one is not rescheduled.
	delivered := make(map[common.Hash]bool)
	for i := 0; i < len(res.hashes); i++ {
		delivered[res.roots[i]] = true
	}
	for i, account := range res.accounts {
		// If the account was not delivered, reschedule it
		if i >= len(res.hashes) {
			if !delivered[res.roots[i]] {
				res.mainTask.stateTasks[account] = res.roots[i]
			}
			continue
		}
		// State was delivered, if complete mark as not needed any more, otherwise
		// mark the account as needing healing. Every account in the main task
		// with the same storage root is updated.
		for j, acc := range res.mainTask.res.accounts {
			if res.roots[i] == acc.Root {
				// If the packet contains multiple contract storage slots, all
				// but the last are surely complete. The last contract may be
				// chunked, so check its continuation flag.
				if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
					res.mainTask.needState[j] = false
					res.mainTask.pend--
				}
				// If the last contract was chunked, mark it as needing healing
				// to avoid writing it out to disk prematurely.
				if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
					res.mainTask.needHeal[j] = true
				}
				// If the last contract was chunked, we need to switch to large
				// contract handling mode
				if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
					// If we haven't yet started a large-contract retrieval, create
					// the subtasks for it within the main account task
					if tasks, ok := res.mainTask.SubTasks[account]; !ok {
						var (
							next common.Hash
						)
						// Each subtask covers the inclusive range [next, next+step],
						// i.e. 2^256/storageConcurrency hashes of the key space.
						step := new(big.Int).Sub(
							new(big.Int).Div(
								new(big.Int).Exp(common.Big2, common.Big256, nil),
								big.NewInt(storageConcurrency),
							), common.Big1,
						)
						for k := 0; k < storageConcurrency; k++ {
							last := common.BigToHash(new(big.Int).Add(next.Big(), step))
							if k == storageConcurrency-1 {
								// Make sure we don't overflow if the step is not a proper divisor
								last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
							}
							tasks = append(tasks, &storageTask{
								Next: next,
								Last: last,
								root: acc.Root,
							})
							log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last)
							next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
						}
						res.mainTask.SubTasks[account] = tasks

						// Since we've just created the sub-tasks, this response
						// is surely for the first one (zero origin)
						res.subTask = tasks[0]
					}
				}
				// If we're in large contract delivery mode, forward the subtask
				if res.subTask != nil {
					// Ensure the response doesn't overflow into the subsequent task
					last := res.subTask.Last.Big()
					for k, hash := range res.hashes[i] {
						if hash.Big().Cmp(last) > 0 {
							// Chunk overflown, cut off excess, but also update the boundary
							for l := k; l < len(res.hashes[i]); l++ {
								if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil {
									panic(err) // Account range was already proven, what happened
								}
							}
							res.hashes[i] = res.hashes[i][:k]
							res.slots[i] = res.slots[i][:k]
							res.cont = false // Mark range completed
							break
						}
					}
					// Forward the relevant storage chunk (even if created just now).
					// Next becomes the hash right after the last delivered slot.
					if res.cont {
						res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1)))
					} else {
						res.subTask.done = true
					}
				}
			}
		}
		// Iterate over all the reconstructed trie nodes and push them to disk
		slots += len(res.hashes[i])

		it := res.nodes[i].NewIterator(nil, nil)
		for it.Next() {
			// Boundary nodes are not written for the last result, since they are incomplete
			if i == len(res.hashes)-1 {
				if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok {
					skipped++
					continue
				}
			}
			// Node is not a boundary, persist to disk
			batch.Put(it.Key(), it.Value())
			s.bloom.Add(it.Key())

			bytes += common.StorageSize(common.HashLength + len(it.Value()))
			nodes++
		}
		it.Release()
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed to persist storage slots", "err", err)
	}
	s.storageSynced += uint64(slots)
	s.storageBytes += bytes

	log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "skipped", skipped, "bytes", bytes)

	// If this delivery completed the last pending task, forward the account task
	// to the next chunk
	if res.mainTask.pend == 0 {
		s.forwardAccountTask(res.mainTask)
		return
	}
	// Some accounts are still incomplete, leave as is for the storage and contract
	// task assigners to pick up and fill.
}
  1569. // processTrienodeHealResponse integrates an already validated trienode response
  1570. // into the healer tasks.
  1571. func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
  1572. for i, hash := range res.hashes {
  1573. node := res.nodes[i]
  1574. // If the trie node was not delivered, reschedule it
  1575. if node == nil {
  1576. res.task.trieTasks[hash] = res.paths[i]
  1577. continue
  1578. }
  1579. // Push the trie node into the state syncer
  1580. s.trienodeHealSynced++
  1581. s.trienodeHealBytes += common.StorageSize(len(node))
  1582. err := s.healer.scheduler.Process(trie.SyncResult{Hash: hash, Data: node})
  1583. switch err {
  1584. case nil:
  1585. case trie.ErrAlreadyProcessed:
  1586. s.trienodeHealDups++
  1587. case trie.ErrNotRequested:
  1588. s.trienodeHealNops++
  1589. default:
  1590. log.Error("Invalid trienode processed", "hash", hash, "err", err)
  1591. }
  1592. }
  1593. batch := s.db.NewBatch()
  1594. if err := s.healer.scheduler.Commit(batch); err != nil {
  1595. log.Error("Failed to commit healing data", "err", err)
  1596. }
  1597. if err := batch.Write(); err != nil {
  1598. log.Crit("Failed to persist healing data", "err", err)
  1599. }
  1600. log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize()))
  1601. }
  1602. // processBytecodeHealResponse integrates an already validated bytecode response
  1603. // into the healer tasks.
  1604. func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
  1605. for i, hash := range res.hashes {
  1606. node := res.codes[i]
  1607. // If the trie node was not delivered, reschedule it
  1608. if node == nil {
  1609. res.task.codeTasks[hash] = struct{}{}
  1610. continue
  1611. }
  1612. // Push the trie node into the state syncer
  1613. s.bytecodeHealSynced++
  1614. s.bytecodeHealBytes += common.StorageSize(len(node))
  1615. err := s.healer.scheduler.Process(trie.SyncResult{Hash: hash, Data: node})
  1616. switch err {
  1617. case nil:
  1618. case trie.ErrAlreadyProcessed:
  1619. s.bytecodeHealDups++
  1620. case trie.ErrNotRequested:
  1621. s.bytecodeHealNops++
  1622. default:
  1623. log.Error("Invalid bytecode processed", "hash", hash, "err", err)
  1624. }
  1625. }
  1626. batch := s.db.NewBatch()
  1627. if err := s.healer.scheduler.Commit(batch); err != nil {
  1628. log.Error("Failed to commit healing data", "err", err)
  1629. }
  1630. if err := batch.Write(); err != nil {
  1631. log.Crit("Failed to persist healing data", "err", err)
  1632. }
  1633. log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize()))
  1634. }
  1635. // forwardAccountTask takes a filled account task and persists anything available
  1636. // into the database, after which it forwards the next account marker so that the
  1637. // task's next chunk may be filled.
  1638. func (s *Syncer) forwardAccountTask(task *accountTask) {
  1639. // Remove any pending delivery
  1640. res := task.res
  1641. if res == nil {
  1642. return // nothing to forward
  1643. }
  1644. task.res = nil
  1645. // Iterate over all the accounts and gather all the incomplete trie nodes. A
  1646. // node is incomplete if we haven't yet filled it (sync was interrupted), or
  1647. // if we filled it in multiple chunks (storage trie), in which case the few
  1648. // nodes on the chunk boundaries are missing.
  1649. incompletes := light.NewNodeSet()
  1650. for i := range res.accounts {
  1651. // If the filling was interrupted, mark everything after as incomplete
  1652. if task.needCode[i] || task.needState[i] {
  1653. for j := i; j < len(res.accounts); j++ {
  1654. if err := res.trie.Prove(res.hashes[j][:], 0, incompletes); err != nil {
  1655. panic(err) // Account range was already proven, what happened
  1656. }
  1657. }
  1658. break
  1659. }
  1660. // Filling not interrupted until this point, mark incomplete if needs healing
  1661. if task.needHeal[i] {
  1662. if err := res.trie.Prove(res.hashes[i][:], 0, incompletes); err != nil {
  1663. panic(err) // Account range was already proven, what happened
  1664. }
  1665. }
  1666. }
  1667. // Persist every finalized trie node that's not on the boundary
  1668. batch := s.db.NewBatch()
  1669. var (
  1670. nodes int
  1671. skipped int
  1672. bytes common.StorageSize
  1673. )
  1674. it := res.nodes.NewIterator(nil, nil)
  1675. for it.Next() {
  1676. // Boundary nodes are not written, since they are incomplete
  1677. if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok {
  1678. skipped++
  1679. continue
  1680. }
  1681. // Overflow nodes are not written, since they mess with another task
  1682. if _, err := res.overflow.Get(it.Key()); err == nil {
  1683. skipped++
  1684. continue
  1685. }
  1686. // Accounts with split storage requests are incomplete
  1687. if _, err := incompletes.Get(it.Key()); err == nil {
  1688. skipped++
  1689. continue
  1690. }
  1691. // Node is neither a boundary, not an incomplete account, persist to disk
  1692. batch.Put(it.Key(), it.Value())
  1693. s.bloom.Add(it.Key())
  1694. bytes += common.StorageSize(common.HashLength + len(it.Value()))
  1695. nodes++
  1696. }
  1697. it.Release()
  1698. if err := batch.Write(); err != nil {
  1699. log.Crit("Failed to persist accounts", "err", err)
  1700. }
  1701. s.accountBytes += bytes
  1702. s.accountSynced += uint64(len(res.accounts))
  1703. log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "skipped", skipped, "bytes", bytes)
  1704. // Task filling persisted, push it the chunk marker forward to the first
  1705. // account still missing data.
  1706. for i, hash := range res.hashes {
  1707. if task.needCode[i] || task.needState[i] {
  1708. return
  1709. }
  1710. task.Next = common.BigToHash(new(big.Int).Add(hash.Big(), big.NewInt(1)))
  1711. }
  1712. // All accounts marked as complete, track if the entire task is done
  1713. task.done = !res.cont
  1714. }
// OnAccounts is a callback method to invoke when a range of accounts are
// received from a remote peer.
//
// It marks the peer idle and looks up the pending request by id. It then
// verifies the range proof against the current sync root and decodes the
// accounts. The result is handed over via s.accountResps, unless the request
// was cancelled or reverted in the meantime. A completely empty response
// marks the peer as stateless. Responses that fail proof verification return
// an error.
func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
	size := common.StorageSize(len(hashes) * common.HashLength)
	for _, account := range accounts {
		size += common.StorageSize(len(account))
	}
	for _, node := range proof {
		size += common.StorageSize(len(node))
	}
	logger := peer.logger.New("reqid", id)
	logger.Trace("Delivering range of accounts", "hashes", len(hashes), "accounts", len(accounts), "proofs", len(proof), "bytes", size)

	// Whether or not the response is valid, we can mark the peer as idle and
	// notify the scheduler to assign a new task. If the response is invalid,
	// we'll drop the peer in a bit.
	s.lock.Lock()
	if _, ok := s.peers[peer.id]; ok {
		s.accountIdlers[peer.id] = struct{}{}
	}
	// Non-blocking notification: dropped if s.update cannot accept it right away
	select {
	case s.update <- struct{}{}:
	default:
	}
	// Ensure the response is for a valid request
	req, ok := s.accountReqs[id]
	if !ok {
		// Request stale, perhaps the peer timed out but came through in the end
		logger.Warn("Unexpected account range packet")
		s.lock.Unlock()
		return nil
	}
	delete(s.accountReqs, id)

	// Clean up the request timeout timer, we'll see how to proceed further based
	// on the actual delivered content
	req.timeout.Stop()

	// Response is valid, but check if peer is signalling that it does not have
	// the requested data. For account range queries that means the state being
	// retrieved was either already pruned remotely, or the peer is not yet
	// synced to our head.
	if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 {
		logger.Debug("Peer rejected account range request", "root", s.root)
		s.statelessPeers[peer.id] = struct{}{}
		s.lock.Unlock()
		// NOTE(review): req is already gone from s.accountReqs and its timer is
		// stopped, but req.task.req is not reset (revertAccountRequest would do
		// that). The task may stay bound to this dead request and never be
		// reassigned. Confirm whether something else reschedules it.
		return nil
	}
	root := s.root
	s.lock.Unlock()

	// Reconstruct a partial trie from the response and verify it
	keys := make([][]byte, len(hashes))
	for i, key := range hashes {
		keys[i] = common.CopyBytes(key[:])
	}
	nodes := make(light.NodeList, len(proof))
	for i, node := range proof {
		nodes[i] = node
	}
	proofdb := nodes.NodeSet()

	var end []byte
	if len(keys) > 0 {
		end = keys[len(keys)-1]
	}
	db, tr, notary, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
	if err != nil {
		logger.Warn("Account range failed proof", "err", err)
		// NOTE(review): same as the rejection path above. The request is not
		// reverted here, so its task presumably stays assigned. Confirm.
		return err
	}
	// Partial trie reconstructed, send it to the scheduler for storage filling.
	// Every proof node the verifier accessed is recorded as a boundary node.
	bounds := make(map[common.Hash]struct{})

	it := notary.Accessed().NewIterator(nil, nil)
	for it.Next() {
		bounds[common.BytesToHash(it.Key())] = struct{}{}
	}
	it.Release()

	accs := make([]*state.Account, len(accounts))
	for i, account := range accounts {
		acc := new(state.Account)
		if err := rlp.DecodeBytes(account, acc); err != nil {
			panic(err) // We created these blobs, we must be able to decode them
		}
		accs[i] = acc
	}
	response := &accountResponse{
		task:     req.task,
		hashes:   hashes,
		accounts: accs,
		nodes:    db,
		trie:     tr,
		bounds:   bounds,
		overflow: light.NewNodeSet(),
		cont:     cont,
	}
	// Deliver the response, unless the request was cancelled or reverted meanwhile
	select {
	case s.accountResps <- response:
	case <-req.cancel:
	case <-req.stale:
	}
	return nil
}
  1813. // OnByteCodes is a callback method to invoke when a batch of contract
  1814. // bytes codes are received from a remote peer.
  1815. func (s *Syncer) OnByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
  1816. s.lock.RLock()
  1817. syncing := len(s.tasks) > 0
  1818. s.lock.RUnlock()
  1819. if syncing {
  1820. return s.onByteCodes(peer, id, bytecodes)
  1821. }
  1822. return s.onHealByteCodes(peer, id, bytecodes)
  1823. }
// onByteCodes is a callback method to invoke when a batch of contract
// bytecodes is received from a remote peer in the syncing phase.
//
// It marks the peer idle and looks up the pending request by id. It then
// matches every delivered code to a requested hash by its Keccak256 hash.
// Requested hashes that were skipped are left as nil entries. The result is
// handed over via s.bytecodeResps. An empty response marks the peer as
// stateless. Unrequested or out-of-order data returns an error.
func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
	var size common.StorageSize
	for _, code := range bytecodes {
		size += common.StorageSize(len(code))
	}
	logger := peer.logger.New("reqid", id)
	logger.Trace("Delivering set of bytecodes", "bytecodes", len(bytecodes), "bytes", size)

	// Whether or not the response is valid, we can mark the peer as idle and
	// notify the scheduler to assign a new task. If the response is invalid,
	// we'll drop the peer in a bit.
	s.lock.Lock()
	if _, ok := s.peers[peer.id]; ok {
		s.bytecodeIdlers[peer.id] = struct{}{}
	}
	// Non-blocking notification: dropped if s.update cannot accept it right away
	select {
	case s.update <- struct{}{}:
	default:
	}
	// Ensure the response is for a valid request
	req, ok := s.bytecodeReqs[id]
	if !ok {
		// Request stale, perhaps the peer timed out but came through in the end
		logger.Warn("Unexpected bytecode packet")
		s.lock.Unlock()
		return nil
	}
	delete(s.bytecodeReqs, id)

	// Clean up the request timeout timer, we'll see how to proceed further based
	// on the actual delivered content
	req.timeout.Stop()

	// Response is valid, but check if peer is signalling that it does not have
	// the requested data. For bytecode range queries that means the peer is not
	// yet synced.
	if len(bytecodes) == 0 {
		logger.Debug("Peer rejected bytecode request")
		s.statelessPeers[peer.id] = struct{}{}
		s.lock.Unlock()
		// NOTE(review): req.hashes are not put back into req.task.codeTasks here
		// (revertBytecodeRequest would do that). If the assigner removed them
		// when sending the request, these codes are never re-requested. Confirm.
		return nil
	}
	s.lock.Unlock()

	// Cross reference the requested bytecodes with the response to find gaps
	// that the serving node is missing. The scan index j only moves forward, so
	// delivered codes must follow the order of req.hashes. Skipped hashes keep a
	// nil entry in codes.
	hasher := sha3.NewLegacyKeccak256()

	codes := make([][]byte, len(req.hashes))
	for i, j := 0, 0; i < len(bytecodes); i++ {
		// Find the next hash that we've been served, leaving misses with nils
		hasher.Reset()
		hasher.Write(bytecodes[i])
		hash := hasher.Sum(nil)

		for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
			j++
		}
		if j < len(req.hashes) {
			codes[j] = bytecodes[i]
			j++
			continue
		}
		// We've either ran out of hashes, or got unrequested data
		logger.Warn("Unexpected bytecodes", "count", len(bytecodes)-i)
		// NOTE(review): as on the rejection path, the request is not reverted
		// here, so its hashes presumably stay unscheduled. Confirm.
		return errors.New("unexpected bytecode")
	}
	// Response validated, send it to the scheduler for filling
	response := &bytecodeResponse{
		task:   req.task,
		hashes: req.hashes,
		codes:  codes,
	}
	// Deliver the response, unless the request was cancelled or reverted meanwhile
	select {
	case s.bytecodeResps <- response:
	case <-req.cancel:
	case <-req.stale:
	}
	return nil
}
// OnStorage is a callback method to invoke when ranges of storage slots
// are received from a remote peer.
//
// The response carries one hash set and one slot set per delivered account,
// checked by index against req.roots. Only the last account may be partial, and
// the proof nodes are applied to it alone. Accounts without a proof must hash to
// their full storage root. The reconstructed tries are handed over via
// s.storageResps. An empty response marks the peer as stateless. Mismatched
// set sizes, oversized responses and failed proofs return an error.
func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
	// Gather some trace stats to aid in debugging issues
	var (
		hashCount int
		slotCount int
		size      common.StorageSize
	)
	for _, hashset := range hashes {
		size += common.StorageSize(common.HashLength * len(hashset))
		hashCount += len(hashset)
	}
	for _, slotset := range slots {
		for _, slot := range slotset {
			size += common.StorageSize(len(slot))
		}
		slotCount += len(slotset)
	}
	for _, node := range proof {
		size += common.StorageSize(len(node))
	}
	logger := peer.logger.New("reqid", id)
	logger.Trace("Delivering ranges of storage slots", "accounts", len(hashes), "hashes", hashCount, "slots", slotCount, "proofs", len(proof), "size", size)

	// Whether or not the response is valid, we can mark the peer as idle and
	// notify the scheduler to assign a new task. If the response is invalid,
	// we'll drop the peer in a bit.
	s.lock.Lock()
	if _, ok := s.peers[peer.id]; ok {
		s.storageIdlers[peer.id] = struct{}{}
	}
	// Non-blocking notification: dropped if s.update cannot accept it right away
	select {
	case s.update <- struct{}{}:
	default:
	}
	// Ensure the response is for a valid request
	req, ok := s.storageReqs[id]
	if !ok {
		// Request stale, perhaps the peer timed out but came through in the end
		logger.Warn("Unexpected storage ranges packet")
		s.lock.Unlock()
		return nil
	}
	delete(s.storageReqs, id)

	// Clean up the request timeout timer, we'll see how to proceed further based
	// on the actual delivered content
	req.timeout.Stop()

	// Reject the response if the hash sets and slot sets don't match, or if the
	// peer sent more data than requested.
	// NOTE(review): on this and the following early returns, req is not reverted.
	// req.subTask.req is not cleared and the accounts are not put back into
	// req.mainTask.stateTasks, unlike revertStorageRequest. Confirm whether the
	// storage task is rescheduled elsewhere.
	if len(hashes) != len(slots) {
		s.lock.Unlock()
		logger.Warn("Hash and slot set size mismatch", "hashset", len(hashes), "slotset", len(slots))
		return errors.New("hash and slot set size mismatch")
	}
	if len(hashes) > len(req.accounts) {
		s.lock.Unlock()
		logger.Warn("Hash set larger than requested", "hashset", len(hashes), "requested", len(req.accounts))
		return errors.New("hash set larger than requested")
	}
	// Response is valid, but check if peer is signalling that it does not have
	// the requested data. For storage range queries that means the state being
	// retrieved was either already pruned remotely, or the peer is not yet
	// synced to our head.
	if len(hashes) == 0 {
		logger.Debug("Peer rejected storage request")
		s.statelessPeers[peer.id] = struct{}{}
		s.lock.Unlock()
		return nil
	}
	s.lock.Unlock()

	// Reconstruct the partial tries from the response and verify them
	var (
		dbs    = make([]ethdb.KeyValueStore, len(hashes))
		tries  = make([]*trie.Trie, len(hashes))
		notary *trie.KeyValueNotary
		cont   bool
	)
	for i := 0; i < len(hashes); i++ {
		// Convert the keys and proofs into an internal format
		keys := make([][]byte, len(hashes[i]))
		for j, key := range hashes[i] {
			keys[j] = common.CopyBytes(key[:])
		}
		// Proof nodes only ever apply to the last delivered account
		nodes := make(light.NodeList, 0, len(proof))
		if i == len(hashes)-1 {
			for _, node := range proof {
				nodes = append(nodes, node)
			}
		}
		var err error
		if len(nodes) == 0 {
			// No proof has been attached, the response must cover the entire key
			// space and hash to the origin root.
			dbs[i], tries[i], _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
			if err != nil {
				logger.Warn("Storage slots failed proof", "err", err)
				return err
			}
		} else {
			// A proof was attached, the response is only partial, check that the
			// returned data is indeed part of the storage trie
			proofdb := nodes.NodeSet()

			var end []byte
			if len(keys) > 0 {
				end = keys[len(keys)-1]
			}
			dbs[i], tries[i], notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
			if err != nil {
				logger.Warn("Storage range failed proof", "err", err)
				return err
			}
		}
	}
	// Partial tries reconstructed, send them to the scheduler for storage filling
	bounds := make(map[common.Hash]struct{})

	if notary != nil { // if all contract storages are delivered in full, no notary will be created
		it := notary.Accessed().NewIterator(nil, nil)
		for it.Next() {
			bounds[common.BytesToHash(it.Key())] = struct{}{}
		}
		it.Release()
	}
	response := &storageResponse{
		mainTask: req.mainTask,
		subTask:  req.subTask,
		accounts: req.accounts,
		roots:    req.roots,
		hashes:   hashes,
		slots:    slots,
		nodes:    dbs,
		tries:    tries,
		bounds:   bounds,
		overflow: light.NewNodeSet(),
		cont:     cont,
	}
	// Deliver the response, unless the request was cancelled or reverted meanwhile
	select {
	case s.storageResps <- response:
	case <-req.cancel:
	case <-req.stale:
	}
	return nil
}
  2042. // OnTrieNodes is a callback method to invoke when a batch of trie nodes
  2043. // are received from a remote peer.
  2044. func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
  2045. var size common.StorageSize
  2046. for _, node := range trienodes {
  2047. size += common.StorageSize(len(node))
  2048. }
  2049. logger := peer.logger.New("reqid", id)
  2050. logger.Trace("Delivering set of healing trienodes", "trienodes", len(trienodes), "bytes", size)
  2051. // Whether or not the response is valid, we can mark the peer as idle and
  2052. // notify the scheduler to assign a new task. If the response is invalid,
  2053. // we'll drop the peer in a bit.
  2054. s.lock.Lock()
  2055. if _, ok := s.peers[peer.id]; ok {
  2056. s.trienodeHealIdlers[peer.id] = struct{}{}
  2057. }
  2058. select {
  2059. case s.update <- struct{}{}:
  2060. default:
  2061. }
  2062. // Ensure the response is for a valid request
  2063. req, ok := s.trienodeHealReqs[id]
  2064. if !ok {
  2065. // Request stale, perhaps the peer timed out but came through in the end
  2066. logger.Warn("Unexpected trienode heal packet")
  2067. s.lock.Unlock()
  2068. return nil
  2069. }
  2070. delete(s.trienodeHealReqs, id)
  2071. // Clean up the request timeout timer, we'll see how to proceed further based
  2072. // on the actual delivered content
  2073. req.timeout.Stop()
  2074. // Response is valid, but check if peer is signalling that it does not have
  2075. // the requested data. For bytecode range queries that means the peer is not
  2076. // yet synced.
  2077. if len(trienodes) == 0 {
  2078. logger.Debug("Peer rejected trienode heal request")
  2079. s.statelessPeers[peer.id] = struct{}{}
  2080. s.lock.Unlock()
  2081. return nil
  2082. }
  2083. s.lock.Unlock()
  2084. // Cross reference the requested trienodes with the response to find gaps
  2085. // that the serving node is missing
  2086. hasher := sha3.NewLegacyKeccak256()
  2087. nodes := make([][]byte, len(req.hashes))
  2088. for i, j := 0, 0; i < len(trienodes); i++ {
  2089. // Find the next hash that we've been served, leaving misses with nils
  2090. hasher.Reset()
  2091. hasher.Write(trienodes[i])
  2092. hash := hasher.Sum(nil)
  2093. for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
  2094. j++
  2095. }
  2096. if j < len(req.hashes) {
  2097. nodes[j] = trienodes[i]
  2098. j++
  2099. continue
  2100. }
  2101. // We've either ran out of hashes, or got unrequested data
  2102. logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
  2103. return errors.New("unexpected healing trienode")
  2104. }
  2105. // Response validated, send it to the scheduler for filling
  2106. response := &trienodeHealResponse{
  2107. task: req.task,
  2108. hashes: req.hashes,
  2109. paths: req.paths,
  2110. nodes: nodes,
  2111. }
  2112. select {
  2113. case s.trienodeHealResps <- response:
  2114. case <-req.cancel:
  2115. case <-req.stale:
  2116. }
  2117. return nil
  2118. }
  2119. // onHealByteCodes is a callback method to invoke when a batch of contract
  2120. // bytes codes are received from a remote peer in the healing phase.
  2121. func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
  2122. var size common.StorageSize
  2123. for _, code := range bytecodes {
  2124. size += common.StorageSize(len(code))
  2125. }
  2126. logger := peer.logger.New("reqid", id)
  2127. logger.Trace("Delivering set of healing bytecodes", "bytecodes", len(bytecodes), "bytes", size)
  2128. // Whether or not the response is valid, we can mark the peer as idle and
  2129. // notify the scheduler to assign a new task. If the response is invalid,
  2130. // we'll drop the peer in a bit.
  2131. s.lock.Lock()
  2132. if _, ok := s.peers[peer.id]; ok {
  2133. s.bytecodeHealIdlers[peer.id] = struct{}{}
  2134. }
  2135. select {
  2136. case s.update <- struct{}{}:
  2137. default:
  2138. }
  2139. // Ensure the response is for a valid request
  2140. req, ok := s.bytecodeHealReqs[id]
  2141. if !ok {
  2142. // Request stale, perhaps the peer timed out but came through in the end
  2143. logger.Warn("Unexpected bytecode heal packet")
  2144. s.lock.Unlock()
  2145. return nil
  2146. }
  2147. delete(s.bytecodeHealReqs, id)
  2148. // Clean up the request timeout timer, we'll see how to proceed further based
  2149. // on the actual delivered content
  2150. req.timeout.Stop()
  2151. // Response is valid, but check if peer is signalling that it does not have
  2152. // the requested data. For bytecode range queries that means the peer is not
  2153. // yet synced.
  2154. if len(bytecodes) == 0 {
  2155. logger.Debug("Peer rejected bytecode heal request")
  2156. s.statelessPeers[peer.id] = struct{}{}
  2157. s.lock.Unlock()
  2158. return nil
  2159. }
  2160. s.lock.Unlock()
  2161. // Cross reference the requested bytecodes with the response to find gaps
  2162. // that the serving node is missing
  2163. hasher := sha3.NewLegacyKeccak256()
  2164. codes := make([][]byte, len(req.hashes))
  2165. for i, j := 0, 0; i < len(bytecodes); i++ {
  2166. // Find the next hash that we've been served, leaving misses with nils
  2167. hasher.Reset()
  2168. hasher.Write(bytecodes[i])
  2169. hash := hasher.Sum(nil)
  2170. for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
  2171. j++
  2172. }
  2173. if j < len(req.hashes) {
  2174. codes[j] = bytecodes[i]
  2175. j++
  2176. continue
  2177. }
  2178. // We've either ran out of hashes, or got unrequested data
  2179. logger.Warn("Unexpected healing bytecodes", "count", len(bytecodes)-i)
  2180. return errors.New("unexpected healing bytecode")
  2181. }
  2182. // Response validated, send it to the scheduler for filling
  2183. response := &bytecodeHealResponse{
  2184. task: req.task,
  2185. hashes: req.hashes,
  2186. codes: codes,
  2187. }
  2188. select {
  2189. case s.bytecodeHealResps <- response:
  2190. case <-req.cancel:
  2191. case <-req.stale:
  2192. }
  2193. return nil
  2194. }
  // hashSpace is the size of the full 256-bit hash space (2^256). The sync
// progress report divides by it to turn the account ranges covered so far
// into a completion fraction.
var hashSpace = new(big.Int).Lsh(big.NewInt(1), 256)
  2197. // report calculates various status reports and provides it to the user.
  2198. func (s *Syncer) report(force bool) {
  2199. if len(s.tasks) > 0 {
  2200. s.reportSyncProgress(force)
  2201. return
  2202. }
  2203. s.reportHealProgress(force)
  2204. }
  2205. // reportSyncProgress calculates various status reports and provides it to the user.
  2206. func (s *Syncer) reportSyncProgress(force bool) {
  2207. // Don't report all the events, just occasionally
  2208. if !force && time.Since(s.logTime) < 3*time.Second {
  2209. return
  2210. }
  2211. // Don't report anything until we have a meaningful progress
  2212. synced := s.accountBytes + s.bytecodeBytes + s.storageBytes
  2213. if synced == 0 {
  2214. return
  2215. }
  2216. accountGaps := new(big.Int)
  2217. for _, task := range s.tasks {
  2218. accountGaps.Add(accountGaps, new(big.Int).Sub(task.Last.Big(), task.Next.Big()))
  2219. }
  2220. accountFills := new(big.Int).Sub(hashSpace, accountGaps)
  2221. if accountFills.BitLen() == 0 {
  2222. return
  2223. }
  2224. s.logTime = time.Now()
  2225. estBytes := float64(new(big.Int).Div(
  2226. new(big.Int).Mul(new(big.Int).SetUint64(uint64(synced)), hashSpace),
  2227. accountFills,
  2228. ).Uint64())
  2229. elapsed := time.Since(s.startTime)
  2230. estTime := elapsed / time.Duration(synced) * time.Duration(estBytes)
  2231. // Create a mega progress report
  2232. var (
  2233. progress = fmt.Sprintf("%.2f%%", float64(synced)*100/estBytes)
  2234. accounts = fmt.Sprintf("%d@%v", s.accountSynced, s.accountBytes.TerminalString())
  2235. storage = fmt.Sprintf("%d@%v", s.storageSynced, s.storageBytes.TerminalString())
  2236. bytecode = fmt.Sprintf("%d@%v", s.bytecodeSynced, s.bytecodeBytes.TerminalString())
  2237. )
  2238. log.Info("State sync in progress", "synced", progress, "state", synced,
  2239. "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed))
  2240. }
  2241. // reportHealProgress calculates various status reports and provides it to the user.
  2242. func (s *Syncer) reportHealProgress(force bool) {
  2243. // Don't report all the events, just occasionally
  2244. if !force && time.Since(s.logTime) < 3*time.Second {
  2245. return
  2246. }
  2247. s.logTime = time.Now()
  2248. // Create a mega progress report
  2249. var (
  2250. trienode = fmt.Sprintf("%d@%v", s.trienodeHealSynced, s.trienodeHealBytes.TerminalString())
  2251. bytecode = fmt.Sprintf("%d@%v", s.bytecodeHealSynced, s.bytecodeHealBytes.TerminalString())
  2252. )
  2253. log.Info("State heal in progress", "nodes", trienode, "codes", bytecode,
  2254. "pending", s.healer.scheduler.Pending())
  2255. }