udp.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discover
  17. import (
  18. "bytes"
  19. "crypto/ecdsa"
  20. "errors"
  21. "fmt"
  22. "net"
  23. "time"
  24. "github.com/ethereum/go-ethereum/crypto"
  25. "github.com/ethereum/go-ethereum/fdtrack"
  26. "github.com/ethereum/go-ethereum/logger"
  27. "github.com/ethereum/go-ethereum/logger/glog"
  28. "github.com/ethereum/go-ethereum/p2p/nat"
  29. "github.com/ethereum/go-ethereum/rlp"
  30. )
  31. const Version = 4
  32. // Errors
  33. var (
  34. errPacketTooSmall = errors.New("too small")
  35. errBadHash = errors.New("bad hash")
  36. errExpired = errors.New("expired")
  37. errBadVersion = errors.New("version mismatch")
  38. errUnsolicitedReply = errors.New("unsolicited reply")
  39. errUnknownNode = errors.New("unknown node")
  40. errTimeout = errors.New("RPC timeout")
  41. errClosed = errors.New("socket closed")
  42. )
  43. // Timeouts
  44. const (
  45. respTimeout = 500 * time.Millisecond
  46. sendTimeout = 500 * time.Millisecond
  47. expiration = 20 * time.Second
  48. refreshInterval = 1 * time.Hour
  49. )
  50. // RPC packet types
  51. const (
  52. pingPacket = iota + 1 // zero is 'reserved'
  53. pongPacket
  54. findnodePacket
  55. neighborsPacket
  56. )
  57. // RPC request structures
  58. type (
  59. ping struct {
  60. Version uint
  61. From, To rpcEndpoint
  62. Expiration uint64
  63. }
  64. // pong is the reply to ping.
  65. pong struct {
  66. // This field should mirror the UDP envelope address
  67. // of the ping packet, which provides a way to discover the
  68. // the external address (after NAT).
  69. To rpcEndpoint
  70. ReplyTok []byte // This contains the hash of the ping packet.
  71. Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
  72. }
  73. // findnode is a query for nodes close to the given target.
  74. findnode struct {
  75. Target NodeID // doesn't need to be an actual public key
  76. Expiration uint64
  77. }
  78. // reply to findnode
  79. neighbors struct {
  80. Nodes []rpcNode
  81. Expiration uint64
  82. }
  83. rpcNode struct {
  84. IP net.IP // len 4 for IPv4 or 16 for IPv6
  85. UDP uint16 // for discovery protocol
  86. TCP uint16 // for RLPx protocol
  87. ID NodeID
  88. }
  89. rpcEndpoint struct {
  90. IP net.IP // len 4 for IPv4 or 16 for IPv6
  91. UDP uint16 // for discovery protocol
  92. TCP uint16 // for RLPx protocol
  93. }
  94. )
  95. func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
  96. ip := addr.IP.To4()
  97. if ip == nil {
  98. ip = addr.IP.To16()
  99. }
  100. return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
  101. }
  102. func nodeFromRPC(rn rpcNode) (n *Node, valid bool) {
  103. // TODO: don't accept localhost, LAN addresses from internet hosts
  104. // TODO: check public key is on secp256k1 curve
  105. if rn.IP.IsMulticast() || rn.IP.IsUnspecified() || rn.UDP == 0 {
  106. return nil, false
  107. }
  108. return newNode(rn.ID, rn.IP, rn.UDP, rn.TCP), true
  109. }
  110. func nodeToRPC(n *Node) rpcNode {
  111. return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
  112. }
  113. type packet interface {
  114. handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
  115. }
  116. type conn interface {
  117. ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
  118. WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
  119. Close() error
  120. LocalAddr() net.Addr
  121. }
  122. // udp implements the RPC protocol.
  123. type udp struct {
  124. conn conn
  125. priv *ecdsa.PrivateKey
  126. ourEndpoint rpcEndpoint
  127. addpending chan *pending
  128. gotreply chan reply
  129. closing chan struct{}
  130. nat nat.Interface
  131. *Table
  132. }
  133. // pending represents a pending reply.
  134. //
  135. // some implementations of the protocol wish to send more than one
  136. // reply packet to findnode. in general, any neighbors packet cannot
  137. // be matched up with a specific findnode packet.
  138. //
  139. // our implementation handles this by storing a callback function for
  140. // each pending reply. incoming packets from a node are dispatched
  141. // to all the callback functions for that node.
  142. type pending struct {
  143. // these fields must match in the reply.
  144. from NodeID
  145. ptype byte
  146. // time when the request must complete
  147. deadline time.Time
  148. // callback is called when a matching reply arrives. if it returns
  149. // true, the callback is removed from the pending reply queue.
  150. // if it returns false, the reply is considered incomplete and
  151. // the callback will be invoked again for the next matching reply.
  152. callback func(resp interface{}) (done bool)
  153. // errc receives nil when the callback indicates completion or an
  154. // error if no further reply is received within the timeout.
  155. errc chan<- error
  156. }
  157. type reply struct {
  158. from NodeID
  159. ptype byte
  160. data interface{}
  161. // loop indicates whether there was
  162. // a matching request by sending on this channel.
  163. matched chan<- bool
  164. }
  165. // ListenUDP returns a new table that listens for UDP packets on laddr.
  166. func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string) (*Table, error) {
  167. addr, err := net.ResolveUDPAddr("udp", laddr)
  168. if err != nil {
  169. return nil, err
  170. }
  171. fdtrack.Open("p2p")
  172. conn, err := net.ListenUDP("udp", addr)
  173. if err != nil {
  174. return nil, err
  175. }
  176. tab, _ := newUDP(priv, conn, natm, nodeDBPath)
  177. glog.V(logger.Info).Infoln("Listening,", tab.self)
  178. return tab, nil
  179. }
  180. func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string) (*Table, *udp) {
  181. udp := &udp{
  182. conn: c,
  183. priv: priv,
  184. closing: make(chan struct{}),
  185. gotreply: make(chan reply),
  186. addpending: make(chan *pending),
  187. }
  188. realaddr := c.LocalAddr().(*net.UDPAddr)
  189. if natm != nil {
  190. if !realaddr.IP.IsLoopback() {
  191. go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
  192. }
  193. // TODO: react to external IP changes over time.
  194. if ext, err := natm.ExternalIP(); err == nil {
  195. realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
  196. }
  197. }
  198. // TODO: separate TCP port
  199. udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
  200. udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
  201. go udp.loop()
  202. go udp.readLoop()
  203. return udp.Table, udp
  204. }
  205. func (t *udp) close() {
  206. close(t.closing)
  207. fdtrack.Close("p2p")
  208. t.conn.Close()
  209. // TODO: wait for the loops to end.
  210. }
  211. // ping sends a ping message to the given node and waits for a reply.
  212. func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
  213. // TODO: maybe check for ReplyTo field in callback to measure RTT
  214. errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
  215. t.send(toaddr, pingPacket, ping{
  216. Version: Version,
  217. From: t.ourEndpoint,
  218. To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
  219. Expiration: uint64(time.Now().Add(expiration).Unix()),
  220. })
  221. return <-errc
  222. }
  223. func (t *udp) waitping(from NodeID) error {
  224. return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
  225. }
  226. // findnode sends a findnode request to the given node and waits until
  227. // the node has sent up to k neighbors.
  228. func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
  229. nodes := make([]*Node, 0, bucketSize)
  230. nreceived := 0
  231. errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
  232. reply := r.(*neighbors)
  233. for _, rn := range reply.Nodes {
  234. nreceived++
  235. if n, valid := nodeFromRPC(rn); valid {
  236. nodes = append(nodes, n)
  237. }
  238. }
  239. return nreceived >= bucketSize
  240. })
  241. t.send(toaddr, findnodePacket, findnode{
  242. Target: target,
  243. Expiration: uint64(time.Now().Add(expiration).Unix()),
  244. })
  245. err := <-errc
  246. return nodes, err
  247. }
  248. // pending adds a reply callback to the pending reply queue.
  249. // see the documentation of type pending for a detailed explanation.
  250. func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
  251. ch := make(chan error, 1)
  252. p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
  253. select {
  254. case t.addpending <- p:
  255. // loop will handle it
  256. case <-t.closing:
  257. ch <- errClosed
  258. }
  259. return ch
  260. }
  261. func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
  262. matched := make(chan bool)
  263. select {
  264. case t.gotreply <- reply{from, ptype, req, matched}:
  265. // loop will handle it
  266. return <-matched
  267. case <-t.closing:
  268. return false
  269. }
  270. }
  271. // loop runs in its own goroutin. it keeps track of
  272. // the refresh timer and the pending reply queue.
  273. func (t *udp) loop() {
  274. var (
  275. pending []*pending
  276. nextDeadline time.Time
  277. timeout = time.NewTimer(0)
  278. refresh = time.NewTicker(refreshInterval)
  279. )
  280. <-timeout.C // ignore first timeout
  281. defer refresh.Stop()
  282. defer timeout.Stop()
  283. rearmTimeout := func() {
  284. now := time.Now()
  285. if len(pending) == 0 || now.Before(nextDeadline) {
  286. return
  287. }
  288. nextDeadline = pending[0].deadline
  289. timeout.Reset(nextDeadline.Sub(now))
  290. }
  291. for {
  292. select {
  293. case <-refresh.C:
  294. go t.refresh()
  295. case <-t.closing:
  296. for _, p := range pending {
  297. p.errc <- errClosed
  298. }
  299. pending = nil
  300. return
  301. case p := <-t.addpending:
  302. p.deadline = time.Now().Add(respTimeout)
  303. pending = append(pending, p)
  304. rearmTimeout()
  305. case r := <-t.gotreply:
  306. var matched bool
  307. for i := 0; i < len(pending); i++ {
  308. if p := pending[i]; p.from == r.from && p.ptype == r.ptype {
  309. matched = true
  310. if p.callback(r.data) {
  311. // callback indicates the request is done, remove it.
  312. p.errc <- nil
  313. copy(pending[i:], pending[i+1:])
  314. pending = pending[:len(pending)-1]
  315. i--
  316. }
  317. }
  318. }
  319. r.matched <- matched
  320. case now := <-timeout.C:
  321. // notify and remove callbacks whose deadline is in the past.
  322. i := 0
  323. for ; i < len(pending) && now.After(pending[i].deadline); i++ {
  324. pending[i].errc <- errTimeout
  325. }
  326. if i > 0 {
  327. copy(pending, pending[i:])
  328. pending = pending[:len(pending)-i]
  329. }
  330. rearmTimeout()
  331. }
  332. }
  333. }
  334. const (
  335. macSize = 256 / 8
  336. sigSize = 520 / 8
  337. headSize = macSize + sigSize // space of packet frame data
  338. )
  339. var (
  340. headSpace = make([]byte, headSize)
  341. // Neighbors responses are sent across multiple packets to
  342. // stay below the 1280 byte limit. We compute the maximum number
  343. // of entries by stuffing a packet until it grows too large.
  344. maxNeighbors int
  345. )
  346. func init() {
  347. p := neighbors{Expiration: ^uint64(0)}
  348. maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
  349. for n := 0; ; n++ {
  350. p.Nodes = append(p.Nodes, maxSizeNode)
  351. size, _, err := rlp.EncodeToReader(p)
  352. if err != nil {
  353. // If this ever happens, it will be caught by the unit tests.
  354. panic("cannot encode: " + err.Error())
  355. }
  356. if headSize+size+1 >= 1280 {
  357. maxNeighbors = n
  358. break
  359. }
  360. }
  361. }
  362. func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error {
  363. packet, err := encodePacket(t.priv, ptype, req)
  364. if err != nil {
  365. return err
  366. }
  367. glog.V(logger.Detail).Infof(">>> %v %T\n", toaddr, req)
  368. if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
  369. glog.V(logger.Detail).Infoln("UDP send failed:", err)
  370. }
  371. return err
  372. }
  373. func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, error) {
  374. b := new(bytes.Buffer)
  375. b.Write(headSpace)
  376. b.WriteByte(ptype)
  377. if err := rlp.Encode(b, req); err != nil {
  378. glog.V(logger.Error).Infoln("error encoding packet:", err)
  379. return nil, err
  380. }
  381. packet := b.Bytes()
  382. sig, err := crypto.Sign(crypto.Sha3(packet[headSize:]), priv)
  383. if err != nil {
  384. glog.V(logger.Error).Infoln("could not sign packet:", err)
  385. return nil, err
  386. }
  387. copy(packet[macSize:], sig)
  388. // add the hash to the front. Note: this doesn't protect the
  389. // packet in any way. Our public key will be part of this hash in
  390. // The future.
  391. copy(packet, crypto.Sha3(packet[macSize:]))
  392. return packet, nil
  393. }
  394. // readLoop runs in its own goroutine. it handles incoming UDP packets.
  395. func (t *udp) readLoop() {
  396. defer t.conn.Close()
  397. // Discovery packets are defined to be no larger than 1280 bytes.
  398. // Packets larger than this size will be cut at the end and treated
  399. // as invalid because their hash won't match.
  400. buf := make([]byte, 1280)
  401. for {
  402. nbytes, from, err := t.conn.ReadFromUDP(buf)
  403. if err != nil {
  404. return
  405. }
  406. t.handlePacket(from, buf[:nbytes])
  407. }
  408. }
  409. func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
  410. packet, fromID, hash, err := decodePacket(buf)
  411. if err != nil {
  412. glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
  413. return err
  414. }
  415. status := "ok"
  416. if err = packet.handle(t, from, fromID, hash); err != nil {
  417. status = err.Error()
  418. }
  419. glog.V(logger.Detail).Infof("<<< %v %T: %s\n", from, packet, status)
  420. return err
  421. }
  422. func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
  423. if len(buf) < headSize+1 {
  424. return nil, NodeID{}, nil, errPacketTooSmall
  425. }
  426. hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
  427. shouldhash := crypto.Sha3(buf[macSize:])
  428. if !bytes.Equal(hash, shouldhash) {
  429. return nil, NodeID{}, nil, errBadHash
  430. }
  431. fromID, err := recoverNodeID(crypto.Sha3(buf[headSize:]), sig)
  432. if err != nil {
  433. return nil, NodeID{}, hash, err
  434. }
  435. var req packet
  436. switch ptype := sigdata[0]; ptype {
  437. case pingPacket:
  438. req = new(ping)
  439. case pongPacket:
  440. req = new(pong)
  441. case findnodePacket:
  442. req = new(findnode)
  443. case neighborsPacket:
  444. req = new(neighbors)
  445. default:
  446. return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
  447. }
  448. err = rlp.DecodeBytes(sigdata[1:], req)
  449. return req, fromID, hash, err
  450. }
  451. func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  452. if expired(req.Expiration) {
  453. return errExpired
  454. }
  455. if req.Version != Version {
  456. return errBadVersion
  457. }
  458. t.send(from, pongPacket, pong{
  459. To: makeEndpoint(from, req.From.TCP),
  460. ReplyTok: mac,
  461. Expiration: uint64(time.Now().Add(expiration).Unix()),
  462. })
  463. if !t.handleReply(fromID, pingPacket, req) {
  464. // Note: we're ignoring the provided IP address right now
  465. go t.bond(true, fromID, from, req.From.TCP)
  466. }
  467. return nil
  468. }
  469. func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  470. if expired(req.Expiration) {
  471. return errExpired
  472. }
  473. if !t.handleReply(fromID, pongPacket, req) {
  474. return errUnsolicitedReply
  475. }
  476. return nil
  477. }
  478. func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  479. if expired(req.Expiration) {
  480. return errExpired
  481. }
  482. if t.db.node(fromID) == nil {
  483. // No bond exists, we don't process the packet. This prevents
  484. // an attack vector where the discovery protocol could be used
  485. // to amplify traffic in a DDOS attack. A malicious actor
  486. // would send a findnode request with the IP address and UDP
  487. // port of the target as the source address. The recipient of
  488. // the findnode packet would then send a neighbors packet
  489. // (which is a much bigger packet than findnode) to the victim.
  490. return errUnknownNode
  491. }
  492. target := crypto.Sha3Hash(req.Target[:])
  493. t.mutex.Lock()
  494. closest := t.closest(target, bucketSize).entries
  495. t.mutex.Unlock()
  496. p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
  497. // Send neighbors in chunks with at most maxNeighbors per packet
  498. // to stay below the 1280 byte limit.
  499. for i, n := range closest {
  500. p.Nodes = append(p.Nodes, nodeToRPC(n))
  501. if len(p.Nodes) == maxNeighbors || i == len(closest)-1 {
  502. t.send(from, neighborsPacket, p)
  503. p.Nodes = p.Nodes[:0]
  504. }
  505. }
  506. return nil
  507. }
  508. func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  509. if expired(req.Expiration) {
  510. return errExpired
  511. }
  512. if !t.handleReply(fromID, neighborsPacket, req) {
  513. return errUnsolicitedReply
  514. }
  515. return nil
  516. }
  517. func expired(ts uint64) bool {
  518. return time.Unix(int64(ts), 0).Before(time.Now())
  519. }