v5_udp.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discover
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/ecdsa"
  21. crand "crypto/rand"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "sync"
  28. "time"
  29. "github.com/ethereum/go-ethereum/common/mclock"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p/enode"
  32. "github.com/ethereum/go-ethereum/p2p/enr"
  33. "github.com/ethereum/go-ethereum/p2p/netutil"
  34. )
  35. const (
  36. lookupRequestLimit = 3 // max requests against a single node during lookup
  37. findnodeResultLimit = 15 // applies in FINDNODE handler
  38. totalNodesResponseLimit = 5 // applies in waitForNodes
  39. nodesResponseItemLimit = 3 // applies in sendNodes
  40. respTimeoutV5 = 700 * time.Millisecond
  41. )
  42. // codecV5 is implemented by wireCodec (and testCodec).
  43. //
  44. // The UDPv5 transport is split into two objects: the codec object deals with
  45. // encoding/decoding and with the handshake; the UDPv5 object handles higher-level concerns.
  46. type codecV5 interface {
  47. // encode encodes a packet. The 'challenge' parameter is non-nil for calls which got a
  48. // WHOAREYOU response.
  49. encode(fromID enode.ID, fromAddr string, p packetV5, challenge *whoareyouV5) (enc []byte, authTag []byte, err error)
  50. // decode decodes a packet. It returns an *unknownV5 packet if decryption fails.
  51. // The fromNode return value is non-nil when the input contains a handshake response.
  52. decode(input []byte, fromAddr string) (fromID enode.ID, fromNode *enode.Node, p packetV5, err error)
  53. }
  54. // packetV5 is implemented by all discv5 packet type structs.
  55. type packetV5 interface {
  56. // These methods provide information and set the request ID.
  57. name() string
  58. kind() byte
  59. setreqid([]byte)
  60. // handle should perform the appropriate action to handle the packet, i.e. this is the
  61. // place to send the response.
  62. handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr)
  63. }
  64. // UDPv5 is the implementation of protocol version 5.
  65. type UDPv5 struct {
  66. // static fields
  67. conn UDPConn
  68. tab *Table
  69. netrestrict *netutil.Netlist
  70. priv *ecdsa.PrivateKey
  71. localNode *enode.LocalNode
  72. db *enode.DB
  73. log log.Logger
  74. clock mclock.Clock
  75. validSchemes enr.IdentityScheme
  76. // channels into dispatch
  77. packetInCh chan ReadPacket
  78. readNextCh chan struct{}
  79. callCh chan *callV5
  80. callDoneCh chan *callV5
  81. respTimeoutCh chan *callTimeout
  82. // state of dispatch
  83. codec codecV5
  84. activeCallByNode map[enode.ID]*callV5
  85. activeCallByAuth map[string]*callV5
  86. callQueue map[enode.ID][]*callV5
  87. // shutdown stuff
  88. closeOnce sync.Once
  89. closeCtx context.Context
  90. cancelCloseCtx context.CancelFunc
  91. wg sync.WaitGroup
  92. }
  93. // callV5 represents a remote procedure call against another node.
  94. type callV5 struct {
  95. node *enode.Node
  96. packet packetV5
  97. responseType byte // expected packet type of response
  98. reqid []byte
  99. ch chan packetV5 // responses sent here
  100. err chan error // errors sent here
  101. // Valid for active calls only:
  102. authTag []byte // authTag of request packet
  103. handshakeCount int // # times we attempted handshake for this call
  104. challenge *whoareyouV5 // last sent handshake challenge
  105. timeout mclock.Timer
  106. }
  107. // callTimeout is the response timeout event of a call.
  108. type callTimeout struct {
  109. c *callV5
  110. timer mclock.Timer
  111. }
  112. // ListenV5 listens on the given connection.
  113. func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
  114. t, err := newUDPv5(conn, ln, cfg)
  115. if err != nil {
  116. return nil, err
  117. }
  118. go t.tab.loop()
  119. t.wg.Add(2)
  120. go t.readLoop()
  121. go t.dispatch()
  122. return t, nil
  123. }
  124. // newUDPv5 creates a UDPv5 transport, but doesn't start any goroutines.
  125. func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
  126. closeCtx, cancelCloseCtx := context.WithCancel(context.Background())
  127. cfg = cfg.withDefaults()
  128. t := &UDPv5{
  129. // static fields
  130. conn: conn,
  131. localNode: ln,
  132. db: ln.Database(),
  133. netrestrict: cfg.NetRestrict,
  134. priv: cfg.PrivateKey,
  135. log: cfg.Log,
  136. validSchemes: cfg.ValidSchemes,
  137. clock: cfg.Clock,
  138. // channels into dispatch
  139. packetInCh: make(chan ReadPacket, 1),
  140. readNextCh: make(chan struct{}, 1),
  141. callCh: make(chan *callV5),
  142. callDoneCh: make(chan *callV5),
  143. respTimeoutCh: make(chan *callTimeout),
  144. // state of dispatch
  145. codec: newWireCodec(ln, cfg.PrivateKey, cfg.Clock),
  146. activeCallByNode: make(map[enode.ID]*callV5),
  147. activeCallByAuth: make(map[string]*callV5),
  148. callQueue: make(map[enode.ID][]*callV5),
  149. // shutdown
  150. closeCtx: closeCtx,
  151. cancelCloseCtx: cancelCloseCtx,
  152. }
  153. tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log)
  154. if err != nil {
  155. return nil, err
  156. }
  157. t.tab = tab
  158. return t, nil
  159. }
  160. // Self returns the local node record.
  161. func (t *UDPv5) Self() *enode.Node {
  162. return t.localNode.Node()
  163. }
  164. // Close shuts down packet processing.
  165. func (t *UDPv5) Close() {
  166. t.closeOnce.Do(func() {
  167. t.cancelCloseCtx()
  168. t.conn.Close()
  169. t.wg.Wait()
  170. t.tab.close()
  171. })
  172. }
  173. // Ping sends a ping message to the given node.
  174. func (t *UDPv5) Ping(n *enode.Node) error {
  175. _, err := t.ping(n)
  176. return err
  177. }
  178. // Resolve searches for a specific node with the given ID and tries to get the most recent
  179. // version of the node record for it. It returns n if the node could not be resolved.
  180. func (t *UDPv5) Resolve(n *enode.Node) *enode.Node {
  181. if intable := t.tab.getNode(n.ID()); intable != nil && intable.Seq() > n.Seq() {
  182. n = intable
  183. }
  184. // Try asking directly. This works if the node is still responding on the endpoint we have.
  185. if resp, err := t.RequestENR(n); err == nil {
  186. return resp
  187. }
  188. // Otherwise do a network lookup.
  189. result := t.Lookup(n.ID())
  190. for _, rn := range result {
  191. if rn.ID() == n.ID() && rn.Seq() > n.Seq() {
  192. return rn
  193. }
  194. }
  195. return n
  196. }
  197. // AllNodes returns all the nodes stored in the local table.
  198. func (t *UDPv5) AllNodes() []*enode.Node {
  199. t.tab.mutex.Lock()
  200. defer t.tab.mutex.Unlock()
  201. nodes := make([]*enode.Node, 0)
  202. for _, b := range &t.tab.buckets {
  203. for _, n := range b.entries {
  204. nodes = append(nodes, unwrapNode(n))
  205. }
  206. }
  207. return nodes
  208. }
  209. // LocalNode returns the current local node running the
  210. // protocol.
  211. func (t *UDPv5) LocalNode() *enode.LocalNode {
  212. return t.localNode
  213. }
  214. func (t *UDPv5) RandomNodes() enode.Iterator {
  215. if t.tab.len() == 0 {
  216. // All nodes were dropped, refresh. The very first query will hit this
  217. // case and run the bootstrapping logic.
  218. <-t.tab.refresh()
  219. }
  220. return newLookupIterator(t.closeCtx, t.newRandomLookup)
  221. }
  222. // Lookup performs a recursive lookup for the given target.
  223. // It returns the closest nodes to target.
  224. func (t *UDPv5) Lookup(target enode.ID) []*enode.Node {
  225. return t.newLookup(t.closeCtx, target).run()
  226. }
  227. // lookupRandom looks up a random target.
  228. // This is needed to satisfy the transport interface.
  229. func (t *UDPv5) lookupRandom() []*enode.Node {
  230. return t.newRandomLookup(t.closeCtx).run()
  231. }
  232. // lookupSelf looks up our own node ID.
  233. // This is needed to satisfy the transport interface.
  234. func (t *UDPv5) lookupSelf() []*enode.Node {
  235. return t.newLookup(t.closeCtx, t.Self().ID()).run()
  236. }
  237. func (t *UDPv5) newRandomLookup(ctx context.Context) *lookup {
  238. var target enode.ID
  239. crand.Read(target[:])
  240. return t.newLookup(ctx, target)
  241. }
  242. func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
  243. return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
  244. return t.lookupWorker(n, target)
  245. })
  246. }
  247. // lookupWorker performs FINDNODE calls against a single node during lookup.
  248. func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
  249. var (
  250. dists = lookupDistances(target, destNode.ID())
  251. nodes = nodesByDistance{target: target}
  252. err error
  253. )
  254. for i := 0; i < lookupRequestLimit && len(nodes.entries) < findnodeResultLimit; i++ {
  255. var r []*enode.Node
  256. r, err = t.findnode(unwrapNode(destNode), dists[i])
  257. if err == errClosed {
  258. return nil, err
  259. }
  260. for _, n := range r {
  261. if n.ID() != t.Self().ID() {
  262. nodes.push(wrapNode(n), findnodeResultLimit)
  263. }
  264. }
  265. }
  266. return nodes.entries, err
  267. }
  268. // lookupDistances computes the distance parameter for FINDNODE calls to dest.
  269. // It chooses distances adjacent to logdist(target, dest), e.g. for a target
  270. // with logdist(target, dest) = 255 the result is [255, 256, 254].
  271. func lookupDistances(target, dest enode.ID) (dists []int) {
  272. td := enode.LogDist(target, dest)
  273. dists = append(dists, td)
  274. for i := 1; len(dists) < lookupRequestLimit; i++ {
  275. if td+i < 256 {
  276. dists = append(dists, td+i)
  277. }
  278. if td-i > 0 {
  279. dists = append(dists, td-i)
  280. }
  281. }
  282. return dists
  283. }
  284. // ping calls PING on a node and waits for a PONG response.
  285. func (t *UDPv5) ping(n *enode.Node) (uint64, error) {
  286. resp := t.call(n, p_pongV5, &pingV5{ENRSeq: t.localNode.Node().Seq()})
  287. defer t.callDone(resp)
  288. select {
  289. case pong := <-resp.ch:
  290. return pong.(*pongV5).ENRSeq, nil
  291. case err := <-resp.err:
  292. return 0, err
  293. }
  294. }
  295. // requestENR requests n's record.
  296. func (t *UDPv5) RequestENR(n *enode.Node) (*enode.Node, error) {
  297. nodes, err := t.findnode(n, 0)
  298. if err != nil {
  299. return nil, err
  300. }
  301. if len(nodes) != 1 {
  302. return nil, fmt.Errorf("%d nodes in response for distance zero", len(nodes))
  303. }
  304. return nodes[0], nil
  305. }
  306. // requestTicket calls REQUESTTICKET on a node and waits for a TICKET response.
  307. func (t *UDPv5) requestTicket(n *enode.Node) ([]byte, error) {
  308. resp := t.call(n, p_ticketV5, &pingV5{})
  309. defer t.callDone(resp)
  310. select {
  311. case response := <-resp.ch:
  312. return response.(*ticketV5).Ticket, nil
  313. case err := <-resp.err:
  314. return nil, err
  315. }
  316. }
  317. // findnode calls FINDNODE on a node and waits for responses.
  318. func (t *UDPv5) findnode(n *enode.Node, distance int) ([]*enode.Node, error) {
  319. resp := t.call(n, p_nodesV5, &findnodeV5{Distance: uint(distance)})
  320. return t.waitForNodes(resp, distance)
  321. }
  322. // waitForNodes waits for NODES responses to the given call.
  323. func (t *UDPv5) waitForNodes(c *callV5, distance int) ([]*enode.Node, error) {
  324. defer t.callDone(c)
  325. var (
  326. nodes []*enode.Node
  327. seen = make(map[enode.ID]struct{})
  328. received, total = 0, -1
  329. )
  330. for {
  331. select {
  332. case responseP := <-c.ch:
  333. response := responseP.(*nodesV5)
  334. for _, record := range response.Nodes {
  335. node, err := t.verifyResponseNode(c, record, distance, seen)
  336. if err != nil {
  337. t.log.Debug("Invalid record in "+response.name(), "id", c.node.ID(), "err", err)
  338. continue
  339. }
  340. nodes = append(nodes, node)
  341. }
  342. if total == -1 {
  343. total = min(int(response.Total), totalNodesResponseLimit)
  344. }
  345. if received++; received == total {
  346. return nodes, nil
  347. }
  348. case err := <-c.err:
  349. return nodes, err
  350. }
  351. }
  352. }
  353. // verifyResponseNode checks validity of a record in a NODES response.
  354. func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distance int, seen map[enode.ID]struct{}) (*enode.Node, error) {
  355. node, err := enode.New(t.validSchemes, r)
  356. if err != nil {
  357. return nil, err
  358. }
  359. if err := netutil.CheckRelayIP(c.node.IP(), node.IP()); err != nil {
  360. return nil, err
  361. }
  362. if c.node.UDP() <= 1024 {
  363. return nil, errLowPort
  364. }
  365. if distance != -1 {
  366. if d := enode.LogDist(c.node.ID(), node.ID()); d != distance {
  367. return nil, fmt.Errorf("wrong distance %d", d)
  368. }
  369. }
  370. if _, ok := seen[node.ID()]; ok {
  371. return nil, fmt.Errorf("duplicate record")
  372. }
  373. seen[node.ID()] = struct{}{}
  374. return node, nil
  375. }
  376. // call sends the given call and sets up a handler for response packets (of type c.responseType).
  377. // Responses are dispatched to the call's response channel.
  378. func (t *UDPv5) call(node *enode.Node, responseType byte, packet packetV5) *callV5 {
  379. c := &callV5{
  380. node: node,
  381. packet: packet,
  382. responseType: responseType,
  383. reqid: make([]byte, 8),
  384. ch: make(chan packetV5, 1),
  385. err: make(chan error, 1),
  386. }
  387. // Assign request ID.
  388. crand.Read(c.reqid)
  389. packet.setreqid(c.reqid)
  390. // Send call to dispatch.
  391. select {
  392. case t.callCh <- c:
  393. case <-t.closeCtx.Done():
  394. c.err <- errClosed
  395. }
  396. return c
  397. }
  398. // callDone tells dispatch that the active call is done.
  399. func (t *UDPv5) callDone(c *callV5) {
  400. select {
  401. case t.callDoneCh <- c:
  402. case <-t.closeCtx.Done():
  403. }
  404. }
  405. // dispatch runs in its own goroutine, handles incoming packets and deals with calls.
  406. //
  407. // For any destination node there is at most one 'active call', stored in the t.activeCall*
  408. // maps. A call is made active when it is sent. The active call can be answered by a
  409. // matching response, in which case c.ch receives the response; or by timing out, in which case
  410. // c.err receives the error. When the function that created the call signals the active
  411. // call is done through callDone, the next call from the call queue is started.
  412. //
  413. // Calls may also be answered by a WHOAREYOU packet referencing the call packet's authTag.
  414. // When that happens the call is simply re-sent to complete the handshake. We allow one
  415. // handshake attempt per call.
  416. func (t *UDPv5) dispatch() {
  417. defer t.wg.Done()
  418. // Arm first read.
  419. t.readNextCh <- struct{}{}
  420. for {
  421. select {
  422. case c := <-t.callCh:
  423. id := c.node.ID()
  424. t.callQueue[id] = append(t.callQueue[id], c)
  425. t.sendNextCall(id)
  426. case ct := <-t.respTimeoutCh:
  427. active := t.activeCallByNode[ct.c.node.ID()]
  428. if ct.c == active && ct.timer == active.timeout {
  429. ct.c.err <- errTimeout
  430. }
  431. case c := <-t.callDoneCh:
  432. id := c.node.ID()
  433. active := t.activeCallByNode[id]
  434. if active != c {
  435. panic("BUG: callDone for inactive call")
  436. }
  437. c.timeout.Stop()
  438. delete(t.activeCallByAuth, string(c.authTag))
  439. delete(t.activeCallByNode, id)
  440. t.sendNextCall(id)
  441. case p := <-t.packetInCh:
  442. t.handlePacket(p.Data, p.Addr)
  443. // Arm next read.
  444. t.readNextCh <- struct{}{}
  445. case <-t.closeCtx.Done():
  446. close(t.readNextCh)
  447. for id, queue := range t.callQueue {
  448. for _, c := range queue {
  449. c.err <- errClosed
  450. }
  451. delete(t.callQueue, id)
  452. }
  453. for id, c := range t.activeCallByNode {
  454. c.err <- errClosed
  455. delete(t.activeCallByNode, id)
  456. delete(t.activeCallByAuth, string(c.authTag))
  457. }
  458. return
  459. }
  460. }
  461. }
  462. // startResponseTimeout sets the response timer for a call.
  463. func (t *UDPv5) startResponseTimeout(c *callV5) {
  464. if c.timeout != nil {
  465. c.timeout.Stop()
  466. }
  467. var (
  468. timer mclock.Timer
  469. done = make(chan struct{})
  470. )
  471. timer = t.clock.AfterFunc(respTimeoutV5, func() {
  472. <-done
  473. select {
  474. case t.respTimeoutCh <- &callTimeout{c, timer}:
  475. case <-t.closeCtx.Done():
  476. }
  477. })
  478. c.timeout = timer
  479. close(done)
  480. }
  481. // sendNextCall sends the next call in the call queue if there is no active call.
  482. func (t *UDPv5) sendNextCall(id enode.ID) {
  483. queue := t.callQueue[id]
  484. if len(queue) == 0 || t.activeCallByNode[id] != nil {
  485. return
  486. }
  487. t.activeCallByNode[id] = queue[0]
  488. t.sendCall(t.activeCallByNode[id])
  489. if len(queue) == 1 {
  490. delete(t.callQueue, id)
  491. } else {
  492. copy(queue, queue[1:])
  493. t.callQueue[id] = queue[:len(queue)-1]
  494. }
  495. }
  496. // sendCall encodes and sends a request packet to the call's recipient node.
  497. // This performs a handshake if needed.
  498. func (t *UDPv5) sendCall(c *callV5) {
  499. if len(c.authTag) > 0 {
  500. // The call already has an authTag from a previous handshake attempt. Remove the
  501. // entry for the authTag because we're about to generate a new authTag for this
  502. // call.
  503. delete(t.activeCallByAuth, string(c.authTag))
  504. }
  505. addr := &net.UDPAddr{IP: c.node.IP(), Port: c.node.UDP()}
  506. newTag, _ := t.send(c.node.ID(), addr, c.packet, c.challenge)
  507. c.authTag = newTag
  508. t.activeCallByAuth[string(c.authTag)] = c
  509. t.startResponseTimeout(c)
  510. }
  511. // sendResponse sends a response packet to the given node.
  512. // This doesn't trigger a handshake even if no keys are available.
  513. func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet packetV5) error {
  514. _, err := t.send(toID, toAddr, packet, nil)
  515. return err
  516. }
  517. // send sends a packet to the given node.
  518. func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet packetV5, c *whoareyouV5) ([]byte, error) {
  519. addr := toAddr.String()
  520. enc, authTag, err := t.codec.encode(toID, addr, packet, c)
  521. if err != nil {
  522. t.log.Warn(">> "+packet.name(), "id", toID, "addr", addr, "err", err)
  523. return authTag, err
  524. }
  525. _, err = t.conn.WriteToUDP(enc, toAddr)
  526. t.log.Trace(">> "+packet.name(), "id", toID, "addr", addr)
  527. return authTag, err
  528. }
  529. // readLoop runs in its own goroutine and reads packets from the network.
  530. func (t *UDPv5) readLoop() {
  531. defer t.wg.Done()
  532. buf := make([]byte, maxPacketSize)
  533. for range t.readNextCh {
  534. nbytes, from, err := t.conn.ReadFromUDP(buf)
  535. if netutil.IsTemporaryError(err) {
  536. // Ignore temporary read errors.
  537. t.log.Debug("Temporary UDP read error", "err", err)
  538. continue
  539. } else if err != nil {
  540. // Shut down the loop for permament errors.
  541. if err != io.EOF {
  542. t.log.Debug("UDP read error", "err", err)
  543. }
  544. return
  545. }
  546. t.dispatchReadPacket(from, buf[:nbytes])
  547. }
  548. }
  549. // dispatchReadPacket sends a packet into the dispatch loop.
  550. func (t *UDPv5) dispatchReadPacket(from *net.UDPAddr, content []byte) bool {
  551. select {
  552. case t.packetInCh <- ReadPacket{content, from}:
  553. return true
  554. case <-t.closeCtx.Done():
  555. return false
  556. }
  557. }
  558. // handlePacket decodes and processes an incoming packet from the network.
  559. func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error {
  560. addr := fromAddr.String()
  561. fromID, fromNode, packet, err := t.codec.decode(rawpacket, addr)
  562. if err != nil {
  563. t.log.Debug("Bad discv5 packet", "id", fromID, "addr", addr, "err", err)
  564. return err
  565. }
  566. if fromNode != nil {
  567. // Handshake succeeded, add to table.
  568. t.tab.addSeenNode(wrapNode(fromNode))
  569. }
  570. if packet.kind() != p_whoareyouV5 {
  571. // WHOAREYOU logged separately to report the sender ID.
  572. t.log.Trace("<< "+packet.name(), "id", fromID, "addr", addr)
  573. }
  574. packet.handle(t, fromID, fromAddr)
  575. return nil
  576. }
  577. // handleCallResponse dispatches a response packet to the call waiting for it.
  578. func (t *UDPv5) handleCallResponse(fromID enode.ID, fromAddr *net.UDPAddr, reqid []byte, p packetV5) {
  579. ac := t.activeCallByNode[fromID]
  580. if ac == nil || !bytes.Equal(reqid, ac.reqid) {
  581. t.log.Debug(fmt.Sprintf("Unsolicited/late %s response", p.name()), "id", fromID, "addr", fromAddr)
  582. return
  583. }
  584. if !fromAddr.IP.Equal(ac.node.IP()) || fromAddr.Port != ac.node.UDP() {
  585. t.log.Debug(fmt.Sprintf("%s from wrong endpoint", p.name()), "id", fromID, "addr", fromAddr)
  586. return
  587. }
  588. if p.kind() != ac.responseType {
  589. t.log.Debug(fmt.Sprintf("Wrong disv5 response type %s", p.name()), "id", fromID, "addr", fromAddr)
  590. return
  591. }
  592. t.startResponseTimeout(ac)
  593. ac.ch <- p
  594. }
  595. // getNode looks for a node record in table and database.
  596. func (t *UDPv5) getNode(id enode.ID) *enode.Node {
  597. if n := t.tab.getNode(id); n != nil {
  598. return n
  599. }
  600. if n := t.localNode.Database().Node(id); n != nil {
  601. return n
  602. }
  603. return nil
  604. }
  605. // UNKNOWN
  606. func (p *unknownV5) name() string { return "UNKNOWN/v5" }
  607. func (p *unknownV5) kind() byte { return p_unknownV5 }
  608. func (p *unknownV5) setreqid(id []byte) {}
  609. func (p *unknownV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  610. challenge := &whoareyouV5{AuthTag: p.AuthTag}
  611. crand.Read(challenge.IDNonce[:])
  612. if n := t.getNode(fromID); n != nil {
  613. challenge.node = n
  614. challenge.RecordSeq = n.Seq()
  615. }
  616. t.sendResponse(fromID, fromAddr, challenge)
  617. }
  618. // WHOAREYOU
  619. func (p *whoareyouV5) name() string { return "WHOAREYOU/v5" }
  620. func (p *whoareyouV5) kind() byte { return p_whoareyouV5 }
  621. func (p *whoareyouV5) setreqid(id []byte) {}
  622. func (p *whoareyouV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  623. c, err := p.matchWithCall(t, p.AuthTag)
  624. if err != nil {
  625. t.log.Debug("Invalid WHOAREYOU/v5", "addr", fromAddr, "err", err)
  626. return
  627. }
  628. // Resend the call that was answered by WHOAREYOU.
  629. t.log.Trace("<< "+p.name(), "id", c.node.ID(), "addr", fromAddr)
  630. c.handshakeCount++
  631. c.challenge = p
  632. p.node = c.node
  633. t.sendCall(c)
  634. }
  635. var (
  636. errChallengeNoCall = errors.New("no matching call")
  637. errChallengeTwice = errors.New("second handshake")
  638. )
  639. // matchWithCall checks whether the handshake attempt matches the active call.
  640. func (p *whoareyouV5) matchWithCall(t *UDPv5, authTag []byte) (*callV5, error) {
  641. c := t.activeCallByAuth[string(authTag)]
  642. if c == nil {
  643. return nil, errChallengeNoCall
  644. }
  645. if c.handshakeCount > 0 {
  646. return nil, errChallengeTwice
  647. }
  648. return c, nil
  649. }
  650. // PING
  651. func (p *pingV5) name() string { return "PING/v5" }
  652. func (p *pingV5) kind() byte { return p_pingV5 }
  653. func (p *pingV5) setreqid(id []byte) { p.ReqID = id }
  654. func (p *pingV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  655. t.sendResponse(fromID, fromAddr, &pongV5{
  656. ReqID: p.ReqID,
  657. ToIP: fromAddr.IP,
  658. ToPort: uint16(fromAddr.Port),
  659. ENRSeq: t.localNode.Node().Seq(),
  660. })
  661. }
  662. // PONG
  663. func (p *pongV5) name() string { return "PONG/v5" }
  664. func (p *pongV5) kind() byte { return p_pongV5 }
  665. func (p *pongV5) setreqid(id []byte) { p.ReqID = id }
  666. func (p *pongV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  667. t.localNode.UDPEndpointStatement(fromAddr, &net.UDPAddr{IP: p.ToIP, Port: int(p.ToPort)})
  668. t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
  669. }
  670. // FINDNODE
  671. func (p *findnodeV5) name() string { return "FINDNODE/v5" }
  672. func (p *findnodeV5) kind() byte { return p_findnodeV5 }
  673. func (p *findnodeV5) setreqid(id []byte) { p.ReqID = id }
  674. func (p *findnodeV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  675. if p.Distance == 0 {
  676. t.sendNodes(fromID, fromAddr, p.ReqID, []*enode.Node{t.Self()})
  677. return
  678. }
  679. if p.Distance > 256 {
  680. p.Distance = 256
  681. }
  682. // Get bucket entries.
  683. t.tab.mutex.Lock()
  684. nodes := unwrapNodes(t.tab.bucketAtDistance(int(p.Distance)).entries)
  685. t.tab.mutex.Unlock()
  686. if len(nodes) > findnodeResultLimit {
  687. nodes = nodes[:findnodeResultLimit]
  688. }
  689. t.sendNodes(fromID, fromAddr, p.ReqID, nodes)
  690. }
  691. // sendNodes sends the given records in one or more NODES packets.
  692. func (t *UDPv5) sendNodes(toID enode.ID, toAddr *net.UDPAddr, reqid []byte, nodes []*enode.Node) {
  693. // TODO livenessChecks > 1
  694. // TODO CheckRelayIP
  695. total := uint8(math.Ceil(float64(len(nodes)) / 3))
  696. resp := &nodesV5{ReqID: reqid, Total: total, Nodes: make([]*enr.Record, 3)}
  697. sent := false
  698. for len(nodes) > 0 {
  699. items := min(nodesResponseItemLimit, len(nodes))
  700. resp.Nodes = resp.Nodes[:items]
  701. for i := 0; i < items; i++ {
  702. resp.Nodes[i] = nodes[i].Record()
  703. }
  704. t.sendResponse(toID, toAddr, resp)
  705. nodes = nodes[items:]
  706. sent = true
  707. }
  708. // Ensure at least one response is sent.
  709. if !sent {
  710. resp.Total = 1
  711. resp.Nodes = nil
  712. t.sendResponse(toID, toAddr, resp)
  713. }
  714. }
  715. // NODES
  716. func (p *nodesV5) name() string { return "NODES/v5" }
  717. func (p *nodesV5) kind() byte { return p_nodesV5 }
  718. func (p *nodesV5) setreqid(id []byte) { p.ReqID = id }
  719. func (p *nodesV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  720. t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
  721. }
  722. // REQUESTTICKET
  723. func (p *requestTicketV5) name() string { return "REQUESTTICKET/v5" }
  724. func (p *requestTicketV5) kind() byte { return p_requestTicketV5 }
  725. func (p *requestTicketV5) setreqid(id []byte) { p.ReqID = id }
  726. func (p *requestTicketV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  727. t.sendResponse(fromID, fromAddr, &ticketV5{ReqID: p.ReqID})
  728. }
  729. // TICKET
  730. func (p *ticketV5) name() string { return "TICKET/v5" }
  731. func (p *ticketV5) kind() byte { return p_ticketV5 }
  732. func (p *ticketV5) setreqid(id []byte) { p.ReqID = id }
  733. func (p *ticketV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  734. t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
  735. }
  736. // REGTOPIC
  737. func (p *regtopicV5) name() string { return "REGTOPIC/v5" }
  738. func (p *regtopicV5) kind() byte { return p_regtopicV5 }
  739. func (p *regtopicV5) setreqid(id []byte) { p.ReqID = id }
  740. func (p *regtopicV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  741. t.sendResponse(fromID, fromAddr, &regconfirmationV5{ReqID: p.ReqID, Registered: false})
  742. }
  743. // REGCONFIRMATION
  744. func (p *regconfirmationV5) name() string { return "REGCONFIRMATION/v5" }
  745. func (p *regconfirmationV5) kind() byte { return p_regconfirmationV5 }
  746. func (p *regconfirmationV5) setreqid(id []byte) { p.ReqID = id }
  747. func (p *regconfirmationV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  748. t.handleCallResponse(fromID, fromAddr, p.ReqID, p)
  749. }
  750. // TOPICQUERY
  751. func (p *topicqueryV5) name() string { return "TOPICQUERY/v5" }
  752. func (p *topicqueryV5) kind() byte { return p_topicqueryV5 }
  753. func (p *topicqueryV5) setreqid(id []byte) { p.ReqID = id }
  754. func (p *topicqueryV5) handle(t *UDPv5, fromID enode.ID, fromAddr *net.UDPAddr) {
  755. }