dial.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. mrand "math/rand"
  24. "net"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/gopool"
  28. "github.com/ethereum/go-ethereum/common/mclock"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/netutil"
  32. )
  33. const (
  34. // This is the amount of time spent waiting in between redialing a certain node. The
  35. // limit is a bit higher than inboundThrottleTime to prevent failing dials in small
  36. // private networks.
  37. dialHistoryExpiration = inboundThrottleTime + 5*time.Second
  38. // Config for the "Looking for peers" message.
  39. dialStatsLogInterval = 10 * time.Second // printed at most this often
  40. dialStatsPeerLimit = 3 // but not if more than this many dialed peers
  41. // Endpoint resolution is throttled with bounded backoff.
  42. initialResolveDelay = 60 * time.Second
  43. maxResolveDelay = time.Hour
  44. )
  45. // NodeDialer is used to connect to nodes in the network, typically by using
  46. // an underlying net.Dialer but also using net.Pipe in tests.
  47. type NodeDialer interface {
  48. Dial(context.Context, *enode.Node) (net.Conn, error)
  49. }
  50. type nodeResolver interface {
  51. Resolve(*enode.Node) *enode.Node
  52. }
  53. // tcpDialer implements NodeDialer using real TCP connections.
  54. type tcpDialer struct {
  55. d *net.Dialer
  56. }
  57. func (t tcpDialer) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
  58. return t.d.DialContext(ctx, "tcp", nodeAddr(dest).String())
  59. }
  60. func nodeAddr(n *enode.Node) net.Addr {
  61. return &net.TCPAddr{IP: n.IP(), Port: n.TCP()}
  62. }
  63. // checkDial errors:
  64. var (
  65. errSelf = errors.New("is self")
  66. errAlreadyDialing = errors.New("already dialing")
  67. errAlreadyConnected = errors.New("already connected")
  68. errRecentlyDialed = errors.New("recently dialed")
  69. errNotWhitelisted = errors.New("not contained in netrestrict whitelist")
  70. errNoPort = errors.New("node does not provide TCP port")
  71. )
// dialScheduler creates outbound connections and submits them into Server.
// Two types of peer connections can be created:
//
//   - static dials are pre-configured connections. The dialer attempts to
//     keep these nodes connected at all times.
//
//   - dynamic dials are created from node discovery results. The dialer
//     continuously reads candidate nodes from its input iterator and attempts
//     to create peer connections to nodes arriving through the iterator.
//
// The methods addStatic, removeStatic, peerAdded and peerRemoved do not touch
// scheduler state directly. They forward their argument to the loop goroutine
// over the corresponding channel.
type dialScheduler struct {
	dialConfig
	setupFunc   dialSetupFunc      // completes a successfully dialed connection
	wg          sync.WaitGroup     // tracks the readNodes and loop goroutines
	cancel      context.CancelFunc // cancels ctx; called by stop
	ctx         context.Context    // passed to every dial; Done() signals shutdown
	nodesIn     chan *enode.Node   // dynamic candidates delivered by readNodes
	doneCh      chan *dialTask     // finished dial tasks (unbuffered)
	addStaticCh chan *enode.Node   // requests from addStatic
	remStaticCh chan *enode.Node   // requests from removeStatic
	addPeerCh   chan *conn         // notifications from peerAdded
	remPeerCh   chan *conn         // notifications from peerRemoved

	// Everything below here belongs to loop and
	// should only be accessed by code on the loop goroutine.
	dialing   map[enode.ID]*dialTask // active tasks
	peers     map[enode.ID]connFlag  // all connected peers
	dialPeers int                    // current number of dialed peers

	// The static map tracks all static dial tasks. The subset of usable static dial tasks
	// (i.e. those passing checkDial) is kept in staticPool. The scheduler prefers
	// launching random static tasks from the pool over launching dynamic dials from the
	// iterator.
	static     map[enode.ID]*dialTask
	staticPool []*dialTask

	// The dial history keeps recently dialed nodes. Members of history are not dialed.
	// Keys are the raw node ID bytes converted to a string.
	history          expHeap
	historyTimer     mclock.Timer   // fires when the earliest history entry expires; nil if unarmed
	historyTimerTime mclock.AbsTime // expiry historyTimer is armed for; 0 if unarmed

	// for logStats
	lastStatsLog     mclock.AbsTime // when the stats message was last considered
	doneSinceLastLog int            // dial tasks finished since then
}
  113. type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error
  114. type dialConfig struct {
  115. self enode.ID // our own ID
  116. maxDialPeers int // maximum number of dialed peers
  117. maxActiveDials int // maximum number of active dials
  118. netRestrict *netutil.Netlist // IP whitelist, disabled if nil
  119. resolver nodeResolver
  120. dialer NodeDialer
  121. log log.Logger
  122. clock mclock.Clock
  123. rand *mrand.Rand
  124. }
  125. func (cfg dialConfig) withDefaults() dialConfig {
  126. if cfg.maxActiveDials == 0 {
  127. cfg.maxActiveDials = defaultMaxPendingPeers
  128. }
  129. if cfg.log == nil {
  130. cfg.log = log.Root()
  131. }
  132. if cfg.clock == nil {
  133. cfg.clock = mclock.System{}
  134. }
  135. if cfg.rand == nil {
  136. seedb := make([]byte, 8)
  137. crand.Read(seedb)
  138. seed := int64(binary.BigEndian.Uint64(seedb))
  139. cfg.rand = mrand.New(mrand.NewSource(seed))
  140. }
  141. return cfg
  142. }
  143. func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
  144. d := &dialScheduler{
  145. dialConfig: config.withDefaults(),
  146. setupFunc: setupFunc,
  147. dialing: make(map[enode.ID]*dialTask),
  148. static: make(map[enode.ID]*dialTask),
  149. peers: make(map[enode.ID]connFlag),
  150. doneCh: make(chan *dialTask),
  151. nodesIn: make(chan *enode.Node),
  152. addStaticCh: make(chan *enode.Node),
  153. remStaticCh: make(chan *enode.Node),
  154. addPeerCh: make(chan *conn),
  155. remPeerCh: make(chan *conn),
  156. }
  157. d.lastStatsLog = d.clock.Now()
  158. d.ctx, d.cancel = context.WithCancel(context.Background())
  159. d.wg.Add(2)
  160. gopool.Submit(func() {
  161. d.readNodes(it)
  162. })
  163. gopool.Submit(
  164. func() {
  165. d.loop(it)
  166. })
  167. return d
  168. }
  169. // stop shuts down the dialer, canceling all current dial tasks.
  170. func (d *dialScheduler) stop() {
  171. d.cancel()
  172. d.wg.Wait()
  173. }
  174. // addStatic adds a static dial candidate.
  175. func (d *dialScheduler) addStatic(n *enode.Node) {
  176. select {
  177. case d.addStaticCh <- n:
  178. case <-d.ctx.Done():
  179. }
  180. }
  181. // removeStatic removes a static dial candidate.
  182. func (d *dialScheduler) removeStatic(n *enode.Node) {
  183. select {
  184. case d.remStaticCh <- n:
  185. case <-d.ctx.Done():
  186. }
  187. }
  188. // peerAdded updates the peer set.
  189. func (d *dialScheduler) peerAdded(c *conn) {
  190. select {
  191. case d.addPeerCh <- c:
  192. case <-d.ctx.Done():
  193. }
  194. }
  195. // peerRemoved updates the peer set.
  196. func (d *dialScheduler) peerRemoved(c *conn) {
  197. select {
  198. case d.remPeerCh <- c:
  199. case <-d.ctx.Done():
  200. }
  201. }
// loop is the main loop of the dialer. It is started by newDialScheduler and is
// the only goroutine that reads or writes the loop-owned scheduler state
// (dialing, peers, dialPeers, static, staticPool, history, stats).
//
// Each iteration it:
//  1. fills free dial slots, first with random static tasks from staticPool
//     and then, if slots remain, by accepting one dynamic candidate from nodesIn;
//  2. re-arms the history expiry timer and possibly logs statistics;
//  3. handles exactly one event.
//
// When d.ctx is cancelled it closes the iterator, stops the history timer,
// waits until every in-flight dial task has reported on doneCh, and then
// signals d.wg.
func (d *dialScheduler) loop(it enode.Iterator) {
	var (
		nodesCh chan *enode.Node
		// Capacity 1 lets the timer callback's send complete even while loop
		// is busy handling another event.
		historyExp = make(chan struct{}, 1)
	)

loop:
	for {
		// Launch new dials if slots are available.
		slots := d.freeDialSlots()
		slots -= d.startStaticDials(slots)
		if slots > 0 {
			nodesCh = d.nodesIn
		} else {
			// Receiving from a nil channel never proceeds, so this pauses
			// intake of dynamic candidates until a slot frees up.
			nodesCh = nil
		}
		d.rearmHistoryTimer(historyExp)
		d.logStats()

		select {
		case node := <-nodesCh:
			// Dynamic candidate from discovery.
			if err := d.checkDial(node); err != nil {
				d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
			} else {
				d.startDial(newDialTask(node, dynDialedConn))
			}

		case task := <-d.doneCh:
			// A dial task finished (successfully or not). For static nodes,
			// updateStaticPool puts the task back into the pool if checkDial
			// allows it again.
			id := task.dest.ID()
			delete(d.dialing, id)
			d.updateStaticPool(id)
			d.doneSinceLastLog++

		case c := <-d.addPeerCh:
			// dialPeers only counts connections that were dialed by us.
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers++
			}
			id := c.node.ID()
			d.peers[id] = c.flags
			// Remove from static pool because the node is now connected.
			task := d.static[id]
			if task != nil && task.staticPoolIndex >= 0 {
				d.removeFromStaticPool(task.staticPoolIndex)
			}
			// TODO: cancel dials to connected peers

		case c := <-d.remPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers--
			}
			delete(d.peers, c.node.ID())
			// A disconnected static node may become dialable again.
			d.updateStaticPool(c.node.ID())

		case node := <-d.addStaticCh:
			id := node.ID()
			_, exists := d.static[id]
			d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
			if exists {
				continue loop
			}
			task := newDialTask(node, staticDialedConn)
			d.static[id] = task
			// Tasks that fail checkDial now stay out of the pool until a
			// later updateStaticPool call admits them.
			if d.checkDial(node) == nil {
				d.addToStaticPool(task)
			}

		case node := <-d.remStaticCh:
			id := node.ID()
			task := d.static[id]
			d.log.Trace("Removing static node", "id", id, "ok", task != nil)
			if task != nil {
				// If the task is currently dialing it stays in d.dialing. When
				// it finishes, updateStaticPool finds no d.static entry and
				// does not re-pool it.
				delete(d.static, id)
				if task.staticPoolIndex >= 0 {
					d.removeFromStaticPool(task.staticPoolIndex)
				}
			}

		case <-historyExp:
			d.expireHistory()

		case <-d.ctx.Done():
			it.Close()
			break loop
		}
	}

	d.stopHistoryTimer(historyExp)
	// Every dial goroutine sends on the unbuffered doneCh when it finishes.
	// Receive once per in-flight task so none of them blocks forever.
	for range d.dialing {
		<-d.doneCh
	}
	d.wg.Done()
}
  285. // readNodes runs in its own goroutine and delivers nodes from
  286. // the input iterator to the nodesIn channel.
  287. func (d *dialScheduler) readNodes(it enode.Iterator) {
  288. defer d.wg.Done()
  289. for it.Next() {
  290. select {
  291. case d.nodesIn <- it.Node():
  292. case <-d.ctx.Done():
  293. }
  294. }
  295. }
  296. // logStats prints dialer statistics to the log. The message is suppressed when enough
  297. // peers are connected because users should only see it while their client is starting up
  298. // or comes back online.
  299. func (d *dialScheduler) logStats() {
  300. now := d.clock.Now()
  301. if d.lastStatsLog.Add(dialStatsLogInterval) > now {
  302. return
  303. }
  304. if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
  305. d.log.Info("Looking for peers", "peercount", len(d.peers), "tried", d.doneSinceLastLog, "static", len(d.static))
  306. }
  307. d.doneSinceLastLog = 0
  308. d.lastStatsLog = now
  309. }
  310. // rearmHistoryTimer configures d.historyTimer to fire when the
  311. // next item in d.history expires.
  312. func (d *dialScheduler) rearmHistoryTimer(ch chan struct{}) {
  313. if len(d.history) == 0 || d.historyTimerTime == d.history.nextExpiry() {
  314. return
  315. }
  316. d.stopHistoryTimer(ch)
  317. d.historyTimerTime = d.history.nextExpiry()
  318. timeout := time.Duration(d.historyTimerTime - d.clock.Now())
  319. d.historyTimer = d.clock.AfterFunc(timeout, func() { ch <- struct{}{} })
  320. }
  321. // stopHistoryTimer stops the timer and drains the channel it sends on.
  322. func (d *dialScheduler) stopHistoryTimer(ch chan struct{}) {
  323. if d.historyTimer != nil && !d.historyTimer.Stop() {
  324. <-ch
  325. }
  326. }
  327. // expireHistory removes expired items from d.history.
  328. func (d *dialScheduler) expireHistory() {
  329. d.historyTimer.Stop()
  330. d.historyTimer = nil
  331. d.historyTimerTime = 0
  332. d.history.expire(d.clock.Now(), func(hkey string) {
  333. var id enode.ID
  334. copy(id[:], hkey)
  335. d.updateStaticPool(id)
  336. })
  337. }
  338. // freeDialSlots returns the number of free dial slots. The result can be negative
  339. // when peers are connected while their task is still running.
  340. func (d *dialScheduler) freeDialSlots() int {
  341. slots := (d.maxDialPeers - d.dialPeers) * 2
  342. if slots > d.maxActiveDials {
  343. slots = d.maxActiveDials
  344. }
  345. free := slots - len(d.dialing)
  346. return free
  347. }
  348. // checkDial returns an error if node n should not be dialed.
  349. func (d *dialScheduler) checkDial(n *enode.Node) error {
  350. if n.ID() == d.self {
  351. return errSelf
  352. }
  353. if n.IP() != nil && n.TCP() == 0 {
  354. // This check can trigger if a non-TCP node is found
  355. // by discovery. If there is no IP, the node is a static
  356. // node and the actual endpoint will be resolved later in dialTask.
  357. return errNoPort
  358. }
  359. if _, ok := d.dialing[n.ID()]; ok {
  360. return errAlreadyDialing
  361. }
  362. if _, ok := d.peers[n.ID()]; ok {
  363. return errAlreadyConnected
  364. }
  365. if d.netRestrict != nil && !d.netRestrict.Contains(n.IP()) {
  366. return errNotWhitelisted
  367. }
  368. if d.history.contains(string(n.ID().Bytes())) {
  369. return errRecentlyDialed
  370. }
  371. return nil
  372. }
  373. // startStaticDials starts n static dial tasks.
  374. func (d *dialScheduler) startStaticDials(n int) (started int) {
  375. for started = 0; started < n && len(d.staticPool) > 0; started++ {
  376. idx := d.rand.Intn(len(d.staticPool))
  377. task := d.staticPool[idx]
  378. d.startDial(task)
  379. d.removeFromStaticPool(idx)
  380. }
  381. return started
  382. }
  383. // updateStaticPool attempts to move the given static dial back into staticPool.
  384. func (d *dialScheduler) updateStaticPool(id enode.ID) {
  385. task, ok := d.static[id]
  386. if ok && task.staticPoolIndex < 0 && d.checkDial(task.dest) == nil {
  387. d.addToStaticPool(task)
  388. }
  389. }
  390. func (d *dialScheduler) addToStaticPool(task *dialTask) {
  391. if task.staticPoolIndex >= 0 {
  392. panic("attempt to add task to staticPool twice")
  393. }
  394. d.staticPool = append(d.staticPool, task)
  395. task.staticPoolIndex = len(d.staticPool) - 1
  396. }
  397. // removeFromStaticPool removes the task at idx from staticPool. It does that by moving the
  398. // current last element of the pool to idx and then shortening the pool by one.
  399. func (d *dialScheduler) removeFromStaticPool(idx int) {
  400. task := d.staticPool[idx]
  401. end := len(d.staticPool) - 1
  402. d.staticPool[idx] = d.staticPool[end]
  403. d.staticPool[idx].staticPoolIndex = idx
  404. d.staticPool[end] = nil
  405. d.staticPool = d.staticPool[:end]
  406. task.staticPoolIndex = -1
  407. }
  408. // startDial runs the given dial task in a separate goroutine.
  409. func (d *dialScheduler) startDial(task *dialTask) {
  410. d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
  411. hkey := string(task.dest.ID().Bytes())
  412. d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
  413. d.dialing[task.dest.ID()] = task
  414. gopool.Submit(func() {
  415. task.run(d)
  416. d.doneCh <- task
  417. })
  418. }
  419. // A dialTask generated for each node that is dialed.
  420. type dialTask struct {
  421. staticPoolIndex int
  422. flags connFlag
  423. // These fields are private to the task and should not be
  424. // accessed by dialScheduler while the task is running.
  425. dest *enode.Node
  426. lastResolved mclock.AbsTime
  427. resolveDelay time.Duration
  428. }
  429. func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
  430. return &dialTask{dest: dest, flags: flags, staticPoolIndex: -1}
  431. }
  432. type dialError struct {
  433. error
  434. }
  435. func (t *dialTask) run(d *dialScheduler) {
  436. if t.needResolve() && !t.resolve(d) {
  437. return
  438. }
  439. err := t.dial(d, t.dest)
  440. if err != nil {
  441. // For static nodes, resolve one more time if dialing fails.
  442. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
  443. if t.resolve(d) {
  444. t.dial(d, t.dest)
  445. }
  446. }
  447. }
  448. }
  449. func (t *dialTask) needResolve() bool {
  450. return t.flags&staticDialedConn != 0 && t.dest.IP() == nil
  451. }
  452. // resolve attempts to find the current endpoint for the destination
  453. // using discovery.
  454. //
  455. // Resolve operations are throttled with backoff to avoid flooding the
  456. // discovery network with useless queries for nodes that don't exist.
  457. // The backoff delay resets when the node is found.
  458. func (t *dialTask) resolve(d *dialScheduler) bool {
  459. if d.resolver == nil {
  460. return false
  461. }
  462. if t.resolveDelay == 0 {
  463. t.resolveDelay = initialResolveDelay
  464. }
  465. if t.lastResolved > 0 && time.Duration(d.clock.Now()-t.lastResolved) < t.resolveDelay {
  466. return false
  467. }
  468. resolved := d.resolver.Resolve(t.dest)
  469. t.lastResolved = d.clock.Now()
  470. if resolved == nil {
  471. t.resolveDelay *= 2
  472. if t.resolveDelay > maxResolveDelay {
  473. t.resolveDelay = maxResolveDelay
  474. }
  475. d.log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay)
  476. return false
  477. }
  478. // The node was found.
  479. t.resolveDelay = initialResolveDelay
  480. t.dest = resolved
  481. d.log.Debug("Resolved node", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()})
  482. return true
  483. }
  484. // dial performs the actual connection attempt.
  485. func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
  486. fd, err := d.dialer.Dial(d.ctx, t.dest)
  487. if err != nil {
  488. d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
  489. return &dialError{err}
  490. }
  491. mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
  492. return d.setupFunc(mfd, t.flags, dest)
  493. }
  494. func (t *dialTask) String() string {
  495. id := t.dest.ID()
  496. return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], t.dest.IP(), t.dest.TCP())
  497. }
  498. func cleanupDialErr(err error) error {
  499. if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
  500. return netErr.Err
  501. }
  502. return err
  503. }