udp.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discover
  17. import (
  18. "bytes"
  19. "container/list"
  20. "crypto/ecdsa"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "time"
  25. "github.com/ethereum/go-ethereum/crypto"
  26. "github.com/ethereum/go-ethereum/logger"
  27. "github.com/ethereum/go-ethereum/logger/glog"
  28. "github.com/ethereum/go-ethereum/p2p/nat"
  29. "github.com/ethereum/go-ethereum/rlp"
  30. )
  31. const Version = 4
  32. // Errors
  33. var (
  34. errPacketTooSmall = errors.New("too small")
  35. errBadHash = errors.New("bad hash")
  36. errExpired = errors.New("expired")
  37. errBadVersion = errors.New("version mismatch")
  38. errUnsolicitedReply = errors.New("unsolicited reply")
  39. errUnknownNode = errors.New("unknown node")
  40. errTimeout = errors.New("RPC timeout")
  41. errClockWarp = errors.New("reply deadline too far in the future")
  42. errClosed = errors.New("socket closed")
  43. )
  44. // Timeouts
  45. const (
  46. respTimeout = 500 * time.Millisecond
  47. sendTimeout = 500 * time.Millisecond
  48. expiration = 20 * time.Second
  49. refreshInterval = 1 * time.Hour
  50. )
  51. // RPC packet types
  52. const (
  53. pingPacket = iota + 1 // zero is 'reserved'
  54. pongPacket
  55. findnodePacket
  56. neighborsPacket
  57. )
  58. // RPC request structures
  59. type (
  60. ping struct {
  61. Version uint
  62. From, To rpcEndpoint
  63. Expiration uint64
  64. }
  65. // pong is the reply to ping.
  66. pong struct {
  67. // This field should mirror the UDP envelope address
  68. // of the ping packet, which provides a way to discover the
  69. // the external address (after NAT).
  70. To rpcEndpoint
  71. ReplyTok []byte // This contains the hash of the ping packet.
  72. Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
  73. }
  74. // findnode is a query for nodes close to the given target.
  75. findnode struct {
  76. Target NodeID // doesn't need to be an actual public key
  77. Expiration uint64
  78. }
  79. // reply to findnode
  80. neighbors struct {
  81. Nodes []rpcNode
  82. Expiration uint64
  83. }
  84. rpcNode struct {
  85. IP net.IP // len 4 for IPv4 or 16 for IPv6
  86. UDP uint16 // for discovery protocol
  87. TCP uint16 // for RLPx protocol
  88. ID NodeID
  89. }
  90. rpcEndpoint struct {
  91. IP net.IP // len 4 for IPv4 or 16 for IPv6
  92. UDP uint16 // for discovery protocol
  93. TCP uint16 // for RLPx protocol
  94. }
  95. )
  96. func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
  97. ip := addr.IP.To4()
  98. if ip == nil {
  99. ip = addr.IP.To16()
  100. }
  101. return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
  102. }
  103. func nodeFromRPC(rn rpcNode) (n *Node, valid bool) {
  104. // TODO: don't accept localhost, LAN addresses from internet hosts
  105. // TODO: check public key is on secp256k1 curve
  106. if rn.IP.IsMulticast() || rn.IP.IsUnspecified() || rn.UDP == 0 {
  107. return nil, false
  108. }
  109. return newNode(rn.ID, rn.IP, rn.UDP, rn.TCP), true
  110. }
  111. func nodeToRPC(n *Node) rpcNode {
  112. return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
  113. }
  114. type packet interface {
  115. handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
  116. }
  117. type conn interface {
  118. ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
  119. WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
  120. Close() error
  121. LocalAddr() net.Addr
  122. }
  123. // udp implements the RPC protocol.
  124. type udp struct {
  125. conn conn
  126. priv *ecdsa.PrivateKey
  127. ourEndpoint rpcEndpoint
  128. addpending chan *pending
  129. gotreply chan reply
  130. closing chan struct{}
  131. nat nat.Interface
  132. *Table
  133. }
  134. // pending represents a pending reply.
  135. //
  136. // some implementations of the protocol wish to send more than one
  137. // reply packet to findnode. in general, any neighbors packet cannot
  138. // be matched up with a specific findnode packet.
  139. //
  140. // our implementation handles this by storing a callback function for
  141. // each pending reply. incoming packets from a node are dispatched
  142. // to all the callback functions for that node.
  143. type pending struct {
  144. // these fields must match in the reply.
  145. from NodeID
  146. ptype byte
  147. // time when the request must complete
  148. deadline time.Time
  149. // callback is called when a matching reply arrives. if it returns
  150. // true, the callback is removed from the pending reply queue.
  151. // if it returns false, the reply is considered incomplete and
  152. // the callback will be invoked again for the next matching reply.
  153. callback func(resp interface{}) (done bool)
  154. // errc receives nil when the callback indicates completion or an
  155. // error if no further reply is received within the timeout.
  156. errc chan<- error
  157. }
  158. type reply struct {
  159. from NodeID
  160. ptype byte
  161. data interface{}
  162. // loop indicates whether there was
  163. // a matching request by sending on this channel.
  164. matched chan<- bool
  165. }
  166. // ListenUDP returns a new table that listens for UDP packets on laddr.
  167. func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string) (*Table, error) {
  168. addr, err := net.ResolveUDPAddr("udp", laddr)
  169. if err != nil {
  170. return nil, err
  171. }
  172. conn, err := net.ListenUDP("udp", addr)
  173. if err != nil {
  174. return nil, err
  175. }
  176. tab, _ := newUDP(priv, conn, natm, nodeDBPath)
  177. glog.V(logger.Info).Infoln("Listening,", tab.self)
  178. return tab, nil
  179. }
  180. func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string) (*Table, *udp) {
  181. udp := &udp{
  182. conn: c,
  183. priv: priv,
  184. closing: make(chan struct{}),
  185. gotreply: make(chan reply),
  186. addpending: make(chan *pending),
  187. }
  188. realaddr := c.LocalAddr().(*net.UDPAddr)
  189. if natm != nil {
  190. if !realaddr.IP.IsLoopback() {
  191. go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
  192. }
  193. // TODO: react to external IP changes over time.
  194. if ext, err := natm.ExternalIP(); err == nil {
  195. realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
  196. }
  197. }
  198. // TODO: separate TCP port
  199. udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
  200. udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
  201. go udp.loop()
  202. go udp.readLoop()
  203. return udp.Table, udp
  204. }
  205. func (t *udp) close() {
  206. close(t.closing)
  207. t.conn.Close()
  208. // TODO: wait for the loops to end.
  209. }
  210. // ping sends a ping message to the given node and waits for a reply.
  211. func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
  212. // TODO: maybe check for ReplyTo field in callback to measure RTT
  213. errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
  214. t.send(toaddr, pingPacket, ping{
  215. Version: Version,
  216. From: t.ourEndpoint,
  217. To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
  218. Expiration: uint64(time.Now().Add(expiration).Unix()),
  219. })
  220. return <-errc
  221. }
  222. func (t *udp) waitping(from NodeID) error {
  223. return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
  224. }
  225. // findnode sends a findnode request to the given node and waits until
  226. // the node has sent up to k neighbors.
  227. func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
  228. nodes := make([]*Node, 0, bucketSize)
  229. nreceived := 0
  230. errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
  231. reply := r.(*neighbors)
  232. for _, rn := range reply.Nodes {
  233. nreceived++
  234. if n, valid := nodeFromRPC(rn); valid {
  235. nodes = append(nodes, n)
  236. }
  237. }
  238. return nreceived >= bucketSize
  239. })
  240. t.send(toaddr, findnodePacket, findnode{
  241. Target: target,
  242. Expiration: uint64(time.Now().Add(expiration).Unix()),
  243. })
  244. err := <-errc
  245. return nodes, err
  246. }
  247. // pending adds a reply callback to the pending reply queue.
  248. // see the documentation of type pending for a detailed explanation.
  249. func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
  250. ch := make(chan error, 1)
  251. p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
  252. select {
  253. case t.addpending <- p:
  254. // loop will handle it
  255. case <-t.closing:
  256. ch <- errClosed
  257. }
  258. return ch
  259. }
  260. func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
  261. matched := make(chan bool, 1)
  262. select {
  263. case t.gotreply <- reply{from, ptype, req, matched}:
  264. // loop will handle it
  265. return <-matched
  266. case <-t.closing:
  267. return false
  268. }
  269. }
  270. // loop runs in its own goroutin. it keeps track of
  271. // the refresh timer and the pending reply queue.
  272. func (t *udp) loop() {
  273. var (
  274. plist = list.New()
  275. timeout = time.NewTimer(0)
  276. nextTimeout *pending // head of plist when timeout was last reset
  277. refresh = time.NewTicker(refreshInterval)
  278. )
  279. <-timeout.C // ignore first timeout
  280. defer refresh.Stop()
  281. defer timeout.Stop()
  282. resetTimeout := func() {
  283. if plist.Front() == nil || nextTimeout == plist.Front().Value {
  284. return
  285. }
  286. // Start the timer so it fires when the next pending reply has expired.
  287. now := time.Now()
  288. for el := plist.Front(); el != nil; el = el.Next() {
  289. nextTimeout = el.Value.(*pending)
  290. if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
  291. timeout.Reset(dist)
  292. return
  293. }
  294. // Remove pending replies whose deadline is too far in the
  295. // future. These can occur if the system clock jumped
  296. // backwards after the deadline was assigned.
  297. nextTimeout.errc <- errClockWarp
  298. plist.Remove(el)
  299. }
  300. nextTimeout = nil
  301. timeout.Stop()
  302. }
  303. for {
  304. resetTimeout()
  305. select {
  306. case <-refresh.C:
  307. go t.refresh()
  308. case <-t.closing:
  309. for el := plist.Front(); el != nil; el = el.Next() {
  310. el.Value.(*pending).errc <- errClosed
  311. }
  312. return
  313. case p := <-t.addpending:
  314. p.deadline = time.Now().Add(respTimeout)
  315. plist.PushBack(p)
  316. case r := <-t.gotreply:
  317. var matched bool
  318. for el := plist.Front(); el != nil; el = el.Next() {
  319. p := el.Value.(*pending)
  320. if p.from == r.from && p.ptype == r.ptype {
  321. matched = true
  322. // Remove the matcher if its callback indicates
  323. // that all replies have been received. This is
  324. // required for packet types that expect multiple
  325. // reply packets.
  326. if p.callback(r.data) {
  327. p.errc <- nil
  328. plist.Remove(el)
  329. }
  330. }
  331. }
  332. r.matched <- matched
  333. case now := <-timeout.C:
  334. nextTimeout = nil
  335. // Notify and remove callbacks whose deadline is in the past.
  336. for el := plist.Front(); el != nil; el = el.Next() {
  337. p := el.Value.(*pending)
  338. if now.After(p.deadline) || now.Equal(p.deadline) {
  339. p.errc <- errTimeout
  340. plist.Remove(el)
  341. }
  342. }
  343. }
  344. }
  345. }
  346. const (
  347. macSize = 256 / 8
  348. sigSize = 520 / 8
  349. headSize = macSize + sigSize // space of packet frame data
  350. )
  351. var (
  352. headSpace = make([]byte, headSize)
  353. // Neighbors replies are sent across multiple packets to
  354. // stay below the 1280 byte limit. We compute the maximum number
  355. // of entries by stuffing a packet until it grows too large.
  356. maxNeighbors int
  357. )
  358. func init() {
  359. p := neighbors{Expiration: ^uint64(0)}
  360. maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
  361. for n := 0; ; n++ {
  362. p.Nodes = append(p.Nodes, maxSizeNode)
  363. size, _, err := rlp.EncodeToReader(p)
  364. if err != nil {
  365. // If this ever happens, it will be caught by the unit tests.
  366. panic("cannot encode: " + err.Error())
  367. }
  368. if headSize+size+1 >= 1280 {
  369. maxNeighbors = n
  370. break
  371. }
  372. }
  373. }
  374. func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error {
  375. packet, err := encodePacket(t.priv, ptype, req)
  376. if err != nil {
  377. return err
  378. }
  379. glog.V(logger.Detail).Infof(">>> %v %T\n", toaddr, req)
  380. if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
  381. glog.V(logger.Detail).Infoln("UDP send failed:", err)
  382. }
  383. return err
  384. }
  385. func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, error) {
  386. b := new(bytes.Buffer)
  387. b.Write(headSpace)
  388. b.WriteByte(ptype)
  389. if err := rlp.Encode(b, req); err != nil {
  390. glog.V(logger.Error).Infoln("error encoding packet:", err)
  391. return nil, err
  392. }
  393. packet := b.Bytes()
  394. sig, err := crypto.Sign(crypto.Sha3(packet[headSize:]), priv)
  395. if err != nil {
  396. glog.V(logger.Error).Infoln("could not sign packet:", err)
  397. return nil, err
  398. }
  399. copy(packet[macSize:], sig)
  400. // add the hash to the front. Note: this doesn't protect the
  401. // packet in any way. Our public key will be part of this hash in
  402. // The future.
  403. copy(packet, crypto.Sha3(packet[macSize:]))
  404. return packet, nil
  405. }
  406. // readLoop runs in its own goroutine. it handles incoming UDP packets.
  407. func (t *udp) readLoop() {
  408. defer t.conn.Close()
  409. // Discovery packets are defined to be no larger than 1280 bytes.
  410. // Packets larger than this size will be cut at the end and treated
  411. // as invalid because their hash won't match.
  412. buf := make([]byte, 1280)
  413. for {
  414. nbytes, from, err := t.conn.ReadFromUDP(buf)
  415. if err != nil {
  416. return
  417. }
  418. t.handlePacket(from, buf[:nbytes])
  419. }
  420. }
  421. func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
  422. packet, fromID, hash, err := decodePacket(buf)
  423. if err != nil {
  424. glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
  425. return err
  426. }
  427. status := "ok"
  428. if err = packet.handle(t, from, fromID, hash); err != nil {
  429. status = err.Error()
  430. }
  431. glog.V(logger.Detail).Infof("<<< %v %T: %s\n", from, packet, status)
  432. return err
  433. }
  434. func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
  435. if len(buf) < headSize+1 {
  436. return nil, NodeID{}, nil, errPacketTooSmall
  437. }
  438. hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
  439. shouldhash := crypto.Sha3(buf[macSize:])
  440. if !bytes.Equal(hash, shouldhash) {
  441. return nil, NodeID{}, nil, errBadHash
  442. }
  443. fromID, err := recoverNodeID(crypto.Sha3(buf[headSize:]), sig)
  444. if err != nil {
  445. return nil, NodeID{}, hash, err
  446. }
  447. var req packet
  448. switch ptype := sigdata[0]; ptype {
  449. case pingPacket:
  450. req = new(ping)
  451. case pongPacket:
  452. req = new(pong)
  453. case findnodePacket:
  454. req = new(findnode)
  455. case neighborsPacket:
  456. req = new(neighbors)
  457. default:
  458. return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
  459. }
  460. err = rlp.DecodeBytes(sigdata[1:], req)
  461. return req, fromID, hash, err
  462. }
  463. func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  464. if expired(req.Expiration) {
  465. return errExpired
  466. }
  467. if req.Version != Version {
  468. return errBadVersion
  469. }
  470. t.send(from, pongPacket, pong{
  471. To: makeEndpoint(from, req.From.TCP),
  472. ReplyTok: mac,
  473. Expiration: uint64(time.Now().Add(expiration).Unix()),
  474. })
  475. if !t.handleReply(fromID, pingPacket, req) {
  476. // Note: we're ignoring the provided IP address right now
  477. go t.bond(true, fromID, from, req.From.TCP)
  478. }
  479. return nil
  480. }
  481. func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  482. if expired(req.Expiration) {
  483. return errExpired
  484. }
  485. if !t.handleReply(fromID, pongPacket, req) {
  486. return errUnsolicitedReply
  487. }
  488. return nil
  489. }
  490. func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  491. if expired(req.Expiration) {
  492. return errExpired
  493. }
  494. if t.db.node(fromID) == nil {
  495. // No bond exists, we don't process the packet. This prevents
  496. // an attack vector where the discovery protocol could be used
  497. // to amplify traffic in a DDOS attack. A malicious actor
  498. // would send a findnode request with the IP address and UDP
  499. // port of the target as the source address. The recipient of
  500. // the findnode packet would then send a neighbors packet
  501. // (which is a much bigger packet than findnode) to the victim.
  502. return errUnknownNode
  503. }
  504. target := crypto.Sha3Hash(req.Target[:])
  505. t.mutex.Lock()
  506. closest := t.closest(target, bucketSize).entries
  507. t.mutex.Unlock()
  508. p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
  509. // Send neighbors in chunks with at most maxNeighbors per packet
  510. // to stay below the 1280 byte limit.
  511. for i, n := range closest {
  512. p.Nodes = append(p.Nodes, nodeToRPC(n))
  513. if len(p.Nodes) == maxNeighbors || i == len(closest)-1 {
  514. t.send(from, neighborsPacket, p)
  515. p.Nodes = p.Nodes[:0]
  516. }
  517. }
  518. return nil
  519. }
  520. func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  521. if expired(req.Expiration) {
  522. return errExpired
  523. }
  524. if !t.handleReply(fromID, neighborsPacket, req) {
  525. return errUnsolicitedReply
  526. }
  527. return nil
  528. }
  529. func expired(ts uint64) bool {
  530. return time.Unix(int64(ts), 0).Before(time.Now())
  531. }