v5_udp.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discover
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/ecdsa"
  21. crand "crypto/rand"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "sync"
  28. "time"
  29. "github.com/ethereum/go-ethereum/common/mclock"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p/discover/v5wire"
  32. "github.com/ethereum/go-ethereum/p2p/enode"
  33. "github.com/ethereum/go-ethereum/p2p/enr"
  34. "github.com/ethereum/go-ethereum/p2p/netutil"
  35. )
// Limits and timeouts used by the discv5 transport.
const (
	lookupRequestLimit      = 3  // number of distances requested from a single node during lookup (see lookupDistances)
	findnodeResultLimit     = 16 // max nodes returned by the FINDNODE handler and kept per lookup worker
	totalNodesResponseLimit = 5  // max NODES packets accepted per request, applies in waitForNodes
	nodesResponseItemLimit  = 3  // max records per NODES packet, applies in packNodes
	respTimeoutV5           = 700 * time.Millisecond // response timeout armed by startResponseTimeout
)
// codecV5 is implemented by v5wire.Codec (and testCodec).
//
// The UDPv5 transport is split into two objects: the codec object deals with
// encoding/decoding and with the handshake; the UDPv5 object handles higher-level concerns.
type codecV5 interface {
	// Encode encodes a packet. The arguments are the destination node ID, the
	// destination address in string form, the packet, and an optional WHOAREYOU
	// challenge to answer. It returns the encoded bytes and the nonce of the packet.
	Encode(enode.ID, string, v5wire.Packet, *v5wire.Whoareyou) ([]byte, v5wire.Nonce, error)

	// Decode decodes a packet. It returns a *v5wire.Unknown packet if decryption fails.
	// The *enode.Node return value is non-nil when the input contains a handshake response.
	Decode([]byte, string) (enode.ID, *enode.Node, v5wire.Packet, error)
}
// UDPv5 is the implementation of protocol version 5.
//
// The maps listed under "state of dispatch" are read and written by code that runs
// from the dispatch goroutine (dispatch, sendNextCall, sendCall, handle*).
type UDPv5 struct {
	// static fields
	conn         UDPConn            // socket used by readLoop and send
	tab          *Table             // node table, created in newUDPv5
	netrestrict  *netutil.Netlist   // optional IP allow-list applied to NODES responses
	priv         *ecdsa.PrivateKey  // taken from Config.PrivateKey
	localNode    *enode.LocalNode   // local node record
	db           *enode.DB          // database of the local node
	log          log.Logger
	clock        mclock.Clock       // clock used for response timers
	validSchemes enr.IdentityScheme // identity schemes accepted when decoding records

	// talkreq handler registry
	trlock     sync.Mutex                    // guards trhandlers
	trhandlers map[string]TalkRequestHandler // handlers keyed by protocol name

	// channels into dispatch
	packetInCh    chan ReadPacket   // packets read by readLoop
	readNextCh    chan struct{}     // dispatch sends a token here to allow the next read
	callCh        chan *callV5      // new calls, sent by call
	callDoneCh    chan *callV5      // finished calls, sent by callDone
	respTimeoutCh chan *callTimeout // fired response timers

	// state of dispatch
	codec            codecV5
	activeCallByNode map[enode.ID]*callV5     // the single active call per destination node
	activeCallByAuth map[v5wire.Nonce]*callV5 // active calls keyed by request packet nonce
	callQueue        map[enode.ID][]*callV5   // calls waiting for the active call to finish

	// shutdown stuff
	closeOnce      sync.Once
	closeCtx       context.Context    // cancelled by Close
	cancelCloseCtx context.CancelFunc // cancels closeCtx
	wg             sync.WaitGroup     // tracks readLoop and dispatch
}
// TalkRequestHandler callback processes a talk request and optionally returns a reply.
// It is called with the sender's node ID, the sender's UDP address and the request
// message. The returned bytes are sent back as the TALKRESP message.
type TalkRequestHandler func(enode.ID, *net.UDPAddr, []byte) []byte
// callV5 represents a remote procedure call against another node.
type callV5 struct {
	node         *enode.Node        // destination of the call
	packet       v5wire.Packet      // request packet
	responseType byte               // expected packet type of response
	reqid        []byte             // request ID assigned in call, matched against responses
	ch           chan v5wire.Packet // responses sent here
	err          chan error         // errors sent here

	// Valid for active calls only:
	nonce          v5wire.Nonce      // nonce of request packet
	handshakeCount int               // # times we attempted handshake for this call
	challenge      *v5wire.Whoareyou // last sent handshake challenge
	timeout        mclock.Timer      // current response timer, replaced by startResponseTimeout
}
// callTimeout is the response timeout event of a call.
// It carries the timer that fired so dispatch can ignore events from timers
// which have since been replaced on the call.
type callTimeout struct {
	c     *callV5      // call whose timer fired
	timer mclock.Timer // the timer that produced this event
}
  107. // ListenV5 listens on the given connection.
  108. func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
  109. t, err := newUDPv5(conn, ln, cfg)
  110. if err != nil {
  111. return nil, err
  112. }
  113. go t.tab.loop()
  114. t.wg.Add(2)
  115. go t.readLoop()
  116. go t.dispatch()
  117. return t, nil
  118. }
  119. // newUDPv5 creates a UDPv5 transport, but doesn't start any goroutines.
  120. func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
  121. closeCtx, cancelCloseCtx := context.WithCancel(context.Background())
  122. cfg = cfg.withDefaults()
  123. t := &UDPv5{
  124. // static fields
  125. conn: conn,
  126. localNode: ln,
  127. db: ln.Database(),
  128. netrestrict: cfg.NetRestrict,
  129. priv: cfg.PrivateKey,
  130. log: cfg.Log,
  131. validSchemes: cfg.ValidSchemes,
  132. clock: cfg.Clock,
  133. trhandlers: make(map[string]TalkRequestHandler),
  134. // channels into dispatch
  135. packetInCh: make(chan ReadPacket, 1),
  136. readNextCh: make(chan struct{}, 1),
  137. callCh: make(chan *callV5),
  138. callDoneCh: make(chan *callV5),
  139. respTimeoutCh: make(chan *callTimeout),
  140. // state of dispatch
  141. codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock),
  142. activeCallByNode: make(map[enode.ID]*callV5),
  143. activeCallByAuth: make(map[v5wire.Nonce]*callV5),
  144. callQueue: make(map[enode.ID][]*callV5),
  145. // shutdown
  146. closeCtx: closeCtx,
  147. cancelCloseCtx: cancelCloseCtx,
  148. }
  149. tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log)
  150. if err != nil {
  151. return nil, err
  152. }
  153. t.tab = tab
  154. return t, nil
  155. }
  156. // Self returns the local node record.
  157. func (t *UDPv5) Self() *enode.Node {
  158. return t.localNode.Node()
  159. }
  160. // Close shuts down packet processing.
  161. func (t *UDPv5) Close() {
  162. t.closeOnce.Do(func() {
  163. t.cancelCloseCtx()
  164. t.conn.Close()
  165. t.wg.Wait()
  166. t.tab.close()
  167. })
  168. }
  169. // Ping sends a ping message to the given node.
  170. func (t *UDPv5) Ping(n *enode.Node) error {
  171. _, err := t.ping(n)
  172. return err
  173. }
  174. // Resolve searches for a specific node with the given ID and tries to get the most recent
  175. // version of the node record for it. It returns n if the node could not be resolved.
  176. func (t *UDPv5) Resolve(n *enode.Node) *enode.Node {
  177. if intable := t.tab.getNode(n.ID()); intable != nil && intable.Seq() > n.Seq() {
  178. n = intable
  179. }
  180. // Try asking directly. This works if the node is still responding on the endpoint we have.
  181. if resp, err := t.RequestENR(n); err == nil {
  182. return resp
  183. }
  184. // Otherwise do a network lookup.
  185. result := t.Lookup(n.ID())
  186. for _, rn := range result {
  187. if rn.ID() == n.ID() && rn.Seq() > n.Seq() {
  188. return rn
  189. }
  190. }
  191. return n
  192. }
  193. // AllNodes returns all the nodes stored in the local table.
  194. func (t *UDPv5) AllNodes() []*enode.Node {
  195. t.tab.mutex.Lock()
  196. defer t.tab.mutex.Unlock()
  197. nodes := make([]*enode.Node, 0)
  198. for _, b := range &t.tab.buckets {
  199. for _, n := range b.entries {
  200. nodes = append(nodes, unwrapNode(n))
  201. }
  202. }
  203. return nodes
  204. }
  205. // LocalNode returns the current local node running the
  206. // protocol.
  207. func (t *UDPv5) LocalNode() *enode.LocalNode {
  208. return t.localNode
  209. }
  210. // RegisterTalkHandler adds a handler for 'talk requests'. The handler function is called
  211. // whenever a request for the given protocol is received and should return the response
  212. // data or nil.
  213. func (t *UDPv5) RegisterTalkHandler(protocol string, handler TalkRequestHandler) {
  214. t.trlock.Lock()
  215. defer t.trlock.Unlock()
  216. t.trhandlers[protocol] = handler
  217. }
  218. // TalkRequest sends a talk request to n and waits for a response.
  219. func (t *UDPv5) TalkRequest(n *enode.Node, protocol string, request []byte) ([]byte, error) {
  220. req := &v5wire.TalkRequest{Protocol: protocol, Message: request}
  221. resp := t.call(n, v5wire.TalkResponseMsg, req)
  222. defer t.callDone(resp)
  223. select {
  224. case respMsg := <-resp.ch:
  225. return respMsg.(*v5wire.TalkResponse).Message, nil
  226. case err := <-resp.err:
  227. return nil, err
  228. }
  229. }
  230. // RandomNodes returns an iterator that finds random nodes in the DHT.
  231. func (t *UDPv5) RandomNodes() enode.Iterator {
  232. if t.tab.len() == 0 {
  233. // All nodes were dropped, refresh. The very first query will hit this
  234. // case and run the bootstrapping logic.
  235. <-t.tab.refresh()
  236. }
  237. return newLookupIterator(t.closeCtx, t.newRandomLookup)
  238. }
  239. // Lookup performs a recursive lookup for the given target.
  240. // It returns the closest nodes to target.
  241. func (t *UDPv5) Lookup(target enode.ID) []*enode.Node {
  242. return t.newLookup(t.closeCtx, target).run()
  243. }
  244. // lookupRandom looks up a random target.
  245. // This is needed to satisfy the transport interface.
  246. func (t *UDPv5) lookupRandom() []*enode.Node {
  247. return t.newRandomLookup(t.closeCtx).run()
  248. }
  249. // lookupSelf looks up our own node ID.
  250. // This is needed to satisfy the transport interface.
  251. func (t *UDPv5) lookupSelf() []*enode.Node {
  252. return t.newLookup(t.closeCtx, t.Self().ID()).run()
  253. }
  254. func (t *UDPv5) newRandomLookup(ctx context.Context) *lookup {
  255. var target enode.ID
  256. crand.Read(target[:])
  257. return t.newLookup(ctx, target)
  258. }
  259. func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
  260. return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
  261. return t.lookupWorker(n, target)
  262. })
  263. }
  264. // lookupWorker performs FINDNODE calls against a single node during lookup.
  265. func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
  266. var (
  267. dists = lookupDistances(target, destNode.ID())
  268. nodes = nodesByDistance{target: target}
  269. err error
  270. )
  271. var r []*enode.Node
  272. r, err = t.findnode(unwrapNode(destNode), dists)
  273. if errors.Is(err, errClosed) {
  274. return nil, err
  275. }
  276. for _, n := range r {
  277. if n.ID() != t.Self().ID() {
  278. nodes.push(wrapNode(n), findnodeResultLimit)
  279. }
  280. }
  281. return nodes.entries, err
  282. }
  283. // lookupDistances computes the distance parameter for FINDNODE calls to dest.
  284. // It chooses distances adjacent to logdist(target, dest), e.g. for a target
  285. // with logdist(target, dest) = 255 the result is [255, 256, 254].
  286. func lookupDistances(target, dest enode.ID) (dists []uint) {
  287. td := enode.LogDist(target, dest)
  288. dists = append(dists, uint(td))
  289. for i := 1; len(dists) < lookupRequestLimit; i++ {
  290. if td+i < 256 {
  291. dists = append(dists, uint(td+i))
  292. }
  293. if td-i > 0 {
  294. dists = append(dists, uint(td-i))
  295. }
  296. }
  297. return dists
  298. }
  299. // ping calls PING on a node and waits for a PONG response.
  300. func (t *UDPv5) ping(n *enode.Node) (uint64, error) {
  301. req := &v5wire.Ping{ENRSeq: t.localNode.Node().Seq()}
  302. resp := t.call(n, v5wire.PongMsg, req)
  303. defer t.callDone(resp)
  304. select {
  305. case pong := <-resp.ch:
  306. return pong.(*v5wire.Pong).ENRSeq, nil
  307. case err := <-resp.err:
  308. return 0, err
  309. }
  310. }
  311. // RequestENR requests n's record.
  312. func (t *UDPv5) RequestENR(n *enode.Node) (*enode.Node, error) {
  313. nodes, err := t.findnode(n, []uint{0})
  314. if err != nil {
  315. return nil, err
  316. }
  317. if len(nodes) != 1 {
  318. return nil, fmt.Errorf("%d nodes in response for distance zero", len(nodes))
  319. }
  320. return nodes[0], nil
  321. }
  322. // findnode calls FINDNODE on a node and waits for responses.
  323. func (t *UDPv5) findnode(n *enode.Node, distances []uint) ([]*enode.Node, error) {
  324. resp := t.call(n, v5wire.NodesMsg, &v5wire.Findnode{Distances: distances})
  325. return t.waitForNodes(resp, distances)
  326. }
  327. // waitForNodes waits for NODES responses to the given call.
  328. func (t *UDPv5) waitForNodes(c *callV5, distances []uint) ([]*enode.Node, error) {
  329. defer t.callDone(c)
  330. var (
  331. nodes []*enode.Node
  332. seen = make(map[enode.ID]struct{})
  333. received, total = 0, -1
  334. )
  335. for {
  336. select {
  337. case responseP := <-c.ch:
  338. response := responseP.(*v5wire.Nodes)
  339. for _, record := range response.Nodes {
  340. node, err := t.verifyResponseNode(c, record, distances, seen)
  341. if err != nil {
  342. t.log.Debug("Invalid record in "+response.Name(), "id", c.node.ID(), "err", err)
  343. continue
  344. }
  345. nodes = append(nodes, node)
  346. }
  347. if total == -1 {
  348. total = min(int(response.Total), totalNodesResponseLimit)
  349. }
  350. if received++; received == total {
  351. return nodes, nil
  352. }
  353. case err := <-c.err:
  354. return nodes, err
  355. }
  356. }
  357. }
  358. // verifyResponseNode checks validity of a record in a NODES response.
  359. func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, seen map[enode.ID]struct{}) (*enode.Node, error) {
  360. node, err := enode.New(t.validSchemes, r)
  361. if err != nil {
  362. return nil, err
  363. }
  364. if err := netutil.CheckRelayIP(c.node.IP(), node.IP()); err != nil {
  365. return nil, err
  366. }
  367. if t.netrestrict != nil && !t.netrestrict.Contains(node.IP()) {
  368. return nil, errors.New("not contained in netrestrict list")
  369. }
  370. if c.node.UDP() <= 1024 {
  371. return nil, errLowPort
  372. }
  373. if distances != nil {
  374. nd := enode.LogDist(c.node.ID(), node.ID())
  375. if !containsUint(uint(nd), distances) {
  376. return nil, errors.New("does not match any requested distance")
  377. }
  378. }
  379. if _, ok := seen[node.ID()]; ok {
  380. return nil, fmt.Errorf("duplicate record")
  381. }
  382. seen[node.ID()] = struct{}{}
  383. return node, nil
  384. }
  385. func containsUint(x uint, xs []uint) bool {
  386. for _, v := range xs {
  387. if x == v {
  388. return true
  389. }
  390. }
  391. return false
  392. }
  393. // call sends the given call and sets up a handler for response packets (of message type
  394. // responseType). Responses are dispatched to the call's response channel.
  395. func (t *UDPv5) call(node *enode.Node, responseType byte, packet v5wire.Packet) *callV5 {
  396. c := &callV5{
  397. node: node,
  398. packet: packet,
  399. responseType: responseType,
  400. reqid: make([]byte, 8),
  401. ch: make(chan v5wire.Packet, 1),
  402. err: make(chan error, 1),
  403. }
  404. // Assign request ID.
  405. crand.Read(c.reqid)
  406. packet.SetRequestID(c.reqid)
  407. // Send call to dispatch.
  408. select {
  409. case t.callCh <- c:
  410. case <-t.closeCtx.Done():
  411. c.err <- errClosed
  412. }
  413. return c
  414. }
  415. // callDone tells dispatch that the active call is done.
  416. func (t *UDPv5) callDone(c *callV5) {
  417. // This needs a loop because further responses may be incoming until the
  418. // send to callDoneCh has completed. Such responses need to be discarded
  419. // in order to avoid blocking the dispatch loop.
  420. for {
  421. select {
  422. case <-c.ch:
  423. // late response, discard.
  424. case <-c.err:
  425. // late error, discard.
  426. case t.callDoneCh <- c:
  427. return
  428. case <-t.closeCtx.Done():
  429. return
  430. }
  431. }
  432. }
// dispatch runs in its own goroutine, handles incoming packets and deals with calls.
//
// For any destination node there is at most one 'active call', stored in the t.activeCall*
// maps. A call is made active when it is sent. The active call can be answered by a
// matching response, in which case c.ch receives the response; or by timing out, in which case
// c.err receives the error. When the function that created the call signals the active
// call is done through callDone, the next call from the call queue is started.
//
// Calls may also be answered by a WHOAREYOU packet referencing the call packet's authTag.
// When that happens the call is simply re-sent to complete the handshake. We allow one
// handshake attempt per call.
func (t *UDPv5) dispatch() {
	defer t.wg.Done()

	// Arm first read.
	t.readNextCh <- struct{}{}

	for {
		select {
		case c := <-t.callCh:
			// New call: queue it and start it right away if the node has no active call.
			id := c.node.ID()
			t.callQueue[id] = append(t.callQueue[id], c)
			t.sendNextCall(id)

		case ct := <-t.respTimeoutCh:
			// Only honour the timeout if the call is still active and the timer
			// that fired is the call's current one (it is replaced on every
			// response and resend).
			active := t.activeCallByNode[ct.c.node.ID()]
			if ct.c == active && ct.timer == active.timeout {
				ct.c.err <- errTimeout
			}

		case c := <-t.callDoneCh:
			// The caller is finished with the active call: drop it from both
			// maps and start the next queued call for this node.
			id := c.node.ID()
			active := t.activeCallByNode[id]
			if active != c {
				panic("BUG: callDone for inactive call")
			}
			c.timeout.Stop()
			delete(t.activeCallByAuth, c.nonce)
			delete(t.activeCallByNode, id)
			t.sendNextCall(id)

		case p := <-t.packetInCh:
			t.handlePacket(p.Data, p.Addr)
			// Arm next read.
			t.readNextCh <- struct{}{}

		case <-t.closeCtx.Done():
			// Closing readNextCh ends the range loop in readLoop.
			close(t.readNextCh)
			// Fail all queued and active calls with errClosed.
			for id, queue := range t.callQueue {
				for _, c := range queue {
					c.err <- errClosed
				}
				delete(t.callQueue, id)
			}
			for id, c := range t.activeCallByNode {
				c.err <- errClosed
				delete(t.activeCallByNode, id)
				delete(t.activeCallByAuth, c.nonce)
			}
			return
		}
	}
}
// startResponseTimeout sets the response timer for a call.
// A timer already set on the call is stopped and replaced. When the new timer
// fires after respTimeoutV5, a callTimeout event is sent to dispatch unless the
// transport is closed.
func (t *UDPv5) startResponseTimeout(c *callV5) {
	if c.timeout != nil {
		c.timeout.Stop()
	}
	var (
		timer mclock.Timer
		done  = make(chan struct{})
	)
	timer = t.clock.AfterFunc(respTimeoutV5, func() {
		// Wait until the assignments below have completed before the
		// callback reads the timer variable.
		<-done
		select {
		case t.respTimeoutCh <- &callTimeout{c, timer}:
		case <-t.closeCtx.Done():
		}
	})
	c.timeout = timer
	close(done)
}
  509. // sendNextCall sends the next call in the call queue if there is no active call.
  510. func (t *UDPv5) sendNextCall(id enode.ID) {
  511. queue := t.callQueue[id]
  512. if len(queue) == 0 || t.activeCallByNode[id] != nil {
  513. return
  514. }
  515. t.activeCallByNode[id] = queue[0]
  516. t.sendCall(t.activeCallByNode[id])
  517. if len(queue) == 1 {
  518. delete(t.callQueue, id)
  519. } else {
  520. copy(queue, queue[1:])
  521. t.callQueue[id] = queue[:len(queue)-1]
  522. }
  523. }
  524. // sendCall encodes and sends a request packet to the call's recipient node.
  525. // This performs a handshake if needed.
  526. func (t *UDPv5) sendCall(c *callV5) {
  527. // The call might have a nonce from a previous handshake attempt. Remove the entry for
  528. // the old nonce because we're about to generate a new nonce for this call.
  529. if c.nonce != (v5wire.Nonce{}) {
  530. delete(t.activeCallByAuth, c.nonce)
  531. }
  532. addr := &net.UDPAddr{IP: c.node.IP(), Port: c.node.UDP()}
  533. newNonce, _ := t.send(c.node.ID(), addr, c.packet, c.challenge)
  534. c.nonce = newNonce
  535. t.activeCallByAuth[newNonce] = c
  536. t.startResponseTimeout(c)
  537. }
  538. // sendResponse sends a response packet to the given node.
  539. // This doesn't trigger a handshake even if no keys are available.
  540. func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet) error {
  541. _, err := t.send(toID, toAddr, packet, nil)
  542. return err
  543. }
  544. // send sends a packet to the given node.
  545. func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c *v5wire.Whoareyou) (v5wire.Nonce, error) {
  546. addr := toAddr.String()
  547. enc, nonce, err := t.codec.Encode(toID, addr, packet, c)
  548. if err != nil {
  549. t.log.Warn(">> "+packet.Name(), "id", toID, "addr", addr, "err", err)
  550. return nonce, err
  551. }
  552. _, err = t.conn.WriteToUDP(enc, toAddr)
  553. t.log.Trace(">> "+packet.Name(), "id", toID, "addr", addr)
  554. return nonce, err
  555. }
  556. // readLoop runs in its own goroutine and reads packets from the network.
  557. func (t *UDPv5) readLoop() {
  558. defer t.wg.Done()
  559. buf := make([]byte, maxPacketSize)
  560. for range t.readNextCh {
  561. nbytes, from, err := t.conn.ReadFromUDP(buf)
  562. if netutil.IsTemporaryError(err) {
  563. // Ignore temporary read errors.
  564. t.log.Debug("Temporary UDP read error", "err", err)
  565. continue
  566. } else if err != nil {
  567. // Shut down the loop for permanent errors.
  568. if !errors.Is(err, io.EOF) {
  569. t.log.Debug("UDP read error", "err", err)
  570. }
  571. return
  572. }
  573. t.dispatchReadPacket(from, buf[:nbytes])
  574. }
  575. }
  576. // dispatchReadPacket sends a packet into the dispatch loop.
  577. func (t *UDPv5) dispatchReadPacket(from *net.UDPAddr, content []byte) bool {
  578. select {
  579. case t.packetInCh <- ReadPacket{content, from}:
  580. return true
  581. case <-t.closeCtx.Done():
  582. return false
  583. }
  584. }
  585. // handlePacket decodes and processes an incoming packet from the network.
  586. func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error {
  587. addr := fromAddr.String()
  588. fromID, fromNode, packet, err := t.codec.Decode(rawpacket, addr)
  589. if err != nil {
  590. t.log.Debug("Bad discv5 packet", "id", fromID, "addr", addr, "err", err)
  591. return err
  592. }
  593. if fromNode != nil {
  594. // Handshake succeeded, add to table.
  595. t.tab.addSeenNode(wrapNode(fromNode))
  596. }
  597. if packet.Kind() != v5wire.WhoareyouPacket {
  598. // WHOAREYOU logged separately to report errors.
  599. t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", addr)
  600. }
  601. t.handle(packet, fromID, fromAddr)
  602. return nil
  603. }
  604. // handleCallResponse dispatches a response packet to the call waiting for it.
  605. func (t *UDPv5) handleCallResponse(fromID enode.ID, fromAddr *net.UDPAddr, p v5wire.Packet) bool {
  606. ac := t.activeCallByNode[fromID]
  607. if ac == nil || !bytes.Equal(p.RequestID(), ac.reqid) {
  608. t.log.Debug(fmt.Sprintf("Unsolicited/late %s response", p.Name()), "id", fromID, "addr", fromAddr)
  609. return false
  610. }
  611. if !fromAddr.IP.Equal(ac.node.IP()) || fromAddr.Port != ac.node.UDP() {
  612. t.log.Debug(fmt.Sprintf("%s from wrong endpoint", p.Name()), "id", fromID, "addr", fromAddr)
  613. return false
  614. }
  615. if p.Kind() != ac.responseType {
  616. t.log.Debug(fmt.Sprintf("Wrong discv5 response type %s", p.Name()), "id", fromID, "addr", fromAddr)
  617. return false
  618. }
  619. t.startResponseTimeout(ac)
  620. ac.ch <- p
  621. return true
  622. }
  623. // getNode looks for a node record in table and database.
  624. func (t *UDPv5) getNode(id enode.ID) *enode.Node {
  625. if n := t.tab.getNode(id); n != nil {
  626. return n
  627. }
  628. if n := t.localNode.Database().Node(id); n != nil {
  629. return n
  630. }
  631. return nil
  632. }
  633. // handle processes incoming packets according to their message type.
  634. func (t *UDPv5) handle(p v5wire.Packet, fromID enode.ID, fromAddr *net.UDPAddr) {
  635. switch p := p.(type) {
  636. case *v5wire.Unknown:
  637. t.handleUnknown(p, fromID, fromAddr)
  638. case *v5wire.Whoareyou:
  639. t.handleWhoareyou(p, fromID, fromAddr)
  640. case *v5wire.Ping:
  641. t.handlePing(p, fromID, fromAddr)
  642. case *v5wire.Pong:
  643. if t.handleCallResponse(fromID, fromAddr, p) {
  644. t.localNode.UDPEndpointStatement(fromAddr, &net.UDPAddr{IP: p.ToIP, Port: int(p.ToPort)})
  645. }
  646. case *v5wire.Findnode:
  647. t.handleFindnode(p, fromID, fromAddr)
  648. case *v5wire.Nodes:
  649. t.handleCallResponse(fromID, fromAddr, p)
  650. case *v5wire.TalkRequest:
  651. t.handleTalkRequest(p, fromID, fromAddr)
  652. case *v5wire.TalkResponse:
  653. t.handleCallResponse(fromID, fromAddr, p)
  654. }
  655. }
  656. // handleUnknown initiates a handshake by responding with WHOAREYOU.
  657. func (t *UDPv5) handleUnknown(p *v5wire.Unknown, fromID enode.ID, fromAddr *net.UDPAddr) {
  658. challenge := &v5wire.Whoareyou{Nonce: p.Nonce}
  659. crand.Read(challenge.IDNonce[:])
  660. if n := t.getNode(fromID); n != nil {
  661. challenge.Node = n
  662. challenge.RecordSeq = n.Seq()
  663. }
  664. t.sendResponse(fromID, fromAddr, challenge)
  665. }
// Errors returned by matchWithCall when a WHOAREYOU packet is rejected.
var (
	errChallengeNoCall = errors.New("no matching call") // no active call has the referenced nonce
	errChallengeTwice  = errors.New("second handshake") // the call already attempted a handshake
)
  670. // handleWhoareyou resends the active call as a handshake packet.
  671. func (t *UDPv5) handleWhoareyou(p *v5wire.Whoareyou, fromID enode.ID, fromAddr *net.UDPAddr) {
  672. c, err := t.matchWithCall(fromID, p.Nonce)
  673. if err != nil {
  674. t.log.Debug("Invalid "+p.Name(), "addr", fromAddr, "err", err)
  675. return
  676. }
  677. // Resend the call that was answered by WHOAREYOU.
  678. t.log.Trace("<< "+p.Name(), "id", c.node.ID(), "addr", fromAddr)
  679. c.handshakeCount++
  680. c.challenge = p
  681. p.Node = c.node
  682. t.sendCall(c)
  683. }
  684. // matchWithCall checks whether a handshake attempt matches the active call.
  685. func (t *UDPv5) matchWithCall(fromID enode.ID, nonce v5wire.Nonce) (*callV5, error) {
  686. c := t.activeCallByAuth[nonce]
  687. if c == nil {
  688. return nil, errChallengeNoCall
  689. }
  690. if c.handshakeCount > 0 {
  691. return nil, errChallengeTwice
  692. }
  693. return c, nil
  694. }
  695. // handlePing sends a PONG response.
  696. func (t *UDPv5) handlePing(p *v5wire.Ping, fromID enode.ID, fromAddr *net.UDPAddr) {
  697. remoteIP := fromAddr.IP
  698. // Handle IPv4 mapped IPv6 addresses in the
  699. // event the local node is binded to an
  700. // ipv6 interface.
  701. if remoteIP.To4() != nil {
  702. remoteIP = remoteIP.To4()
  703. }
  704. t.sendResponse(fromID, fromAddr, &v5wire.Pong{
  705. ReqID: p.ReqID,
  706. ToIP: remoteIP,
  707. ToPort: uint16(fromAddr.Port),
  708. ENRSeq: t.localNode.Node().Seq(),
  709. })
  710. }
  711. // handleFindnode returns nodes to the requester.
  712. func (t *UDPv5) handleFindnode(p *v5wire.Findnode, fromID enode.ID, fromAddr *net.UDPAddr) {
  713. nodes := t.collectTableNodes(fromAddr.IP, p.Distances, findnodeResultLimit)
  714. for _, resp := range packNodes(p.ReqID, nodes) {
  715. t.sendResponse(fromID, fromAddr, resp)
  716. }
  717. }
  718. // collectTableNodes creates a FINDNODE result set for the given distances.
  719. func (t *UDPv5) collectTableNodes(rip net.IP, distances []uint, limit int) []*enode.Node {
  720. var nodes []*enode.Node
  721. var processed = make(map[uint]struct{})
  722. for _, dist := range distances {
  723. // Reject duplicate / invalid distances.
  724. _, seen := processed[dist]
  725. if seen || dist > 256 {
  726. continue
  727. }
  728. // Get the nodes.
  729. var bn []*enode.Node
  730. if dist == 0 {
  731. bn = []*enode.Node{t.Self()}
  732. } else if dist <= 256 {
  733. t.tab.mutex.Lock()
  734. bn = unwrapNodes(t.tab.bucketAtDistance(int(dist)).entries)
  735. t.tab.mutex.Unlock()
  736. }
  737. processed[dist] = struct{}{}
  738. // Apply some pre-checks to avoid sending invalid nodes.
  739. for _, n := range bn {
  740. // TODO livenessChecks > 1
  741. if netutil.CheckRelayIP(rip, n.IP()) != nil {
  742. continue
  743. }
  744. nodes = append(nodes, n)
  745. if len(nodes) >= limit {
  746. return nodes
  747. }
  748. }
  749. }
  750. return nodes
  751. }
  752. // packNodes creates NODES response packets for the given node list.
  753. func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
  754. if len(nodes) == 0 {
  755. return []*v5wire.Nodes{{ReqID: reqid, Total: 1}}
  756. }
  757. total := uint8(math.Ceil(float64(len(nodes)) / 3))
  758. var resp []*v5wire.Nodes
  759. for len(nodes) > 0 {
  760. p := &v5wire.Nodes{ReqID: reqid, Total: total}
  761. items := min(nodesResponseItemLimit, len(nodes))
  762. for i := 0; i < items; i++ {
  763. p.Nodes = append(p.Nodes, nodes[i].Record())
  764. }
  765. nodes = nodes[items:]
  766. resp = append(resp, p)
  767. }
  768. return resp
  769. }
  770. // handleTalkRequest runs the talk request handler of the requested protocol.
  771. func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAddr *net.UDPAddr) {
  772. t.trlock.Lock()
  773. handler := t.trhandlers[p.Protocol]
  774. t.trlock.Unlock()
  775. var response []byte
  776. if handler != nil {
  777. response = handler(fromID, fromAddr, p.Message)
  778. }
  779. resp := &v5wire.TalkResponse{ReqID: p.ReqID, Message: response}
  780. t.sendResponse(fromID, fromAddr, resp)
  781. }