net.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "bytes"
  19. "crypto/ecdsa"
  20. "errors"
  21. "fmt"
  22. "net"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/common/mclock"
  26. "github.com/ethereum/go-ethereum/crypto"
  27. "github.com/ethereum/go-ethereum/crypto/sha3"
  28. "github.com/ethereum/go-ethereum/logger"
  29. "github.com/ethereum/go-ethereum/logger/glog"
  30. "github.com/ethereum/go-ethereum/p2p/nat"
  31. "github.com/ethereum/go-ethereum/rlp"
  32. )
// Sentinel errors used by the network code in this file.
var (
	// errInvalidEvent is returned by node state handlers when an event
	// cannot be processed in the node's current state.
	errInvalidEvent = errors.New("invalid in current state")
	// errNoQuery signals that there is no pending query.
	errNoQuery = errors.New("no pending query")
	// errWrongAddress signals that the sender address is not known.
	errWrongAddress = errors.New("unknown sender address")
)
// Timing and seeding parameters of the main loop.
const (
	// autoRefreshInterval is the period of the refresh ticker in Network.loop.
	autoRefreshInterval = 1 * time.Hour
	// bucketRefreshInterval is the delay of the bucket refresh timer in
	// Network.loop; the timer is re-armed after each bucket refresh lookup.
	bucketRefreshInterval = 1 * time.Minute
	// seedCount is the first argument passed to nodeDB.querySeeds by Network.refresh.
	seedCount = 30
	// seedMaxAge is the second argument passed to nodeDB.querySeeds by Network.refresh.
	seedMaxAge = 5 * 24 * time.Hour
)
// testTopic is a fixed topic name.
// NOTE(review): the only reference visible here is in commented-out debug
// output inside Network.loop — confirm remaining users before removing.
const testTopic = "foo"
  45. const (
  46. printDebugLogs = false
  47. printTestImgLogs = false
  48. )
  49. func debugLog(s string) {
  50. if printDebugLogs {
  51. fmt.Println(s)
  52. }
  53. }
// Network manages the table and all protocol interaction.
//
// The fields under "State of the main loop" are owned by the goroutine
// running Network.loop (started by newNetwork). Other goroutines talk to
// that loop through the channels declared first.
type Network struct {
	// database of known nodes; stays nil when newNetwork is given the
	// sentinel path "<no database>"
	db *nodeDB
	// packet transport; closed by Close and again when loop exits
	conn transport
	// closed when loop is done
	closed chan struct{}
	// 'request to close'
	closeReq chan struct{}
	// lookups ask for refresh on this channel
	refreshReq chan []*Node
	// ...and get the channel to block on from this one
	refreshResp chan (<-chan struct{})
	// ingress packets arrive here
	read chan ingressPacket
	// timeout events are received here by loop
	timeout chan timeoutEvent
	// lookups submit findnode queries on this channel
	queryReq chan *findnodeQuery
	// functions submitted here are executed on the loop goroutine
	tableOpReq chan func()
	// loop signals completion of a table operation here
	tableOpResp chan struct{}
	// topic registration / deregistration requests
	topicRegisterReq chan topicRegisterReq
	// topic search start / stop requests
	topicSearchReq chan topicSearchReq
	// State of the main loop.
	tab *Table
	topictab *topicTable
	ticketStore *ticketStore
	// fallback seed nodes used by refresh when the database yields none
	nursery []*Node
	// tracks active nodes with state != known
	nodes map[NodeID]*Node
	// timers for pending timeout events; loop treats a timeout with no
	// entry here as stale
	timeoutTimers map[timeoutEvent]*time.Timer
	// Revalidation queues.
	// Nodes put on these queues will be pinged eventually.
	slowRevalidateQueue []*Node
	fastRevalidateQueue []*Node
	// Buffers for state transition.
	sendBuf []*ingressPacket
}
// transport is implemented by the UDP transport.
// it is an interface so we can test without opening lots of UDP
// sockets and without generating a private key.
type transport interface {
	// sendPing sends a ping carrying the given topics and returns a hash.
	// NOTE(review): presumably the hash of the sent packet — confirm against
	// the UDP implementation.
	sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) (hash []byte)
	// sendNeighbours sends the given nodes to remote.
	sendNeighbours(remote *Node, nodes []*Node)
	// sendFindnodeHash asks remote for nodes close to target; target is
	// already a hash (see findnodeQuery.start).
	sendFindnodeHash(remote *Node, target common.Hash)
	// sendTopicRegister sends a topic registration to remote.
	sendTopicRegister(remote *Node, topics []Topic, topicIdx int, pong []byte)
	// sendTopicNodes sends the nodes found for a topic query to remote.
	sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node)
	// send transmits an arbitrary packet of type ptype and returns a hash.
	send(remote *Node, ptype nodeEvent, p interface{}) (hash []byte)
	// localAddr returns the local UDP address; newNetwork passes it to newTable.
	localAddr() *net.UDPAddr
	// Close shuts the transport down.
	Close()
}
// findnodeQuery is a single findnode request submitted by a lookup to the
// main loop through Network.queryReq.
type findnodeQuery struct {
	// node the query is addressed to
	remote *Node
	// hash the lookup is searching for
	target common.Hash
	// channel on which the lookup receives the result; nil is sent when the
	// query is aborted
	reply chan<- []*Node
	// counter for received nodes
	nresults int
}
// topicRegisterReq asks the main loop to start or stop registering a topic.
type topicRegisterReq struct {
	// true adds the topic, false removes it
	add bool
	topic Topic
}
// topicSearchReq asks the main loop to start or stop searching for a topic.
type topicSearchReq struct {
	topic Topic
	// channel handed to the ticket store for reporting results; a nil
	// channel means "stop searching for this topic"
	found chan<- string
}
// timeoutEvent identifies a pending timeout: event ev is to be delivered to
// node. It is comparable and serves as the key of Network.timeoutTimers.
type timeoutEvent struct {
	ev nodeEvent
	node *Node
}
  114. func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, dbPath string) (*Network, error) {
  115. ourID := PubkeyID(&ourPubkey)
  116. var db *nodeDB
  117. if dbPath != "<no database>" {
  118. var err error
  119. if db, err = newNodeDB(dbPath, Version, ourID); err != nil {
  120. return nil, err
  121. }
  122. }
  123. tab := newTable(ourID, conn.localAddr())
  124. net := &Network{
  125. db: db,
  126. conn: conn,
  127. tab: tab,
  128. topictab: newTopicTable(db, tab.self),
  129. ticketStore: newTicketStore(),
  130. refreshReq: make(chan []*Node),
  131. refreshResp: make(chan (<-chan struct{})),
  132. closed: make(chan struct{}),
  133. closeReq: make(chan struct{}),
  134. read: make(chan ingressPacket, 100),
  135. timeout: make(chan timeoutEvent),
  136. timeoutTimers: make(map[timeoutEvent]*time.Timer),
  137. tableOpReq: make(chan func()),
  138. tableOpResp: make(chan struct{}),
  139. queryReq: make(chan *findnodeQuery),
  140. topicRegisterReq: make(chan topicRegisterReq),
  141. topicSearchReq: make(chan topicSearchReq),
  142. nodes: make(map[NodeID]*Node),
  143. }
  144. go net.loop()
  145. return net, nil
  146. }
  147. // Close terminates the network listener and flushes the node database.
  148. func (net *Network) Close() {
  149. net.conn.Close()
  150. select {
  151. case <-net.closed:
  152. case net.closeReq <- struct{}{}:
  153. <-net.closed
  154. }
  155. }
  156. // Self returns the local node.
  157. // The returned node should not be modified by the caller.
  158. func (net *Network) Self() *Node {
  159. return net.tab.self
  160. }
  161. // ReadRandomNodes fills the given slice with random nodes from the
  162. // table. It will not write the same node more than once. The nodes in
  163. // the slice are copies and can be modified by the caller.
  164. func (net *Network) ReadRandomNodes(buf []*Node) (n int) {
  165. net.reqTableOp(func() { n = net.tab.readRandomNodes(buf) })
  166. return n
  167. }
  168. // SetFallbackNodes sets the initial points of contact. These nodes
  169. // are used to connect to the network if the table is empty and there
  170. // are no known nodes in the database.
  171. func (net *Network) SetFallbackNodes(nodes []*Node) error {
  172. nursery := make([]*Node, 0, len(nodes))
  173. for _, n := range nodes {
  174. if err := n.validateComplete(); err != nil {
  175. return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err)
  176. }
  177. // Recompute cpy.sha because the node might not have been
  178. // created by NewNode or ParseNode.
  179. cpy := *n
  180. cpy.sha = crypto.Keccak256Hash(n.ID[:])
  181. nursery = append(nursery, &cpy)
  182. }
  183. net.reqRefresh(nursery)
  184. return nil
  185. }
  186. // Resolve searches for a specific node with the given ID.
  187. // It returns nil if the node could not be found.
  188. func (net *Network) Resolve(targetID NodeID) *Node {
  189. result := net.lookup(crypto.Keccak256Hash(targetID[:]), true)
  190. for _, n := range result {
  191. if n.ID == targetID {
  192. return n
  193. }
  194. }
  195. return nil
  196. }
  197. // Lookup performs a network search for nodes close
  198. // to the given target. It approaches the target by querying
  199. // nodes that are closer to it on each iteration.
  200. // The given target does not need to be an actual node
  201. // identifier.
  202. //
  203. // The local node may be included in the result.
  204. func (net *Network) Lookup(targetID NodeID) []*Node {
  205. return net.lookup(crypto.Keccak256Hash(targetID[:]), false)
  206. }
  207. func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node {
  208. var (
  209. asked = make(map[NodeID]bool)
  210. seen = make(map[NodeID]bool)
  211. reply = make(chan []*Node, alpha)
  212. result = nodesByDistance{target: target}
  213. pendingQueries = 0
  214. )
  215. // Get initial answers from the local node.
  216. result.push(net.tab.self, bucketSize)
  217. for {
  218. // Ask the α closest nodes that we haven't asked yet.
  219. for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
  220. n := result.entries[i]
  221. if !asked[n.ID] {
  222. asked[n.ID] = true
  223. pendingQueries++
  224. net.reqQueryFindnode(n, target, reply)
  225. }
  226. }
  227. if pendingQueries == 0 {
  228. // We have asked all closest nodes, stop the search.
  229. break
  230. }
  231. // Wait for the next reply.
  232. for _, n := range <-reply {
  233. if n != nil && !seen[n.ID] {
  234. seen[n.ID] = true
  235. result.push(n, bucketSize)
  236. if stopOnMatch && n.sha == target {
  237. return result.entries
  238. }
  239. }
  240. }
  241. pendingQueries--
  242. }
  243. return result.entries
  244. }
  245. func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) {
  246. select {
  247. case net.topicRegisterReq <- topicRegisterReq{true, topic}:
  248. case <-net.closed:
  249. return
  250. }
  251. select {
  252. case <-net.closed:
  253. case <-stop:
  254. select {
  255. case net.topicRegisterReq <- topicRegisterReq{false, topic}:
  256. case <-net.closed:
  257. }
  258. }
  259. }
  260. func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) {
  261. select {
  262. case net.topicSearchReq <- topicSearchReq{topic, found}:
  263. case <-net.closed:
  264. return
  265. }
  266. select {
  267. case <-net.closed:
  268. case <-stop:
  269. select {
  270. case net.topicSearchReq <- topicSearchReq{topic, nil}:
  271. case <-net.closed:
  272. }
  273. }
  274. }
  275. func (net *Network) reqRefresh(nursery []*Node) <-chan struct{} {
  276. select {
  277. case net.refreshReq <- nursery:
  278. return <-net.refreshResp
  279. case <-net.closed:
  280. return net.closed
  281. }
  282. }
  283. func (net *Network) reqQueryFindnode(n *Node, target common.Hash, reply chan []*Node) bool {
  284. q := &findnodeQuery{remote: n, target: target, reply: reply}
  285. select {
  286. case net.queryReq <- q:
  287. return true
  288. case <-net.closed:
  289. return false
  290. }
  291. }
  292. func (net *Network) reqReadPacket(pkt ingressPacket) {
  293. select {
  294. case net.read <- pkt:
  295. case <-net.closed:
  296. }
  297. }
  298. func (net *Network) reqTableOp(f func()) (called bool) {
  299. select {
  300. case net.tableOpReq <- f:
  301. <-net.tableOpResp
  302. return true
  303. case <-net.closed:
  304. return false
  305. }
  306. }
  307. // TODO: external address handling.
  308. func (net *Network) loop() {
  309. var (
  310. refreshTimer = time.NewTicker(autoRefreshInterval)
  311. bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
  312. refreshDone chan struct{} // closed when the 'refresh' lookup has ended
  313. )
  314. // Tracking the next ticket to register.
  315. var (
  316. nextTicket *ticketRef
  317. nextRegisterTimer *time.Timer
  318. nextRegisterTime <-chan time.Time
  319. )
  320. defer func() {
  321. if nextRegisterTimer != nil {
  322. nextRegisterTimer.Stop()
  323. }
  324. }()
  325. resetNextTicket := func() {
  326. t, timeout := net.ticketStore.nextFilteredTicket()
  327. if t != nextTicket {
  328. nextTicket = t
  329. if nextRegisterTimer != nil {
  330. nextRegisterTimer.Stop()
  331. nextRegisterTime = nil
  332. }
  333. if t != nil {
  334. nextRegisterTimer = time.NewTimer(timeout)
  335. nextRegisterTime = nextRegisterTimer.C
  336. }
  337. }
  338. }
  339. // Tracking registration and search lookups.
  340. var (
  341. topicRegisterLookupTarget lookupInfo
  342. topicRegisterLookupDone chan []*Node
  343. topicRegisterLookupTick = time.NewTimer(0)
  344. topicSearchLookupTarget lookupInfo
  345. searchReqWhenRefreshDone []topicSearchReq
  346. )
  347. topicSearchLookupDone := make(chan []*Node, 1)
  348. <-topicRegisterLookupTick.C
  349. statsDump := time.NewTicker(10 * time.Second)
  350. loop:
  351. for {
  352. resetNextTicket()
  353. select {
  354. case <-net.closeReq:
  355. debugLog("<-net.closeReq")
  356. break loop
  357. // Ingress packet handling.
  358. case pkt := <-net.read:
  359. //fmt.Println("read", pkt.ev)
  360. debugLog("<-net.read")
  361. n := net.internNode(&pkt)
  362. prestate := n.state
  363. status := "ok"
  364. if err := net.handle(n, pkt.ev, &pkt); err != nil {
  365. status = err.Error()
  366. }
  367. if glog.V(logger.Detail) {
  368. glog.Infof("<<< (%d) %v from %x@%v: %v -> %v (%v)",
  369. net.tab.count, pkt.ev, pkt.remoteID[:8], pkt.remoteAddr, prestate, n.state, status)
  370. }
  371. // TODO: persist state if n.state goes >= known, delete if it goes <= known
  372. // State transition timeouts.
  373. case timeout := <-net.timeout:
  374. debugLog("<-net.timeout")
  375. if net.timeoutTimers[timeout] == nil {
  376. // Stale timer (was aborted).
  377. continue
  378. }
  379. delete(net.timeoutTimers, timeout)
  380. prestate := timeout.node.state
  381. status := "ok"
  382. if err := net.handle(timeout.node, timeout.ev, nil); err != nil {
  383. status = err.Error()
  384. }
  385. if glog.V(logger.Detail) {
  386. glog.Infof("--- (%d) %v for %x@%v: %v -> %v (%v)",
  387. net.tab.count, timeout.ev, timeout.node.ID[:8], timeout.node.addr(), prestate, timeout.node.state, status)
  388. }
  389. // Querying.
  390. case q := <-net.queryReq:
  391. debugLog("<-net.queryReq")
  392. if !q.start(net) {
  393. q.remote.deferQuery(q)
  394. }
  395. // Interacting with the table.
  396. case f := <-net.tableOpReq:
  397. debugLog("<-net.tableOpReq")
  398. f()
  399. net.tableOpResp <- struct{}{}
  400. // Topic registration stuff.
  401. case req := <-net.topicRegisterReq:
  402. debugLog("<-net.topicRegisterReq")
  403. if !req.add {
  404. net.ticketStore.removeRegisterTopic(req.topic)
  405. continue
  406. }
  407. net.ticketStore.addTopic(req.topic, true)
  408. // If we're currently waiting idle (nothing to look up), give the ticket store a
  409. // chance to start it sooner. This should speed up convergence of the radius
  410. // determination for new topics.
  411. // if topicRegisterLookupDone == nil {
  412. if topicRegisterLookupTarget.target == (common.Hash{}) {
  413. debugLog("topicRegisterLookupTarget == null")
  414. if topicRegisterLookupTick.Stop() {
  415. <-topicRegisterLookupTick.C
  416. }
  417. target, delay := net.ticketStore.nextRegisterLookup()
  418. topicRegisterLookupTarget = target
  419. topicRegisterLookupTick.Reset(delay)
  420. }
  421. case nodes := <-topicRegisterLookupDone:
  422. debugLog("<-topicRegisterLookupDone")
  423. net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte {
  424. net.ping(n, n.addr())
  425. return n.pingEcho
  426. })
  427. target, delay := net.ticketStore.nextRegisterLookup()
  428. topicRegisterLookupTarget = target
  429. topicRegisterLookupTick.Reset(delay)
  430. topicRegisterLookupDone = nil
  431. case <-topicRegisterLookupTick.C:
  432. debugLog("<-topicRegisterLookupTick")
  433. if (topicRegisterLookupTarget.target == common.Hash{}) {
  434. target, delay := net.ticketStore.nextRegisterLookup()
  435. topicRegisterLookupTarget = target
  436. topicRegisterLookupTick.Reset(delay)
  437. topicRegisterLookupDone = nil
  438. } else {
  439. topicRegisterLookupDone = make(chan []*Node)
  440. target := topicRegisterLookupTarget.target
  441. go func() { topicRegisterLookupDone <- net.lookup(target, false) }()
  442. }
  443. case <-nextRegisterTime:
  444. debugLog("<-nextRegisterTime")
  445. net.ticketStore.ticketRegistered(*nextTicket)
  446. //fmt.Println("sendTopicRegister", nextTicket.t.node.addr().String(), nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
  447. net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
  448. case req := <-net.topicSearchReq:
  449. if refreshDone == nil {
  450. debugLog("<-net.topicSearchReq")
  451. if req.found == nil {
  452. net.ticketStore.removeSearchTopic(req.topic)
  453. continue
  454. }
  455. net.ticketStore.addSearchTopic(req.topic, req.found)
  456. if (topicSearchLookupTarget.target == common.Hash{}) {
  457. topicSearchLookupDone <- nil
  458. }
  459. } else {
  460. searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
  461. }
  462. case nodes := <-topicSearchLookupDone:
  463. debugLog("<-topicSearchLookupDone")
  464. net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte {
  465. net.ping(n, n.addr())
  466. return n.pingEcho
  467. }, func(n *Node, topic Topic) []byte {
  468. if n.state == known {
  469. return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
  470. } else {
  471. if n.state == unknown {
  472. net.ping(n, n.addr())
  473. }
  474. return nil
  475. }
  476. })
  477. topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
  478. target := topicSearchLookupTarget.target
  479. if (target != common.Hash{}) {
  480. go func() { topicSearchLookupDone <- net.lookup(target, false) }()
  481. }
  482. case <-statsDump.C:
  483. debugLog("<-statsDump.C")
  484. /*r, ok := net.ticketStore.radius[testTopic]
  485. if !ok {
  486. fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now())
  487. } else {
  488. topics := len(net.ticketStore.tickets)
  489. tickets := len(net.ticketStore.nodes)
  490. rad := r.radius / (maxRadius/10000+1)
  491. fmt.Printf("(%x) topics:%d radius:%d tickets:%d @ %v\n", net.tab.self.ID[:8], topics, rad, tickets, time.Now())
  492. }*/
  493. tm := mclock.Now()
  494. for topic, r := range net.ticketStore.radius {
  495. if printTestImgLogs {
  496. rad := r.radius / (maxRadius/1000000 + 1)
  497. minrad := r.minRadius / (maxRadius/1000000 + 1)
  498. fmt.Printf("*R %d %v %016x %v\n", tm/1000000, topic, net.tab.self.sha[:8], rad)
  499. fmt.Printf("*MR %d %v %016x %v\n", tm/1000000, topic, net.tab.self.sha[:8], minrad)
  500. }
  501. }
  502. for topic, t := range net.topictab.topics {
  503. wp := t.wcl.nextWaitPeriod(tm)
  504. if printTestImgLogs {
  505. fmt.Printf("*W %d %v %016x %d\n", tm/1000000, topic, net.tab.self.sha[:8], wp/1000000)
  506. }
  507. }
  508. // Periodic / lookup-initiated bucket refresh.
  509. case <-refreshTimer.C:
  510. debugLog("<-refreshTimer.C")
  511. // TODO: ideally we would start the refresh timer after
  512. // fallback nodes have been set for the first time.
  513. if refreshDone == nil {
  514. refreshDone = make(chan struct{})
  515. net.refresh(refreshDone)
  516. }
  517. case <-bucketRefreshTimer.C:
  518. target := net.tab.chooseBucketRefreshTarget()
  519. go func() {
  520. net.lookup(target, false)
  521. bucketRefreshTimer.Reset(bucketRefreshInterval)
  522. }()
  523. case newNursery := <-net.refreshReq:
  524. debugLog("<-net.refreshReq")
  525. if newNursery != nil {
  526. net.nursery = newNursery
  527. }
  528. if refreshDone == nil {
  529. refreshDone = make(chan struct{})
  530. net.refresh(refreshDone)
  531. }
  532. net.refreshResp <- refreshDone
  533. case <-refreshDone:
  534. debugLog("<-net.refreshDone")
  535. refreshDone = nil
  536. list := searchReqWhenRefreshDone
  537. searchReqWhenRefreshDone = nil
  538. go func() {
  539. for _, req := range list {
  540. net.topicSearchReq <- req
  541. }
  542. }()
  543. }
  544. }
  545. debugLog("loop stopped")
  546. glog.V(logger.Debug).Infof("shutting down")
  547. if net.conn != nil {
  548. net.conn.Close()
  549. }
  550. if refreshDone != nil {
  551. // TODO: wait for pending refresh.
  552. //<-refreshResults
  553. }
  554. // Cancel all pending timeouts.
  555. for _, timer := range net.timeoutTimers {
  556. timer.Stop()
  557. }
  558. if net.db != nil {
  559. net.db.close()
  560. }
  561. close(net.closed)
  562. }
  563. // Everything below runs on the Network.loop goroutine
  564. // and can modify Node, Table and Network at any time without locking.
  565. func (net *Network) refresh(done chan<- struct{}) {
  566. var seeds []*Node
  567. if net.db != nil {
  568. seeds = net.db.querySeeds(seedCount, seedMaxAge)
  569. }
  570. if len(seeds) == 0 {
  571. seeds = net.nursery
  572. }
  573. if len(seeds) == 0 {
  574. glog.V(logger.Detail).Info("no seed nodes found")
  575. close(done)
  576. return
  577. }
  578. for _, n := range seeds {
  579. if glog.V(logger.Debug) {
  580. var age string
  581. if net.db != nil {
  582. age = time.Since(net.db.lastPong(n.ID)).String()
  583. } else {
  584. age = "unknown"
  585. }
  586. glog.Infof("seed node (age %s): %v", age, n)
  587. }
  588. n = net.internNodeFromDB(n)
  589. if n.state == unknown {
  590. net.transition(n, verifyinit)
  591. }
  592. // Force-add the seed node so Lookup does something.
  593. // It will be deleted again if verification fails.
  594. net.tab.add(n)
  595. }
  596. // Start self lookup to fill up the buckets.
  597. go func() {
  598. net.Lookup(net.tab.self.ID)
  599. close(done)
  600. }()
  601. }
  602. // Node Interning.
  603. func (net *Network) internNode(pkt *ingressPacket) *Node {
  604. if n := net.nodes[pkt.remoteID]; n != nil {
  605. n.IP = pkt.remoteAddr.IP
  606. n.UDP = uint16(pkt.remoteAddr.Port)
  607. n.TCP = uint16(pkt.remoteAddr.Port)
  608. return n
  609. }
  610. n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
  611. n.state = unknown
  612. net.nodes[pkt.remoteID] = n
  613. return n
  614. }
  615. func (net *Network) internNodeFromDB(dbn *Node) *Node {
  616. if n := net.nodes[dbn.ID]; n != nil {
  617. return n
  618. }
  619. n := NewNode(dbn.ID, dbn.IP, dbn.UDP, dbn.TCP)
  620. n.state = unknown
  621. net.nodes[n.ID] = n
  622. return n
  623. }
  624. func (net *Network) internNodeFromNeighbours(rn rpcNode) (n *Node, err error) {
  625. if rn.ID == net.tab.self.ID {
  626. return nil, errors.New("is self")
  627. }
  628. n = net.nodes[rn.ID]
  629. if n == nil {
  630. // We haven't seen this node before.
  631. n, err = nodeFromRPC(rn)
  632. n.state = unknown
  633. if err == nil {
  634. net.nodes[n.ID] = n
  635. }
  636. return n, err
  637. }
  638. if !bytes.Equal(n.IP, rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP {
  639. err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n)
  640. }
  641. return n, err
  642. }
// nodeNetGuts is embedded in Node and contains the fields used by the
// network code in this file: the cached hash of the node ID and the
// per-node state of the state machine.
type nodeNetGuts struct {
	// This is a cached copy of sha3(ID) which is used for node
	// distance calculations. This is part of Node in order to make it
	// possible to write tests that need a node at a certain distance.
	// In those tests, the content of sha will not actually correspond
	// with ID.
	sha common.Hash
	// State machine fields. Access to these fields
	// is restricted to the Network.loop goroutine.
	state *nodeState
	// hash of last ping sent by us
	pingEcho []byte
	// topic set sent by us in last ping
	pingTopics []Topic
	// queries that can't be sent yet
	deferredQueries []*findnodeQuery
	// current query, waiting for reply
	pendingNeighbours *findnodeQuery
	// reset to zero on entering the unknown and known states
	queryTimeouts int
}
  660. func (n *nodeNetGuts) deferQuery(q *findnodeQuery) {
  661. n.deferredQueries = append(n.deferredQueries, q)
  662. }
  663. func (n *nodeNetGuts) startNextQuery(net *Network) {
  664. if len(n.deferredQueries) == 0 {
  665. return
  666. }
  667. nextq := n.deferredQueries[0]
  668. if nextq.start(net) {
  669. n.deferredQueries = append(n.deferredQueries[:0], n.deferredQueries[1:]...)
  670. }
  671. }
  672. func (q *findnodeQuery) start(net *Network) bool {
  673. // Satisfy queries against the local node directly.
  674. if q.remote == net.tab.self {
  675. closest := net.tab.closest(crypto.Keccak256Hash(q.target[:]), bucketSize)
  676. q.reply <- closest.entries
  677. return true
  678. }
  679. if q.remote.state.canQuery && q.remote.pendingNeighbours == nil {
  680. net.conn.sendFindnodeHash(q.remote, q.target)
  681. net.timedEvent(respTimeout, q.remote, neighboursTimeout)
  682. q.remote.pendingNeighbours = q
  683. return true
  684. }
  685. // If the node is not known yet, it won't accept queries.
  686. // Initiate the transition to known.
  687. // The request will be sent later when the node reaches known state.
  688. if q.remote.state == unknown {
  689. net.transition(q.remote, verifyinit)
  690. }
  691. return false
  692. }
// Node Events (the input to the state machine).

// nodeEvent identifies an event delivered to a node's state handler: either
// the type of a received packet or an internal timeout.
type nodeEvent uint
//go:generate stringer -type=nodeEvent
const (
	invalidEvent nodeEvent = iota // zero is reserved
	// Packet type events.
	// These correspond to packet types in the UDP protocol.
	pingPacket
	pongPacket
	findnodePacket
	neighborsPacket
	findnodeHashPacket
	topicRegisterPacket
	topicQueryPacket
	topicNodesPacket
	// Non-packet events.
	// Event values in this category are allocated outside
	// the packet type range (packet types are encoded as a single byte).
	// iota keeps counting within this block, so the offset of 256 puts
	// all of the following values above the single-byte range.
	pongTimeout nodeEvent = iota + 256
	pingTimeout
	neighboursTimeout
)
// Node State Machine.

// nodeState describes one state of the per-node state machine.
type nodeState struct {
	// human-readable name, returned by String
	name string
	// handle processes an event for a node in this state and returns the
	// state the node should be in afterwards, plus an error describing why
	// the event was not acceptable (if any). The packet argument is nil for
	// timeout events.
	handle func(*Network, *Node, nodeEvent, *ingressPacket) (next *nodeState, err error)
	// enter is an optional hook for entering the state.
	// NOTE(review): presumably invoked by Network.transition — confirm.
	enter func(*Network, *Node)
	// canQuery reports whether findnode queries may be sent to a node in
	// this state (checked by findnodeQuery.start).
	canQuery bool
}
  722. func (s *nodeState) String() string {
  723. return s.name
  724. }
// The states of the node state machine. The variables are assigned in
// init, because the state handlers refer to each other.
var (
	// unknown: on entry the node is removed from the table and its queries
	// are aborted; an incoming ping starts verification.
	unknown *nodeState
	// verifyinit: on entry the node is pinged.
	verifyinit *nodeState
	// verifywait: a pong moves the node to known, a pong timeout back to
	// unknown.
	verifywait *nodeState
	// remoteverifywait: on entry a ping timeout is scheduled; when it fires
	// the node moves to known.
	remoteverifywait *nodeState
	// known: the node can be queried; on entry it is added to the table.
	known *nodeState
	// contested: on entry the node is pinged; a pong returns it to known, a
	// pong timeout replaces it in the table and makes it unresponsive.
	contested *nodeState
	// unresponsive: the node can still be queried; an incoming ping returns
	// it to known.
	unresponsive *nodeState
)
  734. func init() {
  735. unknown = &nodeState{
  736. name: "unknown",
  737. enter: func(net *Network, n *Node) {
  738. net.tab.delete(n)
  739. n.pingEcho = nil
  740. // Abort active queries.
  741. for _, q := range n.deferredQueries {
  742. q.reply <- nil
  743. }
  744. n.deferredQueries = nil
  745. if n.pendingNeighbours != nil {
  746. n.pendingNeighbours.reply <- nil
  747. n.pendingNeighbours = nil
  748. }
  749. n.queryTimeouts = 0
  750. },
  751. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  752. switch ev {
  753. case pingPacket:
  754. net.handlePing(n, pkt)
  755. net.ping(n, pkt.remoteAddr)
  756. return verifywait, nil
  757. default:
  758. return unknown, errInvalidEvent
  759. }
  760. },
  761. }
  762. verifyinit = &nodeState{
  763. name: "verifyinit",
  764. enter: func(net *Network, n *Node) {
  765. net.ping(n, n.addr())
  766. },
  767. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  768. switch ev {
  769. case pingPacket:
  770. net.handlePing(n, pkt)
  771. return verifywait, nil
  772. case pongPacket:
  773. err := net.handleKnownPong(n, pkt)
  774. return remoteverifywait, err
  775. case pongTimeout:
  776. return unknown, nil
  777. default:
  778. return verifyinit, errInvalidEvent
  779. }
  780. },
  781. }
  782. verifywait = &nodeState{
  783. name: "verifywait",
  784. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  785. switch ev {
  786. case pingPacket:
  787. net.handlePing(n, pkt)
  788. return verifywait, nil
  789. case pongPacket:
  790. err := net.handleKnownPong(n, pkt)
  791. return known, err
  792. case pongTimeout:
  793. return unknown, nil
  794. default:
  795. return verifywait, errInvalidEvent
  796. }
  797. },
  798. }
  799. remoteverifywait = &nodeState{
  800. name: "remoteverifywait",
  801. enter: func(net *Network, n *Node) {
  802. net.timedEvent(respTimeout, n, pingTimeout)
  803. },
  804. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  805. switch ev {
  806. case pingPacket:
  807. net.handlePing(n, pkt)
  808. return remoteverifywait, nil
  809. case pingTimeout:
  810. return known, nil
  811. default:
  812. return remoteverifywait, errInvalidEvent
  813. }
  814. },
  815. }
  816. known = &nodeState{
  817. name: "known",
  818. canQuery: true,
  819. enter: func(net *Network, n *Node) {
  820. n.queryTimeouts = 0
  821. n.startNextQuery(net)
  822. // Insert into the table and start revalidation of the last node
  823. // in the bucket if it is full.
  824. last := net.tab.add(n)
  825. if last != nil && last.state == known {
  826. // TODO: do this asynchronously
  827. net.transition(last, contested)
  828. }
  829. },
  830. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  831. switch ev {
  832. case pingPacket:
  833. net.handlePing(n, pkt)
  834. return known, nil
  835. case pongPacket:
  836. err := net.handleKnownPong(n, pkt)
  837. return known, err
  838. default:
  839. return net.handleQueryEvent(n, ev, pkt)
  840. }
  841. },
  842. }
  843. contested = &nodeState{
  844. name: "contested",
  845. canQuery: true,
  846. enter: func(net *Network, n *Node) {
  847. net.ping(n, n.addr())
  848. },
  849. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  850. switch ev {
  851. case pongPacket:
  852. // Node is still alive.
  853. err := net.handleKnownPong(n, pkt)
  854. return known, err
  855. case pongTimeout:
  856. net.tab.deleteReplace(n)
  857. return unresponsive, nil
  858. case pingPacket:
  859. net.handlePing(n, pkt)
  860. return contested, nil
  861. default:
  862. return net.handleQueryEvent(n, ev, pkt)
  863. }
  864. },
  865. }
  866. unresponsive = &nodeState{
  867. name: "unresponsive",
  868. canQuery: true,
  869. handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  870. switch ev {
  871. case pingPacket:
  872. net.handlePing(n, pkt)
  873. return known, nil
  874. case pongPacket:
  875. err := net.handleKnownPong(n, pkt)
  876. return known, err
  877. default:
  878. return net.handleQueryEvent(n, ev, pkt)
  879. }
  880. },
  881. }
  882. }
  883. // handle processes packets sent by n and events related to n.
  884. func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
  885. //fmt.Println("handle", n.addr().String(), n.state, ev)
  886. if pkt != nil {
  887. if err := net.checkPacket(n, ev, pkt); err != nil {
  888. //fmt.Println("check err:", err)
  889. return err
  890. }
  891. // Start the background expiration goroutine after the first
  892. // successful communication. Subsequent calls have no effect if it
  893. // is already running. We do this here instead of somewhere else
  894. // so that the search for seed nodes also considers older nodes
  895. // that would otherwise be removed by the expirer.
  896. if net.db != nil {
  897. net.db.ensureExpirer()
  898. }
  899. }
  900. if n.state == nil {
  901. n.state = unknown //???
  902. }
  903. next, err := n.state.handle(net, n, ev, pkt)
  904. net.transition(n, next)
  905. //fmt.Println("new state:", n.state)
  906. return err
  907. }
  908. func (net *Network) checkPacket(n *Node, ev nodeEvent, pkt *ingressPacket) error {
  909. // Replay prevention checks.
  910. switch ev {
  911. case pingPacket, findnodeHashPacket, neighborsPacket:
  912. // TODO: check date is > last date seen
  913. // TODO: check ping version
  914. case pongPacket:
  915. if !bytes.Equal(pkt.data.(*pong).ReplyTok, n.pingEcho) {
  916. // fmt.Println("pong reply token mismatch")
  917. return fmt.Errorf("pong reply token mismatch")
  918. }
  919. n.pingEcho = nil
  920. }
  921. // Address validation.
  922. // TODO: Ideally we would do the following:
  923. // - reject all packets with wrong address except ping.
  924. // - for ping with new address, transition to verifywait but keep the
  925. // previous node (with old address) around. if the new one reaches known,
  926. // swap it out.
  927. return nil
  928. }
  929. func (net *Network) transition(n *Node, next *nodeState) {
  930. if n.state != next {
  931. n.state = next
  932. if next.enter != nil {
  933. next.enter(net, n)
  934. }
  935. }
  936. // TODO: persist/unpersist node
  937. }
  938. func (net *Network) timedEvent(d time.Duration, n *Node, ev nodeEvent) {
  939. timeout := timeoutEvent{ev, n}
  940. net.timeoutTimers[timeout] = time.AfterFunc(d, func() {
  941. select {
  942. case net.timeout <- timeout:
  943. case <-net.closed:
  944. }
  945. })
  946. }
  947. func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
  948. timer := net.timeoutTimers[timeoutEvent{ev, n}]
  949. if timer != nil {
  950. timer.Stop()
  951. delete(net.timeoutTimers, timeoutEvent{ev, n})
  952. }
  953. }
  954. func (net *Network) ping(n *Node, addr *net.UDPAddr) {
  955. //fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
  956. if n.pingEcho != nil || n.ID == net.tab.self.ID {
  957. //fmt.Println(" not sent")
  958. return
  959. }
  960. debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
  961. n.pingTopics = net.ticketStore.regTopicSet()
  962. n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
  963. net.timedEvent(respTimeout, n, pongTimeout)
  964. }
  965. func (net *Network) handlePing(n *Node, pkt *ingressPacket) {
  966. debugLog(fmt.Sprintf("handlePing(node = %x)", n.ID[:8]))
  967. ping := pkt.data.(*ping)
  968. n.TCP = ping.From.TCP
  969. t := net.topictab.getTicket(n, ping.Topics)
  970. pong := &pong{
  971. To: makeEndpoint(n.addr(), n.TCP), // TODO: maybe use known TCP port from DB
  972. ReplyTok: pkt.hash,
  973. Expiration: uint64(time.Now().Add(expiration).Unix()),
  974. }
  975. ticketToPong(t, pong)
  976. net.conn.send(n, pongPacket, pong)
  977. }
  978. func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error {
  979. debugLog(fmt.Sprintf("handleKnownPong(node = %x)", n.ID[:8]))
  980. net.abortTimedEvent(n, pongTimeout)
  981. now := mclock.Now()
  982. ticket, err := pongToTicket(now, n.pingTopics, n, pkt)
  983. if err == nil {
  984. // fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data)
  985. net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket)
  986. } else {
  987. debugLog(fmt.Sprintf(" error: %v", err))
  988. }
  989. n.pingEcho = nil
  990. n.pingTopics = nil
  991. return err
  992. }
  993. func (net *Network) handleQueryEvent(n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
  994. switch ev {
  995. case findnodePacket:
  996. target := crypto.Keccak256Hash(pkt.data.(*findnode).Target[:])
  997. results := net.tab.closest(target, bucketSize).entries
  998. net.conn.sendNeighbours(n, results)
  999. return n.state, nil
  1000. case neighborsPacket:
  1001. err := net.handleNeighboursPacket(n, pkt.data.(*neighbors))
  1002. return n.state, err
  1003. case neighboursTimeout:
  1004. if n.pendingNeighbours != nil {
  1005. n.pendingNeighbours.reply <- nil
  1006. n.pendingNeighbours = nil
  1007. }
  1008. n.queryTimeouts++
  1009. if n.queryTimeouts > maxFindnodeFailures && n.state == known {
  1010. return contested, errors.New("too many timeouts")
  1011. }
  1012. return n.state, nil
  1013. // v5
  1014. case findnodeHashPacket:
  1015. results := net.tab.closest(pkt.data.(*findnodeHash).Target, bucketSize).entries
  1016. net.conn.sendNeighbours(n, results)
  1017. return n.state, nil
  1018. case topicRegisterPacket:
  1019. //fmt.Println("got topicRegisterPacket")
  1020. regdata := pkt.data.(*topicRegister)
  1021. pong, err := net.checkTopicRegister(regdata)
  1022. if err != nil {
  1023. //fmt.Println(err)
  1024. return n.state, fmt.Errorf("bad waiting ticket: %v", err)
  1025. }
  1026. net.topictab.useTicket(n, pong.TicketSerial, regdata.Topics, int(regdata.Idx), pong.Expiration, pong.WaitPeriods)
  1027. return n.state, nil
  1028. case topicQueryPacket:
  1029. // TODO: handle expiration
  1030. topic := pkt.data.(*topicQuery).Topic
  1031. results := net.topictab.getEntries(topic)
  1032. if _, ok := net.ticketStore.tickets[topic]; ok {
  1033. results = append(results, net.tab.self) // we're not registering in our own table but if we're advertising, return ourselves too
  1034. }
  1035. if len(results) > 10 {
  1036. results = results[:10]
  1037. }
  1038. var hash common.Hash
  1039. copy(hash[:], pkt.hash)
  1040. net.conn.sendTopicNodes(n, hash, results)
  1041. return n.state, nil
  1042. case topicNodesPacket:
  1043. p := pkt.data.(*topicNodes)
  1044. if net.ticketStore.gotTopicNodes(n, p.Echo, p.Nodes) {
  1045. n.queryTimeouts++
  1046. if n.queryTimeouts > maxFindnodeFailures && n.state == known {
  1047. return contested, errors.New("too many timeouts")
  1048. }
  1049. }
  1050. return n.state, nil
  1051. default:
  1052. return n.state, errInvalidEvent
  1053. }
  1054. }
  1055. func (net *Network) checkTopicRegister(data *topicRegister) (*pong, error) {
  1056. var pongpkt ingressPacket
  1057. if err := decodePacket(data.Pong, &pongpkt); err != nil {
  1058. return nil, err
  1059. }
  1060. if pongpkt.ev != pongPacket {
  1061. return nil, errors.New("is not pong packet")
  1062. }
  1063. if pongpkt.remoteID != net.tab.self.ID {
  1064. return nil, errors.New("not signed by us")
  1065. }
  1066. // check that we previously authorised all topics
  1067. // that the other side is trying to register.
  1068. if rlpHash(data.Topics) != pongpkt.data.(*pong).TopicHash {
  1069. return nil, errors.New("topic hash mismatch")
  1070. }
  1071. if data.Idx < 0 || int(data.Idx) >= len(data.Topics) {
  1072. return nil, errors.New("topic index out of range")
  1073. }
  1074. return pongpkt.data.(*pong), nil
  1075. }
  1076. func rlpHash(x interface{}) (h common.Hash) {
  1077. hw := sha3.NewKeccak256()
  1078. rlp.Encode(hw, x)
  1079. hw.Sum(h[:0])
  1080. return h
  1081. }
  1082. func (net *Network) handleNeighboursPacket(n *Node, req *neighbors) error {
  1083. if n.pendingNeighbours == nil {
  1084. return errNoQuery
  1085. }
  1086. net.abortTimedEvent(n, neighboursTimeout)
  1087. nodes := make([]*Node, len(req.Nodes))
  1088. for i, rn := range req.Nodes {
  1089. nn, err := net.internNodeFromNeighbours(rn)
  1090. if err != nil {
  1091. glog.V(logger.Debug).Infof("invalid neighbour from %x: %v", n.ID[:8], err)
  1092. continue
  1093. }
  1094. nodes[i] = nn
  1095. // Start validation of query results immediately.
  1096. // This fills the table quickly.
  1097. // TODO: generates way too many packets, maybe do it via queue.
  1098. if nn.state == unknown {
  1099. net.transition(nn, verifyinit)
  1100. }
  1101. }
  1102. // TODO: don't ignore second packet
  1103. n.pendingNeighbours.reply <- nodes
  1104. n.pendingNeighbours = nil
  1105. // Now that this query is done, start the next one.
  1106. n.startNextQuery(net)
  1107. return nil
  1108. }