sync.go 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package snap
  17. import (
  18. "bytes"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "math/rand"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/state/snapshot"
  30. "github.com/ethereum/go-ethereum/crypto"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/light"
  34. "github.com/ethereum/go-ethereum/log"
  35. "github.com/ethereum/go-ethereum/rlp"
  36. "github.com/ethereum/go-ethereum/trie"
  37. "golang.org/x/crypto/sha3"
  38. )
var (
	// emptyRoot is the known root hash of an empty trie.
	emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

	// emptyCode is the known hash of the empty EVM bytecode, i.e. the Keccak256
	// hash computed over no input at all.
	emptyCode = crypto.Keccak256Hash(nil)
)
const (
	// maxRequestSize is the maximum number of bytes to request from a remote peer.
	maxRequestSize = 512 * 1024

	// maxStorageSetRequestCount is the maximum number of contracts to request the
	// storage of in a single query. If this number is too low, we're not filling
	// responses fully and waste round trip times. If it's too high, we're capping
	// responses and waste bandwidth.
	//
	// With the current maxRequestSize this evaluates to 512.
	maxStorageSetRequestCount = maxRequestSize / 1024

	// maxCodeRequestCount is the maximum number of bytecode blobs to request in a
	// single query. If this number is too low, we're not filling responses fully
	// and waste round trip times. If it's too high, we're capping responses and
	// waste bandwidth.
	//
	// Deployed bytecodes are currently capped at 24KB, so the minimum request
	// size should be maxRequestSize / 24K. Assuming that most contracts do not
	// come close to that, requesting 4x should be a good approximation.
	//
	// Note, the integer division is evaluated before the multiplication, so with
	// the current maxRequestSize this evaluates to 21 * 4 = 84.
	maxCodeRequestCount = maxRequestSize / (24 * 1024) * 4

	// maxTrieRequestCount is the maximum number of trie node blobs to request in
	// a single query. If this number is too low, we're not filling responses fully
	// and waste round trip times. If it's too high, we're capping responses and
	// waste bandwidth.
	maxTrieRequestCount = 512

	// accountConcurrency is the number of chunks to split the account trie into
	// to allow concurrent retrievals.
	accountConcurrency = 16

	// storageConcurrency is the number of chunks to split a large contract
	// storage trie into to allow concurrent retrievals.
	storageConcurrency = 16
)
var (
	// requestTimeout is the maximum time a peer is allowed to spend on serving
	// a single network request.
	//
	// NOTE(review): this is a variable rather than a constant, presumably so that
	// it can be adjusted (e.g. by tests or a future dynamic timeout) - confirm.
	requestTimeout = 15 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
)
// ErrCancelled is the sentinel error returned from snap syncing if the operation
// was prematurely terminated.
var ErrCancelled = errors.New("sync cancelled")
// accountRequest tracks a pending account range request to ensure responses are
// to actual requests and to validate any security constraints.
//
// Concurrency note: account requests and responses are handled concurrently from
// the main runloop to allow Merkle proof verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). That
// is only included to allow the runloop to match a response to the task being
// synced without having yet another set of maps.
type accountRequest struct {
	peer string // Peer to which this request is assigned
	id   uint64 // Request ID of this request

	cancel  chan struct{} // Channel to track sync cancellation
	timeout *time.Timer   // Timer to track delivery timeout
	stale   chan struct{} // Channel to signal the request was dropped

	origin common.Hash // First account requested to allow continuation checks
	limit  common.Hash // Last account requested to allow non-overlapping chunking

	task *accountTask // Task which this request is filling (only access fields through the runloop!!)
}
// accountResponse is an already Merkle-verified remote response to an account
// range request. It contains the subtrie for the requested account range and
// the database that's going to be filled with the internal nodes on commit.
type accountResponse struct {
	task *accountTask // Task which this response is filling

	hashes   []common.Hash    // Account hashes in the returned range
	accounts []*state.Account // Expanded accounts in the returned range

	nodes ethdb.KeyValueStore // Database containing the reconstructed trie nodes
	trie  *trie.Trie          // Reconstructed trie to reject incomplete account paths

	bounds   map[common.Hash]struct{} // Boundary nodes to avoid persisting incomplete accounts
	overflow *light.NodeSet           // Overflow nodes to avoid persisting across chunk boundaries

	cont bool // Whether the account range has a continuation
}
// bytecodeRequest tracks a pending bytecode request to ensure responses are to
// actual requests and to validate any security constraints.
//
// Concurrency note: bytecode requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). That
// is only included to allow the runloop to match a response to the task being
// synced without having yet another set of maps.
type bytecodeRequest struct {
	peer string // Peer to which this request is assigned
	id   uint64 // Request ID of this request

	cancel  chan struct{} // Channel to track sync cancellation
	timeout *time.Timer   // Timer to track delivery timeout
	stale   chan struct{} // Channel to signal the request was dropped

	hashes []common.Hash // Bytecode hashes to validate responses
	task   *accountTask  // Task which this request is filling (only access fields through the runloop!!)
}
// bytecodeResponse is an already verified remote response to a bytecode request.
type bytecodeResponse struct {
	task *accountTask // Task which this response is filling

	hashes []common.Hash // Hashes of the bytecode to avoid double hashing
	codes  [][]byte      // Actual bytecodes to store into the database (nil = missing)
}
// storageRequest tracks a pending storage ranges request to ensure responses are
// to actual requests and to validate any security constraints.
//
// Concurrency note: storage requests and responses are handled concurrently from
// the main runloop to allow Merkle proof verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. tasks). That
// is only included to allow the runloop to match a response to the task being
// synced without having yet another set of maps.
type storageRequest struct {
	peer string // Peer to which this request is assigned
	id   uint64 // Request ID of this request

	cancel  chan struct{} // Channel to track sync cancellation
	timeout *time.Timer   // Timer to track delivery timeout
	stale   chan struct{} // Channel to signal the request was dropped

	accounts []common.Hash // Account hashes to validate responses
	roots    []common.Hash // Storage roots to validate responses

	origin common.Hash // First storage slot requested to allow continuation checks
	limit  common.Hash // Last storage slot requested to allow non-overlapping chunking

	mainTask *accountTask // Task which this response belongs to (only access fields through the runloop!!)
	subTask  *storageTask // Task which this response is filling (only access fields through the runloop!!)
}
// storageResponse is an already Merkle-verified remote response to a storage
// range request. It contains the subtries for the requested storage ranges and
// the databases that are going to be filled with the internal nodes on commit.
type storageResponse struct {
	mainTask *accountTask // Task which this response belongs to
	subTask  *storageTask // Task which this response is filling

	accounts []common.Hash // Account hashes requested, may be only partially filled
	roots    []common.Hash // Storage roots requested, may be only partially filled

	hashes [][]common.Hash       // Storage slot hashes in the returned range
	slots  [][][]byte            // Storage slot values in the returned range
	nodes  []ethdb.KeyValueStore // Databases containing the reconstructed trie nodes
	tries  []*trie.Trie          // Reconstructed tries to reject overflown slots

	// Fields relevant for the last account only
	bounds   map[common.Hash]struct{} // Boundary nodes to avoid persisting (incomplete)
	overflow *light.NodeSet           // Overflow nodes to avoid persisting across chunk boundaries
	cont     bool                     // Whether the last storage range has a continuation
}
// trienodeHealRequest tracks a pending state trie request to ensure responses
// are to actual requests and to validate any security constraints.
//
// Concurrency note: trie node requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). That
// is only included to allow the runloop to match a response to the task being
// synced without having yet another set of maps.
type trienodeHealRequest struct {
	peer string // Peer to which this request is assigned
	id   uint64 // Request ID of this request

	cancel  chan struct{} // Channel to track sync cancellation
	timeout *time.Timer   // Timer to track delivery timeout
	stale   chan struct{} // Channel to signal the request was dropped

	hashes []common.Hash   // Trie node hashes to validate responses
	paths  []trie.SyncPath // Trie node paths requested for rescheduling

	task *healTask // Task which this request is filling (only access fields through the runloop!!)
}
// trienodeHealResponse is an already verified remote response to a trie node request.
type trienodeHealResponse struct {
	task *healTask // Task which this response is filling

	hashes []common.Hash   // Hashes of the trie nodes to avoid double hashing
	paths  []trie.SyncPath // Trie node paths requested for rescheduling missing ones
	nodes  [][]byte        // Actual trie nodes to store into the database (nil = missing)
}
// bytecodeHealRequest tracks a pending bytecode request issued during the healing
// phase to ensure responses are to actual requests and to validate any security
// constraints.
//
// Concurrency note: bytecode requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). That
// is only included to allow the runloop to match a response to the task being
// synced without having yet another set of maps.
type bytecodeHealRequest struct {
	peer string // Peer to which this request is assigned
	id   uint64 // Request ID of this request

	cancel  chan struct{} // Channel to track sync cancellation
	timeout *time.Timer   // Timer to track delivery timeout
	stale   chan struct{} // Channel to signal the request was dropped

	hashes []common.Hash // Bytecode hashes to validate responses
	task   *healTask     // Task which this request is filling (only access fields through the runloop!!)
}
// bytecodeHealResponse is an already verified remote response to a bytecode
// request issued during the healing phase.
type bytecodeHealResponse struct {
	task *healTask // Task which this response is filling

	hashes []common.Hash // Hashes of the bytecode to avoid double hashing
	codes  [][]byte      // Actual bytecodes to store into the database (nil = missing)
}
// accountTask represents the sync task for a chunk of the account snapshot.
type accountTask struct {
	// These fields get serialized to leveldb on shutdown
	Next     common.Hash                    // Next account to sync in this interval
	Last     common.Hash                    // Last account to sync in this interval
	SubTasks map[common.Hash][]*storageTask // Storage intervals needing fetching for large contracts

	// These fields are internals used during runtime
	req  *accountRequest  // Pending request to fill this task
	res  *accountResponse // Validated response filling this task
	pend int              // Number of pending subtasks for this round

	needCode  []bool // Flags whether the filling accounts need code retrieval
	needState []bool // Flags whether the filling accounts need storage retrieval
	needHeal  []bool // Flags whether the filling accounts' state was chunked and needs healing

	codeTasks  map[common.Hash]struct{}    // Code hashes that need retrieval
	stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval

	done bool // Flag whether the task can be removed
}
// storageTask represents the sync task for a chunk of the storage snapshot.
//
// NOTE(review): the Next/Last comments below talk about accounts, but for a
// storage task they presumably denote storage slot hashes within the contract's
// storage trie - confirm and reword.
type storageTask struct {
	Next common.Hash // Next account to sync in this interval
	Last common.Hash // Last account to sync in this interval

	// These fields are internals used during runtime
	root common.Hash     // Storage root hash for this instance
	req  *storageRequest // Pending request to fill this task
	done bool            // Flag whether the task can be removed
}
// healTask represents the sync task for healing the snap-synced chunk boundaries.
type healTask struct {
	scheduler *trie.Sync // State trie sync scheduler defining the tasks

	trieTasks map[common.Hash]trie.SyncPath // Set of trie node tasks currently queued for retrieval
	codeTasks map[common.Hash]struct{}      // Set of byte code tasks currently queued for retrieval
}
// syncProgress is a database entry to allow suspending and resuming a snapshot state
// sync. As opposed to full and fast sync, there is no way to restart a suspended
// snap sync without prior knowledge of the suspension point.
type syncProgress struct {
	Tasks []*accountTask // The suspended account tasks (contract tasks within)

	// Status report during syncing phase
	AccountSynced  uint64             // Number of accounts downloaded
	AccountBytes   common.StorageSize // Number of account trie bytes persisted to disk
	BytecodeSynced uint64             // Number of bytecodes downloaded
	BytecodeBytes  common.StorageSize // Number of bytecode bytes downloaded
	StorageSynced  uint64             // Number of storage slots downloaded
	StorageBytes   common.StorageSize // Number of storage trie bytes persisted to disk

	// Status report during healing phase
	TrienodeHealSynced uint64             // Number of state trie nodes downloaded
	TrienodeHealBytes  common.StorageSize // Number of state trie bytes persisted to disk
	TrienodeHealDups   uint64             // Number of state trie nodes already processed
	TrienodeHealNops   uint64             // Number of state trie nodes not requested
	BytecodeHealSynced uint64             // Number of bytecodes downloaded
	BytecodeHealBytes  common.StorageSize // Number of bytecodes persisted to disk
	BytecodeHealDups   uint64             // Number of bytecodes already processed
	BytecodeHealNops   uint64             // Number of bytecodes not requested
}
// SyncPeer abstracts out the methods required for a peer to be synced against
// with the goal of allowing the construction of mock peers without the full
// blown networking.
type SyncPeer interface {
	// ID retrieves the peer's unique identifier.
	ID() string

	// RequestAccountRange fetches a batch of accounts rooted in a specific account
	// trie, starting with the origin.
	RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error

	// RequestStorageRanges fetches a batch of storage slots belonging to one or
	// more accounts. If slots from only one account are requested, an origin marker
	// may also be used to retrieve from there.
	RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error

	// RequestByteCodes fetches a batch of bytecodes by hash.
	RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error

	// RequestTrieNodes fetches a batch of account or storage trie nodes rooted in
	// a specific state trie.
	RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error

	// Log retrieves the peer's own contextual logger.
	Log() log.Logger
}
// Syncer is an Ethereum account and storage trie syncer based on snapshots and
// the snap protocol. Its purpose is to download all the accounts and storage
// slots from remote peers and reassemble chunks of the state trie, on top of
// which a state sync can be run to fix any gaps / overlaps.
//
// Every network request has a variety of failure events:
//   - The peer disconnects after task assignment, failing to send the request
//   - The peer disconnects after sending the request, before delivering on it
//   - The peer remains connected, but does not deliver a response in time
//   - The peer delivers a stale response after a previous timeout
//   - The peer delivers a refusal to serve the requested state
type Syncer struct {
	db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)

	root    common.Hash    // Current state trie root being synced
	tasks   []*accountTask // Current account task set being synced
	snapped bool           // Flag to signal that snap phase is done
	healer  *healTask      // Current state healing task being executed
	update  chan struct{}  // Notification channel for possible sync progression

	peers    map[string]SyncPeer // Currently active peers to download from
	peerJoin *event.Feed         // Event feed to react to peers joining
	peerDrop *event.Feed         // Event feed to react to peers dropping

	// Request tracking during syncing phase
	statelessPeers map[string]struct{} // Peers that failed to deliver state data
	accountIdlers  map[string]struct{} // Peers that aren't serving account requests
	bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests
	storageIdlers  map[string]struct{} // Peers that aren't serving storage requests

	accountReqs  map[uint64]*accountRequest  // Account requests currently running
	bytecodeReqs map[uint64]*bytecodeRequest // Bytecode requests currently running
	storageReqs  map[uint64]*storageRequest  // Storage requests currently running

	accountReqFails  chan *accountRequest  // Failed account range requests to revert
	bytecodeReqFails chan *bytecodeRequest // Failed bytecode requests to revert
	storageReqFails  chan *storageRequest  // Failed storage requests to revert

	accountResps  chan *accountResponse  // Account sub-tries to integrate into the database
	bytecodeResps chan *bytecodeResponse // Bytecodes to integrate into the database
	storageResps  chan *storageResponse  // Storage sub-tries to integrate into the database

	accountSynced  uint64             // Number of accounts downloaded
	accountBytes   common.StorageSize // Number of account trie bytes persisted to disk
	bytecodeSynced uint64             // Number of bytecodes downloaded
	bytecodeBytes  common.StorageSize // Number of bytecode bytes downloaded
	storageSynced  uint64             // Number of storage slots downloaded
	storageBytes   common.StorageSize // Number of storage trie bytes persisted to disk

	// Request tracking during healing phase
	trienodeHealIdlers map[string]struct{} // Peers that aren't serving trie node requests
	bytecodeHealIdlers map[string]struct{} // Peers that aren't serving bytecode requests

	trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
	bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running

	trienodeHealReqFails chan *trienodeHealRequest // Failed trienode requests to revert
	bytecodeHealReqFails chan *bytecodeHealRequest // Failed bytecode requests to revert

	trienodeHealResps chan *trienodeHealResponse // Trie nodes to integrate into the database
	bytecodeHealResps chan *bytecodeHealResponse // Bytecodes to integrate into the database

	trienodeHealSynced uint64             // Number of state trie nodes downloaded
	trienodeHealBytes  common.StorageSize // Number of state trie bytes persisted to disk
	trienodeHealDups   uint64             // Number of state trie nodes already processed
	trienodeHealNops   uint64             // Number of state trie nodes not requested
	bytecodeHealSynced uint64             // Number of bytecodes downloaded
	bytecodeHealBytes  common.StorageSize // Number of bytecodes persisted to disk
	bytecodeHealDups   uint64             // Number of bytecodes already processed
	bytecodeHealNops   uint64             // Number of bytecodes not requested

	stateWriter        ethdb.Batch        // Shared batch writer used for persisting raw states
	accountHealed      uint64             // Number of accounts downloaded during the healing stage
	accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage
	storageHealed      uint64             // Number of storage slots downloaded during the healing stage
	storageHealedBytes common.StorageSize // Number of raw storage bytes persisted to disk during the healing stage

	startTime time.Time // Time instance when snapshot sync started
	logTime   time.Time // Time instance when status was last reported

	pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
	lock sync.RWMutex   // Protects fields that can change outside of sync (peers, reqs, root)
}
  370. // NewSyncer creates a new snapshot syncer to download the Ethereum state over the
  371. // snap protocol.
  372. func NewSyncer(db ethdb.KeyValueStore) *Syncer {
  373. return &Syncer{
  374. db: db,
  375. peers: make(map[string]SyncPeer),
  376. peerJoin: new(event.Feed),
  377. peerDrop: new(event.Feed),
  378. update: make(chan struct{}, 1),
  379. accountIdlers: make(map[string]struct{}),
  380. storageIdlers: make(map[string]struct{}),
  381. bytecodeIdlers: make(map[string]struct{}),
  382. accountReqs: make(map[uint64]*accountRequest),
  383. storageReqs: make(map[uint64]*storageRequest),
  384. bytecodeReqs: make(map[uint64]*bytecodeRequest),
  385. accountReqFails: make(chan *accountRequest),
  386. storageReqFails: make(chan *storageRequest),
  387. bytecodeReqFails: make(chan *bytecodeRequest),
  388. accountResps: make(chan *accountResponse),
  389. storageResps: make(chan *storageResponse),
  390. bytecodeResps: make(chan *bytecodeResponse),
  391. trienodeHealIdlers: make(map[string]struct{}),
  392. bytecodeHealIdlers: make(map[string]struct{}),
  393. trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
  394. bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
  395. trienodeHealReqFails: make(chan *trienodeHealRequest),
  396. bytecodeHealReqFails: make(chan *bytecodeHealRequest),
  397. trienodeHealResps: make(chan *trienodeHealResponse),
  398. bytecodeHealResps: make(chan *bytecodeHealResponse),
  399. stateWriter: db.NewBatch(),
  400. }
  401. }
  402. // Register injects a new data source into the syncer's peerset.
  403. func (s *Syncer) Register(peer SyncPeer) error {
  404. // Make sure the peer is not registered yet
  405. id := peer.ID()
  406. s.lock.Lock()
  407. if _, ok := s.peers[id]; ok {
  408. log.Error("Snap peer already registered", "id", id)
  409. s.lock.Unlock()
  410. return errors.New("already registered")
  411. }
  412. s.peers[id] = peer
  413. // Mark the peer as idle, even if no sync is running
  414. s.accountIdlers[id] = struct{}{}
  415. s.storageIdlers[id] = struct{}{}
  416. s.bytecodeIdlers[id] = struct{}{}
  417. s.trienodeHealIdlers[id] = struct{}{}
  418. s.bytecodeHealIdlers[id] = struct{}{}
  419. s.lock.Unlock()
  420. // Notify any active syncs that a new peer can be assigned data
  421. s.peerJoin.Send(id)
  422. return nil
  423. }
  424. // Unregister injects a new data source into the syncer's peerset.
  425. func (s *Syncer) Unregister(id string) error {
  426. // Remove all traces of the peer from the registry
  427. s.lock.Lock()
  428. if _, ok := s.peers[id]; !ok {
  429. log.Error("Snap peer not registered", "id", id)
  430. s.lock.Unlock()
  431. return errors.New("not registered")
  432. }
  433. delete(s.peers, id)
  434. // Remove status markers, even if no sync is running
  435. delete(s.statelessPeers, id)
  436. delete(s.accountIdlers, id)
  437. delete(s.storageIdlers, id)
  438. delete(s.bytecodeIdlers, id)
  439. delete(s.trienodeHealIdlers, id)
  440. delete(s.bytecodeHealIdlers, id)
  441. s.lock.Unlock()
  442. // Notify any active syncs that pending requests need to be reverted
  443. s.peerDrop.Send(id)
  444. return nil
  445. }
  446. // Sync starts (or resumes a previous) sync cycle to iterate over an state trie
  447. // with the given root and reconstruct the nodes based on the snapshot leaves.
  448. // Previously downloaded segments will not be redownloaded of fixed, rather any
  449. // errors will be healed after the leaves are fully accumulated.
  450. func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
  451. // Move the trie root from any previous value, revert stateless markers for
  452. // any peers and initialize the syncer if it was not yet run
  453. s.lock.Lock()
  454. s.root = root
  455. s.healer = &healTask{
  456. scheduler: state.NewStateSync(root, s.db, nil, s.onHealState),
  457. trieTasks: make(map[common.Hash]trie.SyncPath),
  458. codeTasks: make(map[common.Hash]struct{}),
  459. }
  460. s.statelessPeers = make(map[string]struct{})
  461. s.lock.Unlock()
  462. if s.startTime == (time.Time{}) {
  463. s.startTime = time.Now()
  464. }
  465. // Retrieve the previous sync status from LevelDB and abort if already synced
  466. s.loadSyncStatus()
  467. if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
  468. log.Debug("Snapshot sync already completed")
  469. return nil
  470. }
  471. // If sync is still not finished, we need to ensure that any marker is wiped.
  472. // Otherwise, it may happen that requests for e.g. genesis-data is delivered
  473. // from the snapshot data, instead of from the trie
  474. snapshot.ClearSnapshotMarker(s.db)
  475. defer func() { // Persist any progress, independent of failure
  476. for _, task := range s.tasks {
  477. s.forwardAccountTask(task)
  478. }
  479. s.cleanAccountTasks()
  480. s.saveSyncStatus()
  481. }()
  482. log.Debug("Starting snapshot sync cycle", "root", root)
  483. // Flush out the last committed raw states
  484. defer func() {
  485. if s.stateWriter.ValueSize() > 0 {
  486. s.stateWriter.Write()
  487. s.stateWriter.Reset()
  488. }
  489. }()
  490. defer s.report(true)
  491. // Whether sync completed or not, disregard any future packets
  492. defer func() {
  493. log.Debug("Terminating snapshot sync cycle", "root", root)
  494. s.lock.Lock()
  495. s.accountReqs = make(map[uint64]*accountRequest)
  496. s.storageReqs = make(map[uint64]*storageRequest)
  497. s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
  498. s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
  499. s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
  500. s.lock.Unlock()
  501. }()
  502. // Keep scheduling sync tasks
  503. peerJoin := make(chan string, 16)
  504. peerJoinSub := s.peerJoin.Subscribe(peerJoin)
  505. defer peerJoinSub.Unsubscribe()
  506. peerDrop := make(chan string, 16)
  507. peerDropSub := s.peerDrop.Subscribe(peerDrop)
  508. defer peerDropSub.Unsubscribe()
  509. for {
  510. // Remove all completed tasks and terminate sync if everything's done
  511. s.cleanStorageTasks()
  512. s.cleanAccountTasks()
  513. if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
  514. return nil
  515. }
  516. // Assign all the data retrieval tasks to any free peers
  517. s.assignAccountTasks(cancel)
  518. s.assignBytecodeTasks(cancel)
  519. s.assignStorageTasks(cancel)
  520. if len(s.tasks) == 0 {
  521. // Sync phase done, run heal phase
  522. s.assignTrienodeHealTasks(cancel)
  523. s.assignBytecodeHealTasks(cancel)
  524. }
  525. // Wait for something to happen
  526. select {
  527. case <-s.update:
  528. // Something happened (new peer, delivery, timeout), recheck tasks
  529. case <-peerJoin:
  530. // A new peer joined, try to schedule it new tasks
  531. case id := <-peerDrop:
  532. s.revertRequests(id)
  533. case <-cancel:
  534. return ErrCancelled
  535. case req := <-s.accountReqFails:
  536. s.revertAccountRequest(req)
  537. case req := <-s.bytecodeReqFails:
  538. s.revertBytecodeRequest(req)
  539. case req := <-s.storageReqFails:
  540. s.revertStorageRequest(req)
  541. case req := <-s.trienodeHealReqFails:
  542. s.revertTrienodeHealRequest(req)
  543. case req := <-s.bytecodeHealReqFails:
  544. s.revertBytecodeHealRequest(req)
  545. case res := <-s.accountResps:
  546. s.processAccountResponse(res)
  547. case res := <-s.bytecodeResps:
  548. s.processBytecodeResponse(res)
  549. case res := <-s.storageResps:
  550. s.processStorageResponse(res)
  551. case res := <-s.trienodeHealResps:
  552. s.processTrienodeHealResponse(res)
  553. case res := <-s.bytecodeHealResps:
  554. s.processBytecodeHealResponse(res)
  555. }
  556. // Report stats if something meaningful happened
  557. s.report(false)
  558. }
  559. }
  560. // loadSyncStatus retrieves a previously aborted sync status from the database,
  561. // or generates a fresh one if none is available.
  562. func (s *Syncer) loadSyncStatus() {
  563. var progress syncProgress
  564. if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
  565. if err := json.Unmarshal(status, &progress); err != nil {
  566. log.Error("Failed to decode snap sync status", "err", err)
  567. } else {
  568. for _, task := range progress.Tasks {
  569. log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
  570. }
  571. s.tasks = progress.Tasks
  572. s.snapped = len(s.tasks) == 0
  573. s.accountSynced = progress.AccountSynced
  574. s.accountBytes = progress.AccountBytes
  575. s.bytecodeSynced = progress.BytecodeSynced
  576. s.bytecodeBytes = progress.BytecodeBytes
  577. s.storageSynced = progress.StorageSynced
  578. s.storageBytes = progress.StorageBytes
  579. s.trienodeHealSynced = progress.TrienodeHealSynced
  580. s.trienodeHealBytes = progress.TrienodeHealBytes
  581. s.bytecodeHealSynced = progress.BytecodeHealSynced
  582. s.bytecodeHealBytes = progress.BytecodeHealBytes
  583. return
  584. }
  585. }
  586. // Either we've failed to decode the previus state, or there was none.
  587. // Start a fresh sync by chunking up the account range and scheduling
  588. // them for retrieval.
  589. s.tasks = nil
  590. s.accountSynced, s.accountBytes = 0, 0
  591. s.bytecodeSynced, s.bytecodeBytes = 0, 0
  592. s.storageSynced, s.storageBytes = 0, 0
  593. s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
  594. s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0
  595. var next common.Hash
  596. step := new(big.Int).Sub(
  597. new(big.Int).Div(
  598. new(big.Int).Exp(common.Big2, common.Big256, nil),
  599. big.NewInt(accountConcurrency),
  600. ), common.Big1,
  601. )
  602. for i := 0; i < accountConcurrency; i++ {
  603. last := common.BigToHash(new(big.Int).Add(next.Big(), step))
  604. if i == accountConcurrency-1 {
  605. // Make sure we don't overflow if the step is not a proper divisor
  606. last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
  607. }
  608. s.tasks = append(s.tasks, &accountTask{
  609. Next: next,
  610. Last: last,
  611. SubTasks: make(map[common.Hash][]*storageTask),
  612. })
  613. log.Debug("Created account sync task", "from", next, "last", last)
  614. next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
  615. }
  616. }
  617. // saveSyncStatus marshals the remaining sync tasks into leveldb.
  618. func (s *Syncer) saveSyncStatus() {
  619. progress := &syncProgress{
  620. Tasks: s.tasks,
  621. AccountSynced: s.accountSynced,
  622. AccountBytes: s.accountBytes,
  623. BytecodeSynced: s.bytecodeSynced,
  624. BytecodeBytes: s.bytecodeBytes,
  625. StorageSynced: s.storageSynced,
  626. StorageBytes: s.storageBytes,
  627. TrienodeHealSynced: s.trienodeHealSynced,
  628. TrienodeHealBytes: s.trienodeHealBytes,
  629. BytecodeHealSynced: s.bytecodeHealSynced,
  630. BytecodeHealBytes: s.bytecodeHealBytes,
  631. }
  632. status, err := json.Marshal(progress)
  633. if err != nil {
  634. panic(err) // This can only fail during implementation
  635. }
  636. rawdb.WriteSnapshotSyncStatus(s.db, status)
  637. }
  638. // cleanAccountTasks removes account range retrieval tasks that have already been
  639. // completed.
  640. func (s *Syncer) cleanAccountTasks() {
  641. for i := 0; i < len(s.tasks); i++ {
  642. if s.tasks[i].done {
  643. s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
  644. i--
  645. }
  646. }
  647. if len(s.tasks) == 0 {
  648. s.lock.Lock()
  649. s.snapped = true
  650. s.lock.Unlock()
  651. }
  652. }
  653. // cleanStorageTasks iterates over all the account tasks and storage sub-tasks
  654. // within, cleaning any that have been completed.
  655. func (s *Syncer) cleanStorageTasks() {
  656. for _, task := range s.tasks {
  657. for account, subtasks := range task.SubTasks {
  658. // Remove storage range retrieval tasks that completed
  659. for j := 0; j < len(subtasks); j++ {
  660. if subtasks[j].done {
  661. subtasks = append(subtasks[:j], subtasks[j+1:]...)
  662. j--
  663. }
  664. }
  665. if len(subtasks) > 0 {
  666. task.SubTasks[account] = subtasks
  667. continue
  668. }
  669. // If all storage chunks are done, mark the account as done too
  670. for j, hash := range task.res.hashes {
  671. if hash == account {
  672. task.needState[j] = false
  673. }
  674. }
  675. delete(task.SubTasks, account)
  676. task.pend--
  677. // If this was the last pending task, forward the account task
  678. if task.pend == 0 {
  679. s.forwardAccountTask(task)
  680. }
  681. }
  682. }
  683. }
  684. // assignAccountTasks attempts to match idle peers to pending account range
  685. // retrievals.
  686. func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
  687. s.lock.Lock()
  688. defer s.lock.Unlock()
  689. // If there are no idle peers, short circuit assignment
  690. if len(s.accountIdlers) == 0 {
  691. return
  692. }
  693. // Iterate over all the tasks and try to find a pending one
  694. for _, task := range s.tasks {
  695. // Skip any tasks already filling
  696. if task.req != nil || task.res != nil {
  697. continue
  698. }
  699. // Task pending retrieval, try to find an idle peer. If no such peer
  700. // exists, we probably assigned tasks for all (or they are stateless).
  701. // Abort the entire assignment mechanism.
  702. var idle string
  703. for id := range s.accountIdlers {
  704. // If the peer rejected a query in this sync cycle, don't bother asking
  705. // again for anything, it's either out of sync or already pruned
  706. if _, ok := s.statelessPeers[id]; ok {
  707. continue
  708. }
  709. idle = id
  710. break
  711. }
  712. if idle == "" {
  713. return
  714. }
  715. peer := s.peers[idle]
  716. // Matched a pending task to an idle peer, allocate a unique request id
  717. var reqid uint64
  718. for {
  719. reqid = uint64(rand.Int63())
  720. if reqid == 0 {
  721. continue
  722. }
  723. if _, ok := s.accountReqs[reqid]; ok {
  724. continue
  725. }
  726. break
  727. }
  728. // Generate the network query and send it to the peer
  729. req := &accountRequest{
  730. peer: idle,
  731. id: reqid,
  732. cancel: cancel,
  733. stale: make(chan struct{}),
  734. origin: task.Next,
  735. limit: task.Last,
  736. task: task,
  737. }
  738. req.timeout = time.AfterFunc(requestTimeout, func() {
  739. peer.Log().Debug("Account range request timed out", "reqid", reqid)
  740. s.scheduleRevertAccountRequest(req)
  741. })
  742. s.accountReqs[reqid] = req
  743. delete(s.accountIdlers, idle)
  744. s.pend.Add(1)
  745. go func(root common.Hash) {
  746. defer s.pend.Done()
  747. // Attempt to send the remote request and revert if it fails
  748. if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil {
  749. peer.Log().Debug("Failed to request account range", "err", err)
  750. s.scheduleRevertAccountRequest(req)
  751. }
  752. }(s.root)
  753. // Inject the request into the task to block further assignments
  754. task.req = req
  755. }
  756. }
  757. // assignBytecodeTasks attempts to match idle peers to pending code retrievals.
  758. func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
  759. s.lock.Lock()
  760. defer s.lock.Unlock()
  761. // If there are no idle peers, short circuit assignment
  762. if len(s.bytecodeIdlers) == 0 {
  763. return
  764. }
  765. // Iterate over all the tasks and try to find a pending one
  766. for _, task := range s.tasks {
  767. // Skip any tasks not in the bytecode retrieval phase
  768. if task.res == nil {
  769. continue
  770. }
  771. // Skip tasks that are already retrieving (or done with) all codes
  772. if len(task.codeTasks) == 0 {
  773. continue
  774. }
  775. // Task pending retrieval, try to find an idle peer. If no such peer
  776. // exists, we probably assigned tasks for all (or they are stateless).
  777. // Abort the entire assignment mechanism.
  778. var idle string
  779. for id := range s.bytecodeIdlers {
  780. // If the peer rejected a query in this sync cycle, don't bother asking
  781. // again for anything, it's either out of sync or already pruned
  782. if _, ok := s.statelessPeers[id]; ok {
  783. continue
  784. }
  785. idle = id
  786. break
  787. }
  788. if idle == "" {
  789. return
  790. }
  791. peer := s.peers[idle]
  792. // Matched a pending task to an idle peer, allocate a unique request id
  793. var reqid uint64
  794. for {
  795. reqid = uint64(rand.Int63())
  796. if reqid == 0 {
  797. continue
  798. }
  799. if _, ok := s.bytecodeReqs[reqid]; ok {
  800. continue
  801. }
  802. break
  803. }
  804. // Generate the network query and send it to the peer
  805. hashes := make([]common.Hash, 0, maxCodeRequestCount)
  806. for hash := range task.codeTasks {
  807. delete(task.codeTasks, hash)
  808. hashes = append(hashes, hash)
  809. if len(hashes) >= maxCodeRequestCount {
  810. break
  811. }
  812. }
  813. req := &bytecodeRequest{
  814. peer: idle,
  815. id: reqid,
  816. cancel: cancel,
  817. stale: make(chan struct{}),
  818. hashes: hashes,
  819. task: task,
  820. }
  821. req.timeout = time.AfterFunc(requestTimeout, func() {
  822. peer.Log().Debug("Bytecode request timed out", "reqid", reqid)
  823. s.scheduleRevertBytecodeRequest(req)
  824. })
  825. s.bytecodeReqs[reqid] = req
  826. delete(s.bytecodeIdlers, idle)
  827. s.pend.Add(1)
  828. go func() {
  829. defer s.pend.Done()
  830. // Attempt to send the remote request and revert if it fails
  831. if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
  832. log.Debug("Failed to request bytecodes", "err", err)
  833. s.scheduleRevertBytecodeRequest(req)
  834. }
  835. }()
  836. }
  837. }
  838. // assignStorageTasks attempts to match idle peers to pending storage range
  839. // retrievals.
  840. func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
  841. s.lock.Lock()
  842. defer s.lock.Unlock()
  843. // If there are no idle peers, short circuit assignment
  844. if len(s.storageIdlers) == 0 {
  845. return
  846. }
  847. // Iterate over all the tasks and try to find a pending one
  848. for _, task := range s.tasks {
  849. // Skip any tasks not in the storage retrieval phase
  850. if task.res == nil {
  851. continue
  852. }
  853. // Skip tasks that are already retrieving (or done with) all small states
  854. if len(task.SubTasks) == 0 && len(task.stateTasks) == 0 {
  855. continue
  856. }
  857. // Task pending retrieval, try to find an idle peer. If no such peer
  858. // exists, we probably assigned tasks for all (or they are stateless).
  859. // Abort the entire assignment mechanism.
  860. var idle string
  861. for id := range s.storageIdlers {
  862. // If the peer rejected a query in this sync cycle, don't bother asking
  863. // again for anything, it's either out of sync or already pruned
  864. if _, ok := s.statelessPeers[id]; ok {
  865. continue
  866. }
  867. idle = id
  868. break
  869. }
  870. if idle == "" {
  871. return
  872. }
  873. peer := s.peers[idle]
  874. // Matched a pending task to an idle peer, allocate a unique request id
  875. var reqid uint64
  876. for {
  877. reqid = uint64(rand.Int63())
  878. if reqid == 0 {
  879. continue
  880. }
  881. if _, ok := s.storageReqs[reqid]; ok {
  882. continue
  883. }
  884. break
  885. }
  886. // Generate the network query and send it to the peer. If there are
  887. // large contract tasks pending, complete those before diving into
  888. // even more new contracts.
  889. var (
  890. accounts = make([]common.Hash, 0, maxStorageSetRequestCount)
  891. roots = make([]common.Hash, 0, maxStorageSetRequestCount)
  892. subtask *storageTask
  893. )
  894. for account, subtasks := range task.SubTasks {
  895. for _, st := range subtasks {
  896. // Skip any subtasks already filling
  897. if st.req != nil {
  898. continue
  899. }
  900. // Found an incomplete storage chunk, schedule it
  901. accounts = append(accounts, account)
  902. roots = append(roots, st.root)
  903. subtask = st
  904. break // Large contract chunks are downloaded individually
  905. }
  906. if subtask != nil {
  907. break // Large contract chunks are downloaded individually
  908. }
  909. }
  910. if subtask == nil {
  911. // No large contract required retrieval, but small ones available
  912. for acccount, root := range task.stateTasks {
  913. delete(task.stateTasks, acccount)
  914. accounts = append(accounts, acccount)
  915. roots = append(roots, root)
  916. if len(accounts) >= maxStorageSetRequestCount {
  917. break
  918. }
  919. }
  920. }
  921. // If nothing was found, it means this task is actually already fully
  922. // retrieving, but large contracts are hard to detect. Skip to the next.
  923. if len(accounts) == 0 {
  924. continue
  925. }
  926. req := &storageRequest{
  927. peer: idle,
  928. id: reqid,
  929. cancel: cancel,
  930. stale: make(chan struct{}),
  931. accounts: accounts,
  932. roots: roots,
  933. mainTask: task,
  934. subTask: subtask,
  935. }
  936. if subtask != nil {
  937. req.origin = subtask.Next
  938. req.limit = subtask.Last
  939. }
  940. req.timeout = time.AfterFunc(requestTimeout, func() {
  941. peer.Log().Debug("Storage request timed out", "reqid", reqid)
  942. s.scheduleRevertStorageRequest(req)
  943. })
  944. s.storageReqs[reqid] = req
  945. delete(s.storageIdlers, idle)
  946. s.pend.Add(1)
  947. go func(root common.Hash) {
  948. defer s.pend.Done()
  949. // Attempt to send the remote request and revert if it fails
  950. var origin, limit []byte
  951. if subtask != nil {
  952. origin, limit = req.origin[:], req.limit[:]
  953. }
  954. if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil {
  955. log.Debug("Failed to request storage", "err", err)
  956. s.scheduleRevertStorageRequest(req)
  957. }
  958. }(s.root)
  959. // Inject the request into the subtask to block further assignments
  960. if subtask != nil {
  961. subtask.req = req
  962. }
  963. }
  964. }
  965. // assignTrienodeHealTasks attempts to match idle peers to trie node requests to
  966. // heal any trie errors caused by the snap sync's chunked retrieval model.
  967. func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
  968. s.lock.Lock()
  969. defer s.lock.Unlock()
  970. // If there are no idle peers, short circuit assignment
  971. if len(s.trienodeHealIdlers) == 0 {
  972. return
  973. }
  974. // Iterate over pending tasks and try to find a peer to retrieve with
  975. for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
  976. // If there are not enough trie tasks queued to fully assign, fill the
  977. // queue from the state sync scheduler. The trie synced schedules these
  978. // together with bytecodes, so we need to queue them combined.
  979. var (
  980. have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
  981. want = maxTrieRequestCount + maxCodeRequestCount
  982. )
  983. if have < want {
  984. nodes, paths, codes := s.healer.scheduler.Missing(want - have)
  985. for i, hash := range nodes {
  986. s.healer.trieTasks[hash] = paths[i]
  987. }
  988. for _, hash := range codes {
  989. s.healer.codeTasks[hash] = struct{}{}
  990. }
  991. }
  992. // If all the heal tasks are bytecodes or already downloading, bail
  993. if len(s.healer.trieTasks) == 0 {
  994. return
  995. }
  996. // Task pending retrieval, try to find an idle peer. If no such peer
  997. // exists, we probably assigned tasks for all (or they are stateless).
  998. // Abort the entire assignment mechanism.
  999. var idle string
  1000. for id := range s.trienodeHealIdlers {
  1001. // If the peer rejected a query in this sync cycle, don't bother asking
  1002. // again for anything, it's either out of sync or already pruned
  1003. if _, ok := s.statelessPeers[id]; ok {
  1004. continue
  1005. }
  1006. idle = id
  1007. break
  1008. }
  1009. if idle == "" {
  1010. return
  1011. }
  1012. peer := s.peers[idle]
  1013. // Matched a pending task to an idle peer, allocate a unique request id
  1014. var reqid uint64
  1015. for {
  1016. reqid = uint64(rand.Int63())
  1017. if reqid == 0 {
  1018. continue
  1019. }
  1020. if _, ok := s.trienodeHealReqs[reqid]; ok {
  1021. continue
  1022. }
  1023. break
  1024. }
  1025. // Generate the network query and send it to the peer
  1026. var (
  1027. hashes = make([]common.Hash, 0, maxTrieRequestCount)
  1028. paths = make([]trie.SyncPath, 0, maxTrieRequestCount)
  1029. pathsets = make([]TrieNodePathSet, 0, maxTrieRequestCount)
  1030. )
  1031. for hash, pathset := range s.healer.trieTasks {
  1032. delete(s.healer.trieTasks, hash)
  1033. hashes = append(hashes, hash)
  1034. paths = append(paths, pathset)
  1035. pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
  1036. if len(hashes) >= maxTrieRequestCount {
  1037. break
  1038. }
  1039. }
  1040. req := &trienodeHealRequest{
  1041. peer: idle,
  1042. id: reqid,
  1043. cancel: cancel,
  1044. stale: make(chan struct{}),
  1045. hashes: hashes,
  1046. paths: paths,
  1047. task: s.healer,
  1048. }
  1049. req.timeout = time.AfterFunc(requestTimeout, func() {
  1050. peer.Log().Debug("Trienode heal request timed out", "reqid", reqid)
  1051. s.scheduleRevertTrienodeHealRequest(req)
  1052. })
  1053. s.trienodeHealReqs[reqid] = req
  1054. delete(s.trienodeHealIdlers, idle)
  1055. s.pend.Add(1)
  1056. go func(root common.Hash) {
  1057. defer s.pend.Done()
  1058. // Attempt to send the remote request and revert if it fails
  1059. if err := peer.RequestTrieNodes(reqid, root, pathsets, maxRequestSize); err != nil {
  1060. log.Debug("Failed to request trienode healers", "err", err)
  1061. s.scheduleRevertTrienodeHealRequest(req)
  1062. }
  1063. }(s.root)
  1064. }
  1065. }
  1066. // assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
  1067. // heal any trie errors caused by the snap sync's chunked retrieval model.
  1068. func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
  1069. s.lock.Lock()
  1070. defer s.lock.Unlock()
  1071. // If there are no idle peers, short circuit assignment
  1072. if len(s.bytecodeHealIdlers) == 0 {
  1073. return
  1074. }
  1075. // Iterate over pending tasks and try to find a peer to retrieve with
  1076. for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
  1077. // If there are not enough trie tasks queued to fully assign, fill the
  1078. // queue from the state sync scheduler. The trie synced schedules these
  1079. // together with trie nodes, so we need to queue them combined.
  1080. var (
  1081. have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
  1082. want = maxTrieRequestCount + maxCodeRequestCount
  1083. )
  1084. if have < want {
  1085. nodes, paths, codes := s.healer.scheduler.Missing(want - have)
  1086. for i, hash := range nodes {
  1087. s.healer.trieTasks[hash] = paths[i]
  1088. }
  1089. for _, hash := range codes {
  1090. s.healer.codeTasks[hash] = struct{}{}
  1091. }
  1092. }
  1093. // If all the heal tasks are trienodes or already downloading, bail
  1094. if len(s.healer.codeTasks) == 0 {
  1095. return
  1096. }
  1097. // Task pending retrieval, try to find an idle peer. If no such peer
  1098. // exists, we probably assigned tasks for all (or they are stateless).
  1099. // Abort the entire assignment mechanism.
  1100. var idle string
  1101. for id := range s.bytecodeHealIdlers {
  1102. // If the peer rejected a query in this sync cycle, don't bother asking
  1103. // again for anything, it's either out of sync or already pruned
  1104. if _, ok := s.statelessPeers[id]; ok {
  1105. continue
  1106. }
  1107. idle = id
  1108. break
  1109. }
  1110. if idle == "" {
  1111. return
  1112. }
  1113. peer := s.peers[idle]
  1114. // Matched a pending task to an idle peer, allocate a unique request id
  1115. var reqid uint64
  1116. for {
  1117. reqid = uint64(rand.Int63())
  1118. if reqid == 0 {
  1119. continue
  1120. }
  1121. if _, ok := s.bytecodeHealReqs[reqid]; ok {
  1122. continue
  1123. }
  1124. break
  1125. }
  1126. // Generate the network query and send it to the peer
  1127. hashes := make([]common.Hash, 0, maxCodeRequestCount)
  1128. for hash := range s.healer.codeTasks {
  1129. delete(s.healer.codeTasks, hash)
  1130. hashes = append(hashes, hash)
  1131. if len(hashes) >= maxCodeRequestCount {
  1132. break
  1133. }
  1134. }
  1135. req := &bytecodeHealRequest{
  1136. peer: idle,
  1137. id: reqid,
  1138. cancel: cancel,
  1139. stale: make(chan struct{}),
  1140. hashes: hashes,
  1141. task: s.healer,
  1142. }
  1143. req.timeout = time.AfterFunc(requestTimeout, func() {
  1144. peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid)
  1145. s.scheduleRevertBytecodeHealRequest(req)
  1146. })
  1147. s.bytecodeHealReqs[reqid] = req
  1148. delete(s.bytecodeHealIdlers, idle)
  1149. s.pend.Add(1)
  1150. go func() {
  1151. defer s.pend.Done()
  1152. // Attempt to send the remote request and revert if it fails
  1153. if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
  1154. log.Debug("Failed to request bytecode healers", "err", err)
  1155. s.scheduleRevertBytecodeHealRequest(req)
  1156. }
  1157. }()
  1158. }
  1159. }
  1160. // revertRequests locates all the currently pending reuqests from a particular
  1161. // peer and reverts them, rescheduling for others to fulfill.
  1162. func (s *Syncer) revertRequests(peer string) {
  1163. // Gather the requests first, revertals need the lock too
  1164. s.lock.Lock()
  1165. var accountReqs []*accountRequest
  1166. for _, req := range s.accountReqs {
  1167. if req.peer == peer {
  1168. accountReqs = append(accountReqs, req)
  1169. }
  1170. }
  1171. var bytecodeReqs []*bytecodeRequest
  1172. for _, req := range s.bytecodeReqs {
  1173. if req.peer == peer {
  1174. bytecodeReqs = append(bytecodeReqs, req)
  1175. }
  1176. }
  1177. var storageReqs []*storageRequest
  1178. for _, req := range s.storageReqs {
  1179. if req.peer == peer {
  1180. storageReqs = append(storageReqs, req)
  1181. }
  1182. }
  1183. var trienodeHealReqs []*trienodeHealRequest
  1184. for _, req := range s.trienodeHealReqs {
  1185. if req.peer == peer {
  1186. trienodeHealReqs = append(trienodeHealReqs, req)
  1187. }
  1188. }
  1189. var bytecodeHealReqs []*bytecodeHealRequest
  1190. for _, req := range s.bytecodeHealReqs {
  1191. if req.peer == peer {
  1192. bytecodeHealReqs = append(bytecodeHealReqs, req)
  1193. }
  1194. }
  1195. s.lock.Unlock()
  1196. // Revert all the requests matching the peer
  1197. for _, req := range accountReqs {
  1198. s.revertAccountRequest(req)
  1199. }
  1200. for _, req := range bytecodeReqs {
  1201. s.revertBytecodeRequest(req)
  1202. }
  1203. for _, req := range storageReqs {
  1204. s.revertStorageRequest(req)
  1205. }
  1206. for _, req := range trienodeHealReqs {
  1207. s.revertTrienodeHealRequest(req)
  1208. }
  1209. for _, req := range bytecodeHealReqs {
  1210. s.revertBytecodeHealRequest(req)
  1211. }
  1212. }
  1213. // scheduleRevertAccountRequest asks the event loop to clean up an account range
  1214. // request and return all failed retrieval tasks to the scheduler for reassignment.
  1215. func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) {
  1216. select {
  1217. case s.accountReqFails <- req:
  1218. // Sync event loop notified
  1219. case <-req.cancel:
  1220. // Sync cycle got cancelled
  1221. case <-req.stale:
  1222. // Request already reverted
  1223. }
  1224. }
  1225. // revertAccountRequest cleans up an account range request and returns all failed
  1226. // retrieval tasks to the scheduler for reassignment.
  1227. //
  1228. // Note, this needs to run on the event runloop thread to reschedule to idle peers.
  1229. // On peer threads, use scheduleRevertAccountRequest.
  1230. func (s *Syncer) revertAccountRequest(req *accountRequest) {
  1231. log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id)
  1232. select {
  1233. case <-req.stale:
  1234. log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id)
  1235. return
  1236. default:
  1237. }
  1238. close(req.stale)
  1239. // Remove the request from the tracked set
  1240. s.lock.Lock()
  1241. delete(s.accountReqs, req.id)
  1242. s.lock.Unlock()
  1243. // If there's a timeout timer still running, abort it and mark the account
  1244. // task as not-pending, ready for resheduling
  1245. req.timeout.Stop()
  1246. if req.task.req == req {
  1247. req.task.req = nil
  1248. }
  1249. }
  1250. // scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request
  1251. // and return all failed retrieval tasks to the scheduler for reassignment.
  1252. func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
  1253. select {
  1254. case s.bytecodeReqFails <- req:
  1255. // Sync event loop notified
  1256. case <-req.cancel:
  1257. // Sync cycle got cancelled
  1258. case <-req.stale:
  1259. // Request already reverted
  1260. }
  1261. }
  1262. // revertBytecodeRequest cleans up a bytecode request and returns all failed
  1263. // retrieval tasks to the scheduler for reassignment.
  1264. //
  1265. // Note, this needs to run on the event runloop thread to reschedule to idle peers.
  1266. // On peer threads, use scheduleRevertBytecodeRequest.
  1267. func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
  1268. log.Debug("Reverting bytecode request", "peer", req.peer)
  1269. select {
  1270. case <-req.stale:
  1271. log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id)
  1272. return
  1273. default:
  1274. }
  1275. close(req.stale)
  1276. // Remove the request from the tracked set
  1277. s.lock.Lock()
  1278. delete(s.bytecodeReqs, req.id)
  1279. s.lock.Unlock()
  1280. // If there's a timeout timer still running, abort it and mark the code
  1281. // retrievals as not-pending, ready for resheduling
  1282. req.timeout.Stop()
  1283. for _, hash := range req.hashes {
  1284. req.task.codeTasks[hash] = struct{}{}
  1285. }
  1286. }
  1287. // scheduleRevertStorageRequest asks the event loop to clean up a storage range
  1288. // request and return all failed retrieval tasks to the scheduler for reassignment.
  1289. func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) {
  1290. select {
  1291. case s.storageReqFails <- req:
  1292. // Sync event loop notified
  1293. case <-req.cancel:
  1294. // Sync cycle got cancelled
  1295. case <-req.stale:
  1296. // Request already reverted
  1297. }
  1298. }
  1299. // revertStorageRequest cleans up a storage range request and returns all failed
  1300. // retrieval tasks to the scheduler for reassignment.
  1301. //
  1302. // Note, this needs to run on the event runloop thread to reschedule to idle peers.
  1303. // On peer threads, use scheduleRevertStorageRequest.
  1304. func (s *Syncer) revertStorageRequest(req *storageRequest) {
  1305. log.Debug("Reverting storage request", "peer", req.peer)
  1306. select {
  1307. case <-req.stale:
  1308. log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id)
  1309. return
  1310. default:
  1311. }
  1312. close(req.stale)
  1313. // Remove the request from the tracked set
  1314. s.lock.Lock()
  1315. delete(s.storageReqs, req.id)
  1316. s.lock.Unlock()
  1317. // If there's a timeout timer still running, abort it and mark the storage
  1318. // task as not-pending, ready for resheduling
  1319. req.timeout.Stop()
  1320. if req.subTask != nil {
  1321. req.subTask.req = nil
  1322. } else {
  1323. for i, account := range req.accounts {
  1324. req.mainTask.stateTasks[account] = req.roots[i]
  1325. }
  1326. }
  1327. }
  1328. // scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal
  1329. // request and return all failed retrieval tasks to the scheduler for reassignment.
  1330. func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
  1331. select {
  1332. case s.trienodeHealReqFails <- req:
  1333. // Sync event loop notified
  1334. case <-req.cancel:
  1335. // Sync cycle got cancelled
  1336. case <-req.stale:
  1337. // Request already reverted
  1338. }
  1339. }
  1340. // revertTrienodeHealRequest cleans up a trienode heal request and returns all
  1341. // failed retrieval tasks to the scheduler for reassignment.
  1342. //
  1343. // Note, this needs to run on the event runloop thread to reschedule to idle peers.
  1344. // On peer threads, use scheduleRevertTrienodeHealRequest.
  1345. func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
  1346. log.Debug("Reverting trienode heal request", "peer", req.peer)
  1347. select {
  1348. case <-req.stale:
  1349. log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id)
  1350. return
  1351. default:
  1352. }
  1353. close(req.stale)
  1354. // Remove the request from the tracked set
  1355. s.lock.Lock()
  1356. delete(s.trienodeHealReqs, req.id)
  1357. s.lock.Unlock()
  1358. // If there's a timeout timer still running, abort it and mark the trie node
  1359. // retrievals as not-pending, ready for resheduling
  1360. req.timeout.Stop()
  1361. for i, hash := range req.hashes {
  1362. req.task.trieTasks[hash] = req.paths[i]
  1363. }
  1364. }
  1365. // scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal
  1366. // request and return all failed retrieval tasks to the scheduler for reassignment.
  1367. func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
  1368. select {
  1369. case s.bytecodeHealReqFails <- req:
  1370. // Sync event loop notified
  1371. case <-req.cancel:
  1372. // Sync cycle got cancelled
  1373. case <-req.stale:
  1374. // Request already reverted
  1375. }
  1376. }
  1377. // revertBytecodeHealRequest cleans up a bytecode heal request and returns all
  1378. // failed retrieval tasks to the scheduler for reassignment.
  1379. //
  1380. // Note, this needs to run on the event runloop thread to reschedule to idle peers.
  1381. // On peer threads, use scheduleRevertBytecodeHealRequest.
  1382. func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
  1383. log.Debug("Reverting bytecode heal request", "peer", req.peer)
  1384. select {
  1385. case <-req.stale:
  1386. log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id)
  1387. return
  1388. default:
  1389. }
  1390. close(req.stale)
  1391. // Remove the request from the tracked set
  1392. s.lock.Lock()
  1393. delete(s.bytecodeHealReqs, req.id)
  1394. s.lock.Unlock()
  1395. // If there's a timeout timer still running, abort it and mark the code
  1396. // retrievals as not-pending, ready for resheduling
  1397. req.timeout.Stop()
  1398. for _, hash := range req.hashes {
  1399. req.task.codeTasks[hash] = struct{}{}
  1400. }
  1401. }
  1402. // processAccountResponse integrates an already validated account range response
  1403. // into the account tasks.
  1404. func (s *Syncer) processAccountResponse(res *accountResponse) {
  1405. // Switch the task from pending to filling
  1406. res.task.req = nil
  1407. res.task.res = res
  1408. // Ensure that the response doesn't overflow into the subsequent task
  1409. last := res.task.Last.Big()
  1410. for i, hash := range res.hashes {
  1411. // Mark the range complete if the last is already included.
  1412. // Keep iteration to delete the extra states if exists.
  1413. cmp := hash.Big().Cmp(last)
  1414. if cmp == 0 {
  1415. res.cont = false
  1416. continue
  1417. }
  1418. if cmp > 0 {
  1419. // Chunk overflown, cut off excess, but also update the boundary nodes
  1420. for j := i; j < len(res.hashes); j++ {
  1421. if err := res.trie.Prove(res.hashes[j][:], 0, res.overflow); err != nil {
  1422. panic(err) // Account range was already proven, what happened
  1423. }
  1424. }
  1425. res.hashes = res.hashes[:i]
  1426. res.accounts = res.accounts[:i]
  1427. res.cont = false // Mark range completed
  1428. break
  1429. }
  1430. }
  1431. // Iterate over all the accounts and assemble which ones need further sub-
  1432. // filling before the entire account range can be persisted.
  1433. res.task.needCode = make([]bool, len(res.accounts))
  1434. res.task.needState = make([]bool, len(res.accounts))
  1435. res.task.needHeal = make([]bool, len(res.accounts))
  1436. res.task.codeTasks = make(map[common.Hash]struct{})
  1437. res.task.stateTasks = make(map[common.Hash]common.Hash)
  1438. resumed := make(map[common.Hash]struct{})
  1439. res.task.pend = 0
  1440. for i, account := range res.accounts {
  1441. // Check if the account is a contract with an unknown code
  1442. if !bytes.Equal(account.CodeHash, emptyCode[:]) {
  1443. if code := rawdb.ReadCodeWithPrefix(s.db, common.BytesToHash(account.CodeHash)); code == nil {
  1444. res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
  1445. res.task.needCode[i] = true
  1446. res.task.pend++
  1447. }
  1448. }
  1449. // Check if the account is a contract with an unknown storage trie
  1450. if account.Root != emptyRoot {
  1451. if node, err := s.db.Get(account.Root[:]); err != nil || node == nil {
  1452. // If there was a previous large state retrieval in progress,
  1453. // don't restart it from scratch. This happens if a sync cycle
  1454. // is interrupted and resumed later. However, *do* update the
  1455. // previous root hash.
  1456. if subtasks, ok := res.task.SubTasks[res.hashes[i]]; ok {
  1457. log.Debug("Resuming large storage retrieval", "account", res.hashes[i], "root", account.Root)
  1458. for _, subtask := range subtasks {
  1459. subtask.root = account.Root
  1460. }
  1461. res.task.needHeal[i] = true
  1462. resumed[res.hashes[i]] = struct{}{}
  1463. } else {
  1464. res.task.stateTasks[res.hashes[i]] = account.Root
  1465. }
  1466. res.task.needState[i] = true
  1467. res.task.pend++
  1468. }
  1469. }
  1470. }
  1471. // Delete any subtasks that have been aborted but not resumed. This may undo
  1472. // some progress if a new peer gives us less accounts than an old one, but for
  1473. // now we have to live with that.
  1474. for hash := range res.task.SubTasks {
  1475. if _, ok := resumed[hash]; !ok {
  1476. log.Debug("Aborting suspended storage retrieval", "account", hash)
  1477. delete(res.task.SubTasks, hash)
  1478. }
  1479. }
  1480. // If the account range contained no contracts, or all have been fully filled
  1481. // beforehand, short circuit storage filling and forward to the next task
  1482. if res.task.pend == 0 {
  1483. s.forwardAccountTask(res.task)
  1484. return
  1485. }
  1486. // Some accounts are incomplete, leave as is for the storage and contract
  1487. // task assigners to pick up and fill.
  1488. }
  1489. // processBytecodeResponse integrates an already validated bytecode response
  1490. // into the account tasks.
  1491. func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
  1492. batch := s.db.NewBatch()
  1493. var (
  1494. codes uint64
  1495. bytes common.StorageSize
  1496. )
  1497. for i, hash := range res.hashes {
  1498. code := res.codes[i]
  1499. // If the bytecode was not delivered, reschedule it
  1500. if code == nil {
  1501. res.task.codeTasks[hash] = struct{}{}
  1502. continue
  1503. }
  1504. // Code was delivered, mark it not needed any more
  1505. for j, account := range res.task.res.accounts {
  1506. if res.task.needCode[j] && hash == common.BytesToHash(account.CodeHash) {
  1507. res.task.needCode[j] = false
  1508. res.task.pend--
  1509. }
  1510. }
  1511. // Push the bytecode into a database batch
  1512. s.bytecodeSynced++
  1513. s.bytecodeBytes += common.StorageSize(len(code))
  1514. codes++
  1515. bytes += common.StorageSize(len(code))
  1516. rawdb.WriteCode(batch, hash, code)
  1517. }
  1518. if err := batch.Write(); err != nil {
  1519. log.Crit("Failed to persist bytecodes", "err", err)
  1520. }
  1521. log.Debug("Persisted set of bytecodes", "count", codes, "bytes", bytes)
  1522. // If this delivery completed the last pending task, forward the account task
  1523. // to the next chunk
  1524. if res.task.pend == 0 {
  1525. s.forwardAccountTask(res.task)
  1526. return
  1527. }
  1528. // Some accounts are still incomplete, leave as is for the storage and contract
  1529. // task assigners to pick up and fill.
  1530. }
  1531. // processStorageResponse integrates an already validated storage response
  1532. // into the account tasks.
  1533. func (s *Syncer) processStorageResponse(res *storageResponse) {
  1534. // Switch the subtask from pending to idle
  1535. if res.subTask != nil {
  1536. res.subTask.req = nil
  1537. }
  1538. batch := s.db.NewBatch()
  1539. var (
  1540. slots int
  1541. nodes int
  1542. skipped int
  1543. bytes common.StorageSize
  1544. )
  1545. // Iterate over all the accounts and reconstruct their storage tries from the
  1546. // delivered slots
  1547. for i, account := range res.accounts {
  1548. // If the account was not delivered, reschedule it
  1549. if i >= len(res.hashes) {
  1550. res.mainTask.stateTasks[account] = res.roots[i]
  1551. continue
  1552. }
  1553. // State was delivered, if complete mark as not needed any more, otherwise
  1554. // mark the account as needing healing
  1555. for j, hash := range res.mainTask.res.hashes {
  1556. if account != hash {
  1557. continue
  1558. }
  1559. acc := res.mainTask.res.accounts[j]
  1560. // If the packet contains multiple contract storage slots, all
  1561. // but the last are surely complete. The last contract may be
  1562. // chunked, so check it's continuation flag.
  1563. if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
  1564. res.mainTask.needState[j] = false
  1565. res.mainTask.pend--
  1566. }
  1567. // If the last contract was chunked, mark it as needing healing
  1568. // to avoid writing it out to disk prematurely.
  1569. if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
  1570. res.mainTask.needHeal[j] = true
  1571. }
  1572. // If the last contract was chunked, we need to switch to large
  1573. // contract handling mode
  1574. if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
  1575. // If we haven't yet started a large-contract retrieval, create
  1576. // the subtasks for it within the main account task
  1577. if tasks, ok := res.mainTask.SubTasks[account]; !ok {
  1578. var (
  1579. next common.Hash
  1580. )
  1581. step := new(big.Int).Sub(
  1582. new(big.Int).Div(
  1583. new(big.Int).Exp(common.Big2, common.Big256, nil),
  1584. big.NewInt(storageConcurrency),
  1585. ), common.Big1,
  1586. )
  1587. for k := 0; k < storageConcurrency; k++ {
  1588. last := common.BigToHash(new(big.Int).Add(next.Big(), step))
  1589. if k == storageConcurrency-1 {
  1590. // Make sure we don't overflow if the step is not a proper divisor
  1591. last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
  1592. }
  1593. tasks = append(tasks, &storageTask{
  1594. Next: next,
  1595. Last: last,
  1596. root: acc.Root,
  1597. })
  1598. log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last)
  1599. next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
  1600. }
  1601. res.mainTask.SubTasks[account] = tasks
  1602. // Since we've just created the sub-tasks, this response
  1603. // is surely for the first one (zero origin)
  1604. res.subTask = tasks[0]
  1605. }
  1606. }
  1607. // If we're in large contract delivery mode, forward the subtask
  1608. if res.subTask != nil {
  1609. // Ensure the response doesn't overflow into the subsequent task
  1610. last := res.subTask.Last.Big()
  1611. for k, hash := range res.hashes[i] {
  1612. // Mark the range complete if the last is already included.
  1613. // Keep iteration to delete the extra states if exists.
  1614. cmp := hash.Big().Cmp(last)
  1615. if cmp == 0 {
  1616. res.cont = false
  1617. continue
  1618. }
  1619. if cmp > 0 {
  1620. // Chunk overflown, cut off excess, but also update the boundary
  1621. for l := k; l < len(res.hashes[i]); l++ {
  1622. if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil {
  1623. panic(err) // Account range was already proven, what happened
  1624. }
  1625. }
  1626. res.hashes[i] = res.hashes[i][:k]
  1627. res.slots[i] = res.slots[i][:k]
  1628. res.cont = false // Mark range completed
  1629. break
  1630. }
  1631. }
  1632. // Forward the relevant storage chunk (even if created just now)
  1633. if res.cont {
  1634. res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1)))
  1635. } else {
  1636. res.subTask.done = true
  1637. }
  1638. }
  1639. }
  1640. // Iterate over all the reconstructed trie nodes and push them to disk
  1641. slots += len(res.hashes[i])
  1642. it := res.nodes[i].NewIterator(nil, nil)
  1643. for it.Next() {
  1644. // Boundary nodes are not written for the last result, since they are incomplete
  1645. if i == len(res.hashes)-1 && res.subTask != nil {
  1646. if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok {
  1647. skipped++
  1648. continue
  1649. }
  1650. if _, err := res.overflow.Get(it.Key()); err == nil {
  1651. skipped++
  1652. continue
  1653. }
  1654. }
  1655. // Node is not a boundary, persist to disk
  1656. batch.Put(it.Key(), it.Value())
  1657. bytes += common.StorageSize(common.HashLength + len(it.Value()))
  1658. nodes++
  1659. }
  1660. it.Release()
  1661. // Persist the received storage segements. These flat state maybe
  1662. // outdated during the sync, but it can be fixed later during the
  1663. // snapshot generation.
  1664. for j := 0; j < len(res.hashes[i]); j++ {
  1665. rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
  1666. bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))
  1667. }
  1668. }
  1669. if err := batch.Write(); err != nil {
  1670. log.Crit("Failed to persist storage slots", "err", err)
  1671. }
  1672. s.storageSynced += uint64(slots)
  1673. s.storageBytes += bytes
  1674. log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "skipped", skipped, "bytes", bytes)
  1675. // If this delivery completed the last pending task, forward the account task
  1676. // to the next chunk
  1677. if res.mainTask.pend == 0 {
  1678. s.forwardAccountTask(res.mainTask)
  1679. return
  1680. }
  1681. // Some accounts are still incomplete, leave as is for the storage and contract
  1682. // task assigners to pick up and fill.
  1683. }
  1684. // processTrienodeHealResponse integrates an already validated trienode response
  1685. // into the healer tasks.
  1686. func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
  1687. for i, hash := range res.hashes {
  1688. node := res.nodes[i]
  1689. // If the trie node was not delivered, reschedule it
  1690. if node == nil {
  1691. res.task.trieTasks[hash] = res.paths[i]
  1692. continue
  1693. }
  1694. // Push the trie node into the state syncer
  1695. s.trienodeHealSynced++
  1696. s.trienodeHealBytes += common.StorageSize(len(node))
  1697. err := s.healer.scheduler.Process(trie.SyncResult{Hash: hash, Data: node})
  1698. switch err {
  1699. case nil:
  1700. case trie.ErrAlreadyProcessed:
  1701. s.trienodeHealDups++
  1702. case trie.ErrNotRequested:
  1703. s.trienodeHealNops++
  1704. default:
  1705. log.Error("Invalid trienode processed", "hash", hash, "err", err)
  1706. }
  1707. }
  1708. batch := s.db.NewBatch()
  1709. if err := s.healer.scheduler.Commit(batch); err != nil {
  1710. log.Error("Failed to commit healing data", "err", err)
  1711. }
  1712. if err := batch.Write(); err != nil {
  1713. log.Crit("Failed to persist healing data", "err", err)
  1714. }
  1715. log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
  1716. }
  1717. // processBytecodeHealResponse integrates an already validated bytecode response
  1718. // into the healer tasks.
  1719. func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
  1720. for i, hash := range res.hashes {
  1721. node := res.codes[i]
  1722. // If the trie node was not delivered, reschedule it
  1723. if node == nil {
  1724. res.task.codeTasks[hash] = struct{}{}
  1725. continue
  1726. }
  1727. // Push the trie node into the state syncer
  1728. s.bytecodeHealSynced++
  1729. s.bytecodeHealBytes += common.StorageSize(len(node))
  1730. err := s.healer.scheduler.Process(trie.SyncResult{Hash: hash, Data: node})
  1731. switch err {
  1732. case nil:
  1733. case trie.ErrAlreadyProcessed:
  1734. s.bytecodeHealDups++
  1735. case trie.ErrNotRequested:
  1736. s.bytecodeHealNops++
  1737. default:
  1738. log.Error("Invalid bytecode processed", "hash", hash, "err", err)
  1739. }
  1740. }
  1741. batch := s.db.NewBatch()
  1742. if err := s.healer.scheduler.Commit(batch); err != nil {
  1743. log.Error("Failed to commit healing data", "err", err)
  1744. }
  1745. if err := batch.Write(); err != nil {
  1746. log.Crit("Failed to persist healing data", "err", err)
  1747. }
  1748. log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize()))
  1749. }
  1750. // forwardAccountTask takes a filled account task and persists anything available
  1751. // into the database, after which it forwards the next account marker so that the
  1752. // task's next chunk may be filled.
  1753. func (s *Syncer) forwardAccountTask(task *accountTask) {
  1754. // Remove any pending delivery
  1755. res := task.res
  1756. if res == nil {
  1757. return // nothing to forward
  1758. }
  1759. task.res = nil
  1760. // Iterate over all the accounts and gather all the incomplete trie nodes. A
  1761. // node is incomplete if we haven't yet filled it (sync was interrupted), or
  1762. // if we filled it in multiple chunks (storage trie), in which case the few
  1763. // nodes on the chunk boundaries are missing.
  1764. incompletes := light.NewNodeSet()
  1765. for i := range res.accounts {
  1766. // If the filling was interrupted, mark everything after as incomplete
  1767. if task.needCode[i] || task.needState[i] {
  1768. for j := i; j < len(res.accounts); j++ {
  1769. if err := res.trie.Prove(res.hashes[j][:], 0, incompletes); err != nil {
  1770. panic(err) // Account range was already proven, what happened
  1771. }
  1772. }
  1773. break
  1774. }
  1775. // Filling not interrupted until this point, mark incomplete if needs healing
  1776. if task.needHeal[i] {
  1777. if err := res.trie.Prove(res.hashes[i][:], 0, incompletes); err != nil {
  1778. panic(err) // Account range was already proven, what happened
  1779. }
  1780. }
  1781. }
  1782. // Persist every finalized trie node that's not on the boundary
  1783. batch := s.db.NewBatch()
  1784. var (
  1785. nodes int
  1786. skipped int
  1787. bytes common.StorageSize
  1788. )
  1789. it := res.nodes.NewIterator(nil, nil)
  1790. for it.Next() {
  1791. // Boundary nodes are not written, since they are incomplete
  1792. if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok {
  1793. skipped++
  1794. continue
  1795. }
  1796. // Overflow nodes are not written, since they mess with another task
  1797. if _, err := res.overflow.Get(it.Key()); err == nil {
  1798. skipped++
  1799. continue
  1800. }
  1801. // Accounts with split storage requests are incomplete
  1802. if _, err := incompletes.Get(it.Key()); err == nil {
  1803. skipped++
  1804. continue
  1805. }
  1806. // Node is neither a boundary, not an incomplete account, persist to disk
  1807. batch.Put(it.Key(), it.Value())
  1808. bytes += common.StorageSize(common.HashLength + len(it.Value()))
  1809. nodes++
  1810. }
  1811. it.Release()
  1812. // Persist the received account segements. These flat state maybe
  1813. // outdated during the sync, but it can be fixed later during the
  1814. // snapshot generation.
  1815. for i, hash := range res.hashes {
  1816. blob := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
  1817. rawdb.WriteAccountSnapshot(batch, hash, blob)
  1818. bytes += common.StorageSize(1 + common.HashLength + len(blob))
  1819. }
  1820. if err := batch.Write(); err != nil {
  1821. log.Crit("Failed to persist accounts", "err", err)
  1822. }
  1823. s.accountBytes += bytes
  1824. s.accountSynced += uint64(len(res.accounts))
  1825. log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "skipped", skipped, "bytes", bytes)
  1826. // Task filling persisted, push it the chunk marker forward to the first
  1827. // account still missing data.
  1828. for i, hash := range res.hashes {
  1829. if task.needCode[i] || task.needState[i] {
  1830. return
  1831. }
  1832. task.Next = common.BigToHash(new(big.Int).Add(hash.Big(), big.NewInt(1)))
  1833. }
  1834. // All accounts marked as complete, track if the entire task is done
  1835. task.done = !res.cont
  1836. }
  1837. // OnAccounts is a callback method to invoke when a range of accounts are
  1838. // received from a remote peer.
  1839. func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
  1840. size := common.StorageSize(len(hashes) * common.HashLength)
  1841. for _, account := range accounts {
  1842. size += common.StorageSize(len(account))
  1843. }
  1844. for _, node := range proof {
  1845. size += common.StorageSize(len(node))
  1846. }
  1847. logger := peer.Log().New("reqid", id)
  1848. logger.Trace("Delivering range of accounts", "hashes", len(hashes), "accounts", len(accounts), "proofs", len(proof), "bytes", size)
  1849. // Whether or not the response is valid, we can mark the peer as idle and
  1850. // notify the scheduler to assign a new task. If the response is invalid,
  1851. // we'll drop the peer in a bit.
  1852. s.lock.Lock()
  1853. if _, ok := s.peers[peer.ID()]; ok {
  1854. s.accountIdlers[peer.ID()] = struct{}{}
  1855. }
  1856. select {
  1857. case s.update <- struct{}{}:
  1858. default:
  1859. }
  1860. // Ensure the response is for a valid request
  1861. req, ok := s.accountReqs[id]
  1862. if !ok {
  1863. // Request stale, perhaps the peer timed out but came through in the end
  1864. logger.Warn("Unexpected account range packet")
  1865. s.lock.Unlock()
  1866. return nil
  1867. }
  1868. delete(s.accountReqs, id)
  1869. // Clean up the request timeout timer, we'll see how to proceed further based
  1870. // on the actual delivered content
  1871. if !req.timeout.Stop() {
  1872. // The timeout is already triggered, and this request will be reverted+rescheduled
  1873. s.lock.Unlock()
  1874. return nil
  1875. }
  1876. // Response is valid, but check if peer is signalling that it does not have
  1877. // the requested data. For account range queries that means the state being
  1878. // retrieved was either already pruned remotely, or the peer is not yet
  1879. // synced to our head.
  1880. if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 {
  1881. logger.Debug("Peer rejected account range request", "root", s.root)
  1882. s.statelessPeers[peer.ID()] = struct{}{}
  1883. s.lock.Unlock()
  1884. // Signal this request as failed, and ready for rescheduling
  1885. s.scheduleRevertAccountRequest(req)
  1886. return nil
  1887. }
  1888. root := s.root
  1889. s.lock.Unlock()
  1890. // Reconstruct a partial trie from the response and verify it
  1891. keys := make([][]byte, len(hashes))
  1892. for i, key := range hashes {
  1893. keys[i] = common.CopyBytes(key[:])
  1894. }
  1895. nodes := make(light.NodeList, len(proof))
  1896. for i, node := range proof {
  1897. nodes[i] = node
  1898. }
  1899. proofdb := nodes.NodeSet()
  1900. var end []byte
  1901. if len(keys) > 0 {
  1902. end = keys[len(keys)-1]
  1903. }
  1904. db, tr, notary, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
  1905. if err != nil {
  1906. logger.Warn("Account range failed proof", "err", err)
  1907. // Signal this request as failed, and ready for rescheduling
  1908. s.scheduleRevertAccountRequest(req)
  1909. return err
  1910. }
  1911. // Partial trie reconstructed, send it to the scheduler for storage filling
  1912. bounds := make(map[common.Hash]struct{})
  1913. it := notary.Accessed().NewIterator(nil, nil)
  1914. for it.Next() {
  1915. bounds[common.BytesToHash(it.Key())] = struct{}{}
  1916. }
  1917. it.Release()
  1918. accs := make([]*state.Account, len(accounts))
  1919. for i, account := range accounts {
  1920. acc := new(state.Account)
  1921. if err := rlp.DecodeBytes(account, acc); err != nil {
  1922. panic(err) // We created these blobs, we must be able to decode them
  1923. }
  1924. accs[i] = acc
  1925. }
  1926. response := &accountResponse{
  1927. task: req.task,
  1928. hashes: hashes,
  1929. accounts: accs,
  1930. nodes: db,
  1931. trie: tr,
  1932. bounds: bounds,
  1933. overflow: light.NewNodeSet(),
  1934. cont: cont,
  1935. }
  1936. select {
  1937. case s.accountResps <- response:
  1938. case <-req.cancel:
  1939. case <-req.stale:
  1940. }
  1941. return nil
  1942. }
  1943. // OnByteCodes is a callback method to invoke when a batch of contract
  1944. // bytes codes are received from a remote peer.
  1945. func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
  1946. s.lock.RLock()
  1947. syncing := !s.snapped
  1948. s.lock.RUnlock()
  1949. if syncing {
  1950. return s.onByteCodes(peer, id, bytecodes)
  1951. }
  1952. return s.onHealByteCodes(peer, id, bytecodes)
  1953. }
  1954. // onByteCodes is a callback method to invoke when a batch of contract
  1955. // bytes codes are received from a remote peer in the syncing phase.
  1956. func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
  1957. var size common.StorageSize
  1958. for _, code := range bytecodes {
  1959. size += common.StorageSize(len(code))
  1960. }
  1961. logger := peer.Log().New("reqid", id)
  1962. logger.Trace("Delivering set of bytecodes", "bytecodes", len(bytecodes), "bytes", size)
  1963. // Whether or not the response is valid, we can mark the peer as idle and
  1964. // notify the scheduler to assign a new task. If the response is invalid,
  1965. // we'll drop the peer in a bit.
  1966. s.lock.Lock()
  1967. if _, ok := s.peers[peer.ID()]; ok {
  1968. s.bytecodeIdlers[peer.ID()] = struct{}{}
  1969. }
  1970. select {
  1971. case s.update <- struct{}{}:
  1972. default:
  1973. }
  1974. // Ensure the response is for a valid request
  1975. req, ok := s.bytecodeReqs[id]
  1976. if !ok {
  1977. // Request stale, perhaps the peer timed out but came through in the end
  1978. logger.Warn("Unexpected bytecode packet")
  1979. s.lock.Unlock()
  1980. return nil
  1981. }
  1982. delete(s.bytecodeReqs, id)
  1983. // Clean up the request timeout timer, we'll see how to proceed further based
  1984. // on the actual delivered content
  1985. if !req.timeout.Stop() {
  1986. // The timeout is already triggered, and this request will be reverted+rescheduled
  1987. s.lock.Unlock()
  1988. return nil
  1989. }
  1990. // Response is valid, but check if peer is signalling that it does not have
  1991. // the requested data. For bytecode range queries that means the peer is not
  1992. // yet synced.
  1993. if len(bytecodes) == 0 {
  1994. logger.Debug("Peer rejected bytecode request")
  1995. s.statelessPeers[peer.ID()] = struct{}{}
  1996. s.lock.Unlock()
  1997. // Signal this request as failed, and ready for rescheduling
  1998. s.scheduleRevertBytecodeRequest(req)
  1999. return nil
  2000. }
  2001. s.lock.Unlock()
  2002. // Cross reference the requested bytecodes with the response to find gaps
  2003. // that the serving node is missing
  2004. hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
  2005. hash := make([]byte, 32)
  2006. codes := make([][]byte, len(req.hashes))
  2007. for i, j := 0, 0; i < len(bytecodes); i++ {
  2008. // Find the next hash that we've been served, leaving misses with nils
  2009. hasher.Reset()
  2010. hasher.Write(bytecodes[i])
  2011. hasher.Read(hash)
  2012. for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
  2013. j++
  2014. }
  2015. if j < len(req.hashes) {
  2016. codes[j] = bytecodes[i]
  2017. j++
  2018. continue
  2019. }
  2020. // We've either ran out of hashes, or got unrequested data
  2021. logger.Warn("Unexpected bytecodes", "count", len(bytecodes)-i)
  2022. // Signal this request as failed, and ready for rescheduling
  2023. s.scheduleRevertBytecodeRequest(req)
  2024. return errors.New("unexpected bytecode")
  2025. }
  2026. // Response validated, send it to the scheduler for filling
  2027. response := &bytecodeResponse{
  2028. task: req.task,
  2029. hashes: req.hashes,
  2030. codes: codes,
  2031. }
  2032. select {
  2033. case s.bytecodeResps <- response:
  2034. case <-req.cancel:
  2035. case <-req.stale:
  2036. }
  2037. return nil
  2038. }
  2039. // OnStorage is a callback method to invoke when ranges of storage slots
  2040. // are received from a remote peer.
  2041. func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
  2042. // Gather some trace stats to aid in debugging issues
  2043. var (
  2044. hashCount int
  2045. slotCount int
  2046. size common.StorageSize
  2047. )
  2048. for _, hashset := range hashes {
  2049. size += common.StorageSize(common.HashLength * len(hashset))
  2050. hashCount += len(hashset)
  2051. }
  2052. for _, slotset := range slots {
  2053. for _, slot := range slotset {
  2054. size += common.StorageSize(len(slot))
  2055. }
  2056. slotCount += len(slotset)
  2057. }
  2058. for _, node := range proof {
  2059. size += common.StorageSize(len(node))
  2060. }
  2061. logger := peer.Log().New("reqid", id)
  2062. logger.Trace("Delivering ranges of storage slots", "accounts", len(hashes), "hashes", hashCount, "slots", slotCount, "proofs", len(proof), "size", size)
  2063. // Whether or not the response is valid, we can mark the peer as idle and
  2064. // notify the scheduler to assign a new task. If the response is invalid,
  2065. // we'll drop the peer in a bit.
  2066. s.lock.Lock()
  2067. if _, ok := s.peers[peer.ID()]; ok {
  2068. s.storageIdlers[peer.ID()] = struct{}{}
  2069. }
  2070. select {
  2071. case s.update <- struct{}{}:
  2072. default:
  2073. }
  2074. // Ensure the response is for a valid request
  2075. req, ok := s.storageReqs[id]
  2076. if !ok {
  2077. // Request stale, perhaps the peer timed out but came through in the end
  2078. logger.Warn("Unexpected storage ranges packet")
  2079. s.lock.Unlock()
  2080. return nil
  2081. }
  2082. delete(s.storageReqs, id)
  2083. // Clean up the request timeout timer, we'll see how to proceed further based
  2084. // on the actual delivered content
  2085. if !req.timeout.Stop() {
  2086. // The timeout is already triggered, and this request will be reverted+rescheduled
  2087. s.lock.Unlock()
  2088. return nil
  2089. }
  2090. // Reject the response if the hash sets and slot sets don't match, or if the
  2091. // peer sent more data than requested.
  2092. if len(hashes) != len(slots) {
  2093. s.lock.Unlock()
  2094. s.scheduleRevertStorageRequest(req) // reschedule request
  2095. logger.Warn("Hash and slot set size mismatch", "hashset", len(hashes), "slotset", len(slots))
  2096. return errors.New("hash and slot set size mismatch")
  2097. }
  2098. if len(hashes) > len(req.accounts) {
  2099. s.lock.Unlock()
  2100. s.scheduleRevertStorageRequest(req) // reschedule request
  2101. logger.Warn("Hash set larger than requested", "hashset", len(hashes), "requested", len(req.accounts))
  2102. return errors.New("hash set larger than requested")
  2103. }
  2104. // Response is valid, but check if peer is signalling that it does not have
  2105. // the requested data. For storage range queries that means the state being
  2106. // retrieved was either already pruned remotely, or the peer is not yet
  2107. // synced to our head.
  2108. if len(hashes) == 0 {
  2109. logger.Debug("Peer rejected storage request")
  2110. s.statelessPeers[peer.ID()] = struct{}{}
  2111. s.lock.Unlock()
  2112. s.scheduleRevertStorageRequest(req) // reschedule request
  2113. return nil
  2114. }
  2115. s.lock.Unlock()
  2116. // Reconstruct the partial tries from the response and verify them
  2117. var (
  2118. dbs = make([]ethdb.KeyValueStore, len(hashes))
  2119. tries = make([]*trie.Trie, len(hashes))
  2120. notary *trie.KeyValueNotary
  2121. cont bool
  2122. )
  2123. for i := 0; i < len(hashes); i++ {
  2124. // Convert the keys and proofs into an internal format
  2125. keys := make([][]byte, len(hashes[i]))
  2126. for j, key := range hashes[i] {
  2127. keys[j] = common.CopyBytes(key[:])
  2128. }
  2129. nodes := make(light.NodeList, 0, len(proof))
  2130. if i == len(hashes)-1 {
  2131. for _, node := range proof {
  2132. nodes = append(nodes, node)
  2133. }
  2134. }
  2135. var err error
  2136. if len(nodes) == 0 {
  2137. // No proof has been attached, the response must cover the entire key
  2138. // space and hash to the origin root.
  2139. dbs[i], tries[i], _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
  2140. if err != nil {
  2141. s.scheduleRevertStorageRequest(req) // reschedule request
  2142. logger.Warn("Storage slots failed proof", "err", err)
  2143. return err
  2144. }
  2145. } else {
  2146. // A proof was attached, the response is only partial, check that the
  2147. // returned data is indeed part of the storage trie
  2148. proofdb := nodes.NodeSet()
  2149. var end []byte
  2150. if len(keys) > 0 {
  2151. end = keys[len(keys)-1]
  2152. }
  2153. dbs[i], tries[i], notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
  2154. if err != nil {
  2155. s.scheduleRevertStorageRequest(req) // reschedule request
  2156. logger.Warn("Storage range failed proof", "err", err)
  2157. return err
  2158. }
  2159. }
  2160. }
  2161. // Partial tries reconstructed, send them to the scheduler for storage filling
  2162. bounds := make(map[common.Hash]struct{})
  2163. if notary != nil { // if all contract storages are delivered in full, no notary will be created
  2164. it := notary.Accessed().NewIterator(nil, nil)
  2165. for it.Next() {
  2166. bounds[common.BytesToHash(it.Key())] = struct{}{}
  2167. }
  2168. it.Release()
  2169. }
  2170. response := &storageResponse{
  2171. mainTask: req.mainTask,
  2172. subTask: req.subTask,
  2173. accounts: req.accounts,
  2174. roots: req.roots,
  2175. hashes: hashes,
  2176. slots: slots,
  2177. nodes: dbs,
  2178. tries: tries,
  2179. bounds: bounds,
  2180. overflow: light.NewNodeSet(),
  2181. cont: cont,
  2182. }
  2183. select {
  2184. case s.storageResps <- response:
  2185. case <-req.cancel:
  2186. case <-req.stale:
  2187. }
  2188. return nil
  2189. }
  2190. // OnTrieNodes is a callback method to invoke when a batch of trie nodes
  2191. // are received from a remote peer.
  2192. func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
  2193. var size common.StorageSize
  2194. for _, node := range trienodes {
  2195. size += common.StorageSize(len(node))
  2196. }
  2197. logger := peer.Log().New("reqid", id)
  2198. logger.Trace("Delivering set of healing trienodes", "trienodes", len(trienodes), "bytes", size)
  2199. // Whether or not the response is valid, we can mark the peer as idle and
  2200. // notify the scheduler to assign a new task. If the response is invalid,
  2201. // we'll drop the peer in a bit.
  2202. s.lock.Lock()
  2203. if _, ok := s.peers[peer.ID()]; ok {
  2204. s.trienodeHealIdlers[peer.ID()] = struct{}{}
  2205. }
  2206. select {
  2207. case s.update <- struct{}{}:
  2208. default:
  2209. }
  2210. // Ensure the response is for a valid request
  2211. req, ok := s.trienodeHealReqs[id]
  2212. if !ok {
  2213. // Request stale, perhaps the peer timed out but came through in the end
  2214. logger.Warn("Unexpected trienode heal packet")
  2215. s.lock.Unlock()
  2216. return nil
  2217. }
  2218. delete(s.trienodeHealReqs, id)
  2219. // Clean up the request timeout timer, we'll see how to proceed further based
  2220. // on the actual delivered content
  2221. if !req.timeout.Stop() {
  2222. // The timeout is already triggered, and this request will be reverted+rescheduled
  2223. s.lock.Unlock()
  2224. return nil
  2225. }
  2226. // Response is valid, but check if peer is signalling that it does not have
  2227. // the requested data. For bytecode range queries that means the peer is not
  2228. // yet synced.
  2229. if len(trienodes) == 0 {
  2230. logger.Debug("Peer rejected trienode heal request")
  2231. s.statelessPeers[peer.ID()] = struct{}{}
  2232. s.lock.Unlock()
  2233. // Signal this request as failed, and ready for rescheduling
  2234. s.scheduleRevertTrienodeHealRequest(req)
  2235. return nil
  2236. }
  2237. s.lock.Unlock()
  2238. // Cross reference the requested trienodes with the response to find gaps
  2239. // that the serving node is missing
  2240. hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
  2241. hash := make([]byte, 32)
  2242. nodes := make([][]byte, len(req.hashes))
  2243. for i, j := 0, 0; i < len(trienodes); i++ {
  2244. // Find the next hash that we've been served, leaving misses with nils
  2245. hasher.Reset()
  2246. hasher.Write(trienodes[i])
  2247. hasher.Read(hash)
  2248. for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
  2249. j++
  2250. }
  2251. if j < len(req.hashes) {
  2252. nodes[j] = trienodes[i]
  2253. j++
  2254. continue
  2255. }
  2256. // We've either ran out of hashes, or got unrequested data
  2257. logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
  2258. // Signal this request as failed, and ready for rescheduling
  2259. s.scheduleRevertTrienodeHealRequest(req)
  2260. return errors.New("unexpected healing trienode")
  2261. }
  2262. // Response validated, send it to the scheduler for filling
  2263. response := &trienodeHealResponse{
  2264. task: req.task,
  2265. hashes: req.hashes,
  2266. paths: req.paths,
  2267. nodes: nodes,
  2268. }
  2269. select {
  2270. case s.trienodeHealResps <- response:
  2271. case <-req.cancel:
  2272. case <-req.stale:
  2273. }
  2274. return nil
  2275. }
  2276. // onHealByteCodes is a callback method to invoke when a batch of contract
  2277. // bytes codes are received from a remote peer in the healing phase.
  2278. func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
  2279. var size common.StorageSize
  2280. for _, code := range bytecodes {
  2281. size += common.StorageSize(len(code))
  2282. }
  2283. logger := peer.Log().New("reqid", id)
  2284. logger.Trace("Delivering set of healing bytecodes", "bytecodes", len(bytecodes), "bytes", size)
  2285. // Whether or not the response is valid, we can mark the peer as idle and
  2286. // notify the scheduler to assign a new task. If the response is invalid,
  2287. // we'll drop the peer in a bit.
  2288. s.lock.Lock()
  2289. if _, ok := s.peers[peer.ID()]; ok {
  2290. s.bytecodeHealIdlers[peer.ID()] = struct{}{}
  2291. }
  2292. select {
  2293. case s.update <- struct{}{}:
  2294. default:
  2295. }
  2296. // Ensure the response is for a valid request
  2297. req, ok := s.bytecodeHealReqs[id]
  2298. if !ok {
  2299. // Request stale, perhaps the peer timed out but came through in the end
  2300. logger.Warn("Unexpected bytecode heal packet")
  2301. s.lock.Unlock()
  2302. return nil
  2303. }
  2304. delete(s.bytecodeHealReqs, id)
  2305. // Clean up the request timeout timer, we'll see how to proceed further based
  2306. // on the actual delivered content
  2307. if !req.timeout.Stop() {
  2308. // The timeout is already triggered, and this request will be reverted+rescheduled
  2309. s.lock.Unlock()
  2310. return nil
  2311. }
  2312. // Response is valid, but check if peer is signalling that it does not have
  2313. // the requested data. For bytecode range queries that means the peer is not
  2314. // yet synced.
  2315. if len(bytecodes) == 0 {
  2316. logger.Debug("Peer rejected bytecode heal request")
  2317. s.statelessPeers[peer.ID()] = struct{}{}
  2318. s.lock.Unlock()
  2319. // Signal this request as failed, and ready for rescheduling
  2320. s.scheduleRevertBytecodeHealRequest(req)
  2321. return nil
  2322. }
  2323. s.lock.Unlock()
  2324. // Cross reference the requested bytecodes with the response to find gaps
  2325. // that the serving node is missing
  2326. hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
  2327. hash := make([]byte, 32)
  2328. codes := make([][]byte, len(req.hashes))
  2329. for i, j := 0, 0; i < len(bytecodes); i++ {
  2330. // Find the next hash that we've been served, leaving misses with nils
  2331. hasher.Reset()
  2332. hasher.Write(bytecodes[i])
  2333. hasher.Read(hash)
  2334. for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
  2335. j++
  2336. }
  2337. if j < len(req.hashes) {
  2338. codes[j] = bytecodes[i]
  2339. j++
  2340. continue
  2341. }
  2342. // We've either ran out of hashes, or got unrequested data
  2343. logger.Warn("Unexpected healing bytecodes", "count", len(bytecodes)-i)
  2344. // Signal this request as failed, and ready for rescheduling
  2345. s.scheduleRevertBytecodeHealRequest(req)
  2346. return errors.New("unexpected healing bytecode")
  2347. }
  2348. // Response validated, send it to the scheduler for filling
  2349. response := &bytecodeHealResponse{
  2350. task: req.task,
  2351. hashes: req.hashes,
  2352. codes: codes,
  2353. }
  2354. select {
  2355. case s.bytecodeHealResps <- response:
  2356. case <-req.cancel:
  2357. case <-req.stale:
  2358. }
  2359. return nil
  2360. }
  2361. // onHealState is a callback method to invoke when a flat state(account
  2362. // or storage slot) is downloded during the healing stage. The flat states
  2363. // can be persisted blindly and can be fixed later in the generation stage.
  2364. // Note it's not concurrent safe, please handle the concurrent issue outside.
  2365. func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
  2366. if len(paths) == 1 {
  2367. var account state.Account
  2368. if err := rlp.DecodeBytes(value, &account); err != nil {
  2369. return nil
  2370. }
  2371. blob := snapshot.SlimAccountRLP(account.Nonce, account.Balance, account.Root, account.CodeHash)
  2372. rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob)
  2373. s.accountHealed += 1
  2374. s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
  2375. }
  2376. if len(paths) == 2 {
  2377. rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value)
  2378. s.storageHealed += 1
  2379. s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
  2380. }
  2381. if s.stateWriter.ValueSize() > ethdb.IdealBatchSize {
  2382. s.stateWriter.Write() // It's fine to ignore the error here
  2383. s.stateWriter.Reset()
  2384. }
  2385. return nil
  2386. }
  2387. // hashSpace is the total size of the 256 bit hash space for accounts.
  2388. var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
  2389. // report calculates various status reports and provides it to the user.
  2390. func (s *Syncer) report(force bool) {
  2391. if len(s.tasks) > 0 {
  2392. s.reportSyncProgress(force)
  2393. return
  2394. }
  2395. s.reportHealProgress(force)
  2396. }
  2397. // reportSyncProgress calculates various status reports and provides it to the user.
  2398. func (s *Syncer) reportSyncProgress(force bool) {
  2399. // Don't report all the events, just occasionally
  2400. if !force && time.Since(s.logTime) < 3*time.Second {
  2401. return
  2402. }
  2403. // Don't report anything until we have a meaningful progress
  2404. synced := s.accountBytes + s.bytecodeBytes + s.storageBytes
  2405. if synced == 0 {
  2406. return
  2407. }
  2408. accountGaps := new(big.Int)
  2409. for _, task := range s.tasks {
  2410. accountGaps.Add(accountGaps, new(big.Int).Sub(task.Last.Big(), task.Next.Big()))
  2411. }
  2412. accountFills := new(big.Int).Sub(hashSpace, accountGaps)
  2413. if accountFills.BitLen() == 0 {
  2414. return
  2415. }
  2416. s.logTime = time.Now()
  2417. estBytes := float64(new(big.Int).Div(
  2418. new(big.Int).Mul(new(big.Int).SetUint64(uint64(synced)), hashSpace),
  2419. accountFills,
  2420. ).Uint64())
  2421. elapsed := time.Since(s.startTime)
  2422. estTime := elapsed / time.Duration(synced) * time.Duration(estBytes)
  2423. // Create a mega progress report
  2424. var (
  2425. progress = fmt.Sprintf("%.2f%%", float64(synced)*100/estBytes)
  2426. accounts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.accountSynced), s.accountBytes.TerminalString())
  2427. storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageSynced), s.storageBytes.TerminalString())
  2428. bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeSynced), s.bytecodeBytes.TerminalString())
  2429. )
  2430. log.Info("State sync in progress", "synced", progress, "state", synced,
  2431. "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed))
  2432. }
  2433. // reportHealProgress calculates various status reports and provides it to the user.
  2434. func (s *Syncer) reportHealProgress(force bool) {
  2435. // Don't report all the events, just occasionally
  2436. if !force && time.Since(s.logTime) < 3*time.Second {
  2437. return
  2438. }
  2439. s.logTime = time.Now()
  2440. // Create a mega progress report
  2441. var (
  2442. trienode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.trienodeHealSynced), s.trienodeHealBytes.TerminalString())
  2443. bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeHealSynced), s.bytecodeHealBytes.TerminalString())
  2444. accounts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.accountHealed), s.accountHealedBytes.TerminalString())
  2445. storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageHealed), s.storageHealedBytes.TerminalString())
  2446. )
  2447. log.Info("State heal in progress", "accounts", accounts, "slots", storage,
  2448. "codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
  2449. }