udp.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discover
  17. import (
  18. "bytes"
  19. "container/list"
  20. "crypto/ecdsa"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "time"
  25. "github.com/ethereum/go-ethereum/crypto"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p/nat"
  28. "github.com/ethereum/go-ethereum/p2p/netutil"
  29. "github.com/ethereum/go-ethereum/rlp"
  30. )
  31. const Version = 4
  32. // Errors
  33. var (
  34. errPacketTooSmall = errors.New("too small")
  35. errBadHash = errors.New("bad hash")
  36. errExpired = errors.New("expired")
  37. errUnsolicitedReply = errors.New("unsolicited reply")
  38. errUnknownNode = errors.New("unknown node")
  39. errTimeout = errors.New("RPC timeout")
  40. errClockWarp = errors.New("reply deadline too far in the future")
  41. errClosed = errors.New("socket closed")
  42. )
  43. // Timeouts
  44. const (
  45. respTimeout = 500 * time.Millisecond
  46. sendTimeout = 500 * time.Millisecond
  47. expiration = 20 * time.Second
  48. ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP
  49. ntpWarningCooldown = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning
  50. driftThreshold = 10 * time.Second // Allowed clock drift before warning user
  51. )
  52. // RPC packet types
  53. const (
  54. pingPacket = iota + 1 // zero is 'reserved'
  55. pongPacket
  56. findnodePacket
  57. neighborsPacket
  58. )
  59. // RPC request structures
  60. type (
  61. ping struct {
  62. Version uint
  63. From, To rpcEndpoint
  64. Expiration uint64
  65. // Ignore additional fields (for forward compatibility).
  66. Rest []rlp.RawValue `rlp:"tail"`
  67. }
  68. // pong is the reply to ping.
  69. pong struct {
  70. // This field should mirror the UDP envelope address
  71. // of the ping packet, which provides a way to discover the
  72. // the external address (after NAT).
  73. To rpcEndpoint
  74. ReplyTok []byte // This contains the hash of the ping packet.
  75. Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
  76. // Ignore additional fields (for forward compatibility).
  77. Rest []rlp.RawValue `rlp:"tail"`
  78. }
  79. // findnode is a query for nodes close to the given target.
  80. findnode struct {
  81. Target NodeID // doesn't need to be an actual public key
  82. Expiration uint64
  83. // Ignore additional fields (for forward compatibility).
  84. Rest []rlp.RawValue `rlp:"tail"`
  85. }
  86. // reply to findnode
  87. neighbors struct {
  88. Nodes []rpcNode
  89. Expiration uint64
  90. // Ignore additional fields (for forward compatibility).
  91. Rest []rlp.RawValue `rlp:"tail"`
  92. }
  93. rpcNode struct {
  94. IP net.IP // len 4 for IPv4 or 16 for IPv6
  95. UDP uint16 // for discovery protocol
  96. TCP uint16 // for RLPx protocol
  97. ID NodeID
  98. }
  99. rpcEndpoint struct {
  100. IP net.IP // len 4 for IPv4 or 16 for IPv6
  101. UDP uint16 // for discovery protocol
  102. TCP uint16 // for RLPx protocol
  103. }
  104. )
  105. func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
  106. ip := addr.IP.To4()
  107. if ip == nil {
  108. ip = addr.IP.To16()
  109. }
  110. return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
  111. }
  112. func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
  113. if rn.UDP <= 1024 {
  114. return nil, errors.New("low port")
  115. }
  116. if err := netutil.CheckRelayIP(sender.IP, rn.IP); err != nil {
  117. return nil, err
  118. }
  119. if t.netrestrict != nil && !t.netrestrict.Contains(rn.IP) {
  120. return nil, errors.New("not contained in netrestrict whitelist")
  121. }
  122. n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP)
  123. err := n.validateComplete()
  124. return n, err
  125. }
  126. func nodeToRPC(n *Node) rpcNode {
  127. return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
  128. }
  129. type packet interface {
  130. handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
  131. name() string
  132. }
  133. type conn interface {
  134. ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
  135. WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
  136. Close() error
  137. LocalAddr() net.Addr
  138. }
  139. // udp implements the RPC protocol.
  140. type udp struct {
  141. conn conn
  142. netrestrict *netutil.Netlist
  143. priv *ecdsa.PrivateKey
  144. ourEndpoint rpcEndpoint
  145. addpending chan *pending
  146. gotreply chan reply
  147. closing chan struct{}
  148. nat nat.Interface
  149. *Table
  150. }
  151. // pending represents a pending reply.
  152. //
  153. // some implementations of the protocol wish to send more than one
  154. // reply packet to findnode. in general, any neighbors packet cannot
  155. // be matched up with a specific findnode packet.
  156. //
  157. // our implementation handles this by storing a callback function for
  158. // each pending reply. incoming packets from a node are dispatched
  159. // to all the callback functions for that node.
  160. type pending struct {
  161. // these fields must match in the reply.
  162. from NodeID
  163. ptype byte
  164. // time when the request must complete
  165. deadline time.Time
  166. // callback is called when a matching reply arrives. if it returns
  167. // true, the callback is removed from the pending reply queue.
  168. // if it returns false, the reply is considered incomplete and
  169. // the callback will be invoked again for the next matching reply.
  170. callback func(resp interface{}) (done bool)
  171. // errc receives nil when the callback indicates completion or an
  172. // error if no further reply is received within the timeout.
  173. errc chan<- error
  174. }
  175. type reply struct {
  176. from NodeID
  177. ptype byte
  178. data interface{}
  179. // loop indicates whether there was
  180. // a matching request by sending on this channel.
  181. matched chan<- bool
  182. }
  183. // ReadPacket is sent to the unhandled channel when it could not be processed
  184. type ReadPacket struct {
  185. Data []byte
  186. Addr *net.UDPAddr
  187. }
  188. // Config holds Table-related settings.
  189. type Config struct {
  190. // These settings are required and configure the UDP listener:
  191. PrivateKey *ecdsa.PrivateKey
  192. // These settings are optional:
  193. AnnounceAddr *net.UDPAddr // local address announced in the DHT
  194. NodeDBPath string // if set, the node database is stored at this filesystem location
  195. NetRestrict *netutil.Netlist // network whitelist
  196. Bootnodes []*Node // list of bootstrap nodes
  197. Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
  198. }
  199. // ListenUDP returns a new table that listens for UDP packets on laddr.
  200. func ListenUDP(c conn, cfg Config) (*Table, error) {
  201. tab, _, err := newUDP(c, cfg)
  202. if err != nil {
  203. return nil, err
  204. }
  205. log.Info("UDP listener up", "self", tab.self)
  206. return tab, nil
  207. }
  208. func newUDP(c conn, cfg Config) (*Table, *udp, error) {
  209. udp := &udp{
  210. conn: c,
  211. priv: cfg.PrivateKey,
  212. netrestrict: cfg.NetRestrict,
  213. closing: make(chan struct{}),
  214. gotreply: make(chan reply),
  215. addpending: make(chan *pending),
  216. }
  217. realaddr := c.LocalAddr().(*net.UDPAddr)
  218. if cfg.AnnounceAddr != nil {
  219. realaddr = cfg.AnnounceAddr
  220. }
  221. // TODO: separate TCP port
  222. udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
  223. tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
  224. if err != nil {
  225. return nil, nil, err
  226. }
  227. udp.Table = tab
  228. go udp.loop()
  229. go udp.readLoop(cfg.Unhandled)
  230. return udp.Table, udp, nil
  231. }
  232. func (t *udp) close() {
  233. close(t.closing)
  234. t.conn.Close()
  235. // TODO: wait for the loops to end.
  236. }
  237. // ping sends a ping message to the given node and waits for a reply.
  238. func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
  239. req := &ping{
  240. Version: Version,
  241. From: t.ourEndpoint,
  242. To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
  243. Expiration: uint64(time.Now().Add(expiration).Unix()),
  244. }
  245. packet, hash, err := encodePacket(t.priv, pingPacket, req)
  246. if err != nil {
  247. return err
  248. }
  249. errc := t.pending(toid, pongPacket, func(p interface{}) bool {
  250. return bytes.Equal(p.(*pong).ReplyTok, hash)
  251. })
  252. t.write(toaddr, req.name(), packet)
  253. return <-errc
  254. }
  255. func (t *udp) waitping(from NodeID) error {
  256. return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
  257. }
  258. // findnode sends a findnode request to the given node and waits until
  259. // the node has sent up to k neighbors.
  260. func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
  261. nodes := make([]*Node, 0, bucketSize)
  262. nreceived := 0
  263. errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
  264. reply := r.(*neighbors)
  265. for _, rn := range reply.Nodes {
  266. nreceived++
  267. n, err := t.nodeFromRPC(toaddr, rn)
  268. if err != nil {
  269. log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
  270. continue
  271. }
  272. nodes = append(nodes, n)
  273. }
  274. return nreceived >= bucketSize
  275. })
  276. t.send(toaddr, findnodePacket, &findnode{
  277. Target: target,
  278. Expiration: uint64(time.Now().Add(expiration).Unix()),
  279. })
  280. err := <-errc
  281. return nodes, err
  282. }
  283. // pending adds a reply callback to the pending reply queue.
  284. // see the documentation of type pending for a detailed explanation.
  285. func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
  286. ch := make(chan error, 1)
  287. p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
  288. select {
  289. case t.addpending <- p:
  290. // loop will handle it
  291. case <-t.closing:
  292. ch <- errClosed
  293. }
  294. return ch
  295. }
  296. func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
  297. matched := make(chan bool, 1)
  298. select {
  299. case t.gotreply <- reply{from, ptype, req, matched}:
  300. // loop will handle it
  301. return <-matched
  302. case <-t.closing:
  303. return false
  304. }
  305. }
  306. // loop runs in its own goroutine. it keeps track of
  307. // the refresh timer and the pending reply queue.
  308. func (t *udp) loop() {
  309. var (
  310. plist = list.New()
  311. timeout = time.NewTimer(0)
  312. nextTimeout *pending // head of plist when timeout was last reset
  313. contTimeouts = 0 // number of continuous timeouts to do NTP checks
  314. ntpWarnTime = time.Unix(0, 0)
  315. )
  316. <-timeout.C // ignore first timeout
  317. defer timeout.Stop()
  318. resetTimeout := func() {
  319. if plist.Front() == nil || nextTimeout == plist.Front().Value {
  320. return
  321. }
  322. // Start the timer so it fires when the next pending reply has expired.
  323. now := time.Now()
  324. for el := plist.Front(); el != nil; el = el.Next() {
  325. nextTimeout = el.Value.(*pending)
  326. if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
  327. timeout.Reset(dist)
  328. return
  329. }
  330. // Remove pending replies whose deadline is too far in the
  331. // future. These can occur if the system clock jumped
  332. // backwards after the deadline was assigned.
  333. nextTimeout.errc <- errClockWarp
  334. plist.Remove(el)
  335. }
  336. nextTimeout = nil
  337. timeout.Stop()
  338. }
  339. for {
  340. resetTimeout()
  341. select {
  342. case <-t.closing:
  343. for el := plist.Front(); el != nil; el = el.Next() {
  344. el.Value.(*pending).errc <- errClosed
  345. }
  346. return
  347. case p := <-t.addpending:
  348. p.deadline = time.Now().Add(respTimeout)
  349. plist.PushBack(p)
  350. case r := <-t.gotreply:
  351. var matched bool
  352. for el := plist.Front(); el != nil; el = el.Next() {
  353. p := el.Value.(*pending)
  354. if p.from == r.from && p.ptype == r.ptype {
  355. matched = true
  356. // Remove the matcher if its callback indicates
  357. // that all replies have been received. This is
  358. // required for packet types that expect multiple
  359. // reply packets.
  360. if p.callback(r.data) {
  361. p.errc <- nil
  362. plist.Remove(el)
  363. }
  364. // Reset the continuous timeout counter (time drift detection)
  365. contTimeouts = 0
  366. }
  367. }
  368. r.matched <- matched
  369. case now := <-timeout.C:
  370. nextTimeout = nil
  371. // Notify and remove callbacks whose deadline is in the past.
  372. for el := plist.Front(); el != nil; el = el.Next() {
  373. p := el.Value.(*pending)
  374. if now.After(p.deadline) || now.Equal(p.deadline) {
  375. p.errc <- errTimeout
  376. plist.Remove(el)
  377. contTimeouts++
  378. }
  379. }
  380. // If we've accumulated too many timeouts, do an NTP time sync check
  381. if contTimeouts > ntpFailureThreshold {
  382. if time.Since(ntpWarnTime) >= ntpWarningCooldown {
  383. ntpWarnTime = time.Now()
  384. go checkClockDrift()
  385. }
  386. contTimeouts = 0
  387. }
  388. }
  389. }
  390. }
  391. const (
  392. macSize = 256 / 8
  393. sigSize = 520 / 8
  394. headSize = macSize + sigSize // space of packet frame data
  395. )
  396. var (
  397. headSpace = make([]byte, headSize)
  398. // Neighbors replies are sent across multiple packets to
  399. // stay below the 1280 byte limit. We compute the maximum number
  400. // of entries by stuffing a packet until it grows too large.
  401. maxNeighbors int
  402. )
  403. func init() {
  404. p := neighbors{Expiration: ^uint64(0)}
  405. maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
  406. for n := 0; ; n++ {
  407. p.Nodes = append(p.Nodes, maxSizeNode)
  408. size, _, err := rlp.EncodeToReader(p)
  409. if err != nil {
  410. // If this ever happens, it will be caught by the unit tests.
  411. panic("cannot encode: " + err.Error())
  412. }
  413. if headSize+size+1 >= 1280 {
  414. maxNeighbors = n
  415. break
  416. }
  417. }
  418. }
  419. func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) {
  420. packet, hash, err := encodePacket(t.priv, ptype, req)
  421. if err != nil {
  422. return hash, err
  423. }
  424. return hash, t.write(toaddr, req.name(), packet)
  425. }
  426. func (t *udp) write(toaddr *net.UDPAddr, what string, packet []byte) error {
  427. _, err := t.conn.WriteToUDP(packet, toaddr)
  428. log.Trace(">> "+what, "addr", toaddr, "err", err)
  429. return err
  430. }
  431. func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, hash []byte, err error) {
  432. b := new(bytes.Buffer)
  433. b.Write(headSpace)
  434. b.WriteByte(ptype)
  435. if err := rlp.Encode(b, req); err != nil {
  436. log.Error("Can't encode discv4 packet", "err", err)
  437. return nil, nil, err
  438. }
  439. packet = b.Bytes()
  440. sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv)
  441. if err != nil {
  442. log.Error("Can't sign discv4 packet", "err", err)
  443. return nil, nil, err
  444. }
  445. copy(packet[macSize:], sig)
  446. // add the hash to the front. Note: this doesn't protect the
  447. // packet in any way. Our public key will be part of this hash in
  448. // The future.
  449. hash = crypto.Keccak256(packet[macSize:])
  450. copy(packet, hash)
  451. return packet, hash, nil
  452. }
  453. // readLoop runs in its own goroutine. it handles incoming UDP packets.
  454. func (t *udp) readLoop(unhandled chan<- ReadPacket) {
  455. defer t.conn.Close()
  456. if unhandled != nil {
  457. defer close(unhandled)
  458. }
  459. // Discovery packets are defined to be no larger than 1280 bytes.
  460. // Packets larger than this size will be cut at the end and treated
  461. // as invalid because their hash won't match.
  462. buf := make([]byte, 1280)
  463. for {
  464. nbytes, from, err := t.conn.ReadFromUDP(buf)
  465. if netutil.IsTemporaryError(err) {
  466. // Ignore temporary read errors.
  467. log.Debug("Temporary UDP read error", "err", err)
  468. continue
  469. } else if err != nil {
  470. // Shut down the loop for permament errors.
  471. log.Debug("UDP read error", "err", err)
  472. return
  473. }
  474. if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil {
  475. select {
  476. case unhandled <- ReadPacket{buf[:nbytes], from}:
  477. default:
  478. }
  479. }
  480. }
  481. }
  482. func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
  483. packet, fromID, hash, err := decodePacket(buf)
  484. if err != nil {
  485. log.Debug("Bad discv4 packet", "addr", from, "err", err)
  486. return err
  487. }
  488. err = packet.handle(t, from, fromID, hash)
  489. log.Trace("<< "+packet.name(), "addr", from, "err", err)
  490. return err
  491. }
  492. func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
  493. if len(buf) < headSize+1 {
  494. return nil, NodeID{}, nil, errPacketTooSmall
  495. }
  496. hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
  497. shouldhash := crypto.Keccak256(buf[macSize:])
  498. if !bytes.Equal(hash, shouldhash) {
  499. return nil, NodeID{}, nil, errBadHash
  500. }
  501. fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig)
  502. if err != nil {
  503. return nil, NodeID{}, hash, err
  504. }
  505. var req packet
  506. switch ptype := sigdata[0]; ptype {
  507. case pingPacket:
  508. req = new(ping)
  509. case pongPacket:
  510. req = new(pong)
  511. case findnodePacket:
  512. req = new(findnode)
  513. case neighborsPacket:
  514. req = new(neighbors)
  515. default:
  516. return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
  517. }
  518. s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
  519. err = s.Decode(req)
  520. return req, fromID, hash, err
  521. }
  522. func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  523. if expired(req.Expiration) {
  524. return errExpired
  525. }
  526. t.send(from, pongPacket, &pong{
  527. To: makeEndpoint(from, req.From.TCP),
  528. ReplyTok: mac,
  529. Expiration: uint64(time.Now().Add(expiration).Unix()),
  530. })
  531. if !t.handleReply(fromID, pingPacket, req) {
  532. // Note: we're ignoring the provided IP address right now
  533. go t.bond(true, fromID, from, req.From.TCP)
  534. }
  535. return nil
  536. }
  537. func (req *ping) name() string { return "PING/v4" }
  538. func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  539. if expired(req.Expiration) {
  540. return errExpired
  541. }
  542. if !t.handleReply(fromID, pongPacket, req) {
  543. return errUnsolicitedReply
  544. }
  545. return nil
  546. }
  547. func (req *pong) name() string { return "PONG/v4" }
  548. func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  549. if expired(req.Expiration) {
  550. return errExpired
  551. }
  552. if t.db.node(fromID) == nil {
  553. // No bond exists, we don't process the packet. This prevents
  554. // an attack vector where the discovery protocol could be used
  555. // to amplify traffic in a DDOS attack. A malicious actor
  556. // would send a findnode request with the IP address and UDP
  557. // port of the target as the source address. The recipient of
  558. // the findnode packet would then send a neighbors packet
  559. // (which is a much bigger packet than findnode) to the victim.
  560. return errUnknownNode
  561. }
  562. target := crypto.Keccak256Hash(req.Target[:])
  563. t.mutex.Lock()
  564. closest := t.closest(target, bucketSize).entries
  565. t.mutex.Unlock()
  566. p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
  567. var sent bool
  568. // Send neighbors in chunks with at most maxNeighbors per packet
  569. // to stay below the 1280 byte limit.
  570. for _, n := range closest {
  571. if netutil.CheckRelayIP(from.IP, n.IP) == nil {
  572. p.Nodes = append(p.Nodes, nodeToRPC(n))
  573. }
  574. if len(p.Nodes) == maxNeighbors {
  575. t.send(from, neighborsPacket, &p)
  576. p.Nodes = p.Nodes[:0]
  577. sent = true
  578. }
  579. }
  580. if len(p.Nodes) > 0 || !sent {
  581. t.send(from, neighborsPacket, &p)
  582. }
  583. return nil
  584. }
  585. func (req *findnode) name() string { return "FINDNODE/v4" }
  586. func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  587. if expired(req.Expiration) {
  588. return errExpired
  589. }
  590. if !t.handleReply(fromID, neighborsPacket, req) {
  591. return errUnsolicitedReply
  592. }
  593. return nil
  594. }
  595. func (req *neighbors) name() string { return "NEIGHBORS/v4" }
  596. func expired(ts uint64) bool {
  597. return time.Unix(int64(ts), 0).Before(time.Now())
  598. }