dial.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. mrand "math/rand"
  24. "net"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/p2p/enode"
  30. "github.com/ethereum/go-ethereum/p2p/netutil"
  31. )
  32. const (
  33. // This is the amount of time spent waiting in between redialing a certain node. The
  34. // limit is a bit higher than inboundThrottleTime to prevent failing dials in small
  35. // private networks.
  36. dialHistoryExpiration = inboundThrottleTime + 5*time.Second
  37. // Config for the "Looking for peers" message.
  38. dialStatsLogInterval = 10 * time.Second // printed at most this often
  39. dialStatsPeerLimit = 3 // but not if more than this many dialed peers
  40. // Endpoint resolution is throttled with bounded backoff.
  41. initialResolveDelay = 60 * time.Second
  42. maxResolveDelay = time.Hour
  43. )
  44. // NodeDialer is used to connect to nodes in the network, typically by using
  45. // an underlying net.Dialer but also using net.Pipe in tests.
  46. type NodeDialer interface {
  47. Dial(context.Context, *enode.Node) (net.Conn, error)
  48. }
  49. type nodeResolver interface {
  50. Resolve(*enode.Node) *enode.Node
  51. }
  52. // tcpDialer implements NodeDialer using real TCP connections.
  53. type tcpDialer struct {
  54. d *net.Dialer
  55. }
  56. func (t tcpDialer) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
  57. return t.d.DialContext(ctx, "tcp", nodeAddr(dest).String())
  58. }
  59. func nodeAddr(n *enode.Node) net.Addr {
  60. return &net.TCPAddr{IP: n.IP(), Port: n.TCP()}
  61. }
  62. // checkDial errors:
  63. var (
  64. errSelf = errors.New("is self")
  65. errAlreadyDialing = errors.New("already dialing")
  66. errAlreadyConnected = errors.New("already connected")
  67. errRecentlyDialed = errors.New("recently dialed")
  68. errNotWhitelisted = errors.New("not contained in netrestrict whitelist")
  69. )
// dialScheduler creates outbound connections and submits them into Server.
// Two types of peer connections can be created:
//
//   - static dials are pre-configured connections. The dialer attempts to
//     keep these nodes connected at all times.
//
//   - dynamic dials are created from node discovery results. The dialer
//     continuously reads candidate nodes from its input iterator and attempts
//     to create peer connections to nodes arriving through the iterator.
type dialScheduler struct {
	dialConfig
	setupFunc dialSetupFunc      // called by dialTask.dial with each new connection
	wg        sync.WaitGroup     // tracks the readNodes and loop goroutines
	cancel    context.CancelFunc // cancels ctx; called by stop
	ctx       context.Context    // shutdown signal; also passed to every Dial call

	// Channels feeding events into loop.
	nodesIn     chan *enode.Node // dynamic candidates, sent by readNodes
	doneCh      chan *dialTask   // finished tasks, sent by startDial's goroutines
	addStaticCh chan *enode.Node // sent by addStatic
	remStaticCh chan *enode.Node // sent by removeStatic
	addPeerCh   chan *conn       // sent by peerAdded
	remPeerCh   chan *conn       // sent by peerRemoved

	// Everything below here belongs to loop and
	// should only be accessed by code on the loop goroutine.
	dialing   map[enode.ID]*dialTask // active tasks
	peers     map[enode.ID]connFlag  // all connected peers
	dialPeers int                    // current number of dialed peers

	// The static map tracks all static dial tasks. The subset of usable static dial tasks
	// (i.e. those passing checkDial) is kept in staticPool. The scheduler prefers
	// launching random static tasks from the pool over launching dynamic dials from the
	// iterator.
	static     map[enode.ID]*dialTask
	staticPool []*dialTask

	// The dial history keeps recently dialed nodes. Members of history are not dialed.
	history          expHeap
	historyTimer     mclock.Timer   // fires when the earliest history entry expires
	historyTimerTime mclock.AbsTime // expiry time historyTimer is armed for (0 = none)

	// for logStats
	lastStatsLog     mclock.AbsTime
	doneSinceLastLog int
}
  111. type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error
  112. type dialConfig struct {
  113. self enode.ID // our own ID
  114. maxDialPeers int // maximum number of dialed peers
  115. maxActiveDials int // maximum number of active dials
  116. netRestrict *netutil.Netlist // IP whitelist, disabled if nil
  117. resolver nodeResolver
  118. dialer NodeDialer
  119. log log.Logger
  120. clock mclock.Clock
  121. rand *mrand.Rand
  122. }
  123. func (cfg dialConfig) withDefaults() dialConfig {
  124. if cfg.maxActiveDials == 0 {
  125. cfg.maxActiveDials = defaultMaxPendingPeers
  126. }
  127. if cfg.log == nil {
  128. cfg.log = log.Root()
  129. }
  130. if cfg.clock == nil {
  131. cfg.clock = mclock.System{}
  132. }
  133. if cfg.rand == nil {
  134. seedb := make([]byte, 8)
  135. crand.Read(seedb)
  136. seed := int64(binary.BigEndian.Uint64(seedb))
  137. cfg.rand = mrand.New(mrand.NewSource(seed))
  138. }
  139. return cfg
  140. }
  141. func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
  142. d := &dialScheduler{
  143. dialConfig: config.withDefaults(),
  144. setupFunc: setupFunc,
  145. dialing: make(map[enode.ID]*dialTask),
  146. static: make(map[enode.ID]*dialTask),
  147. peers: make(map[enode.ID]connFlag),
  148. doneCh: make(chan *dialTask),
  149. nodesIn: make(chan *enode.Node),
  150. addStaticCh: make(chan *enode.Node),
  151. remStaticCh: make(chan *enode.Node),
  152. addPeerCh: make(chan *conn),
  153. remPeerCh: make(chan *conn),
  154. }
  155. d.lastStatsLog = d.clock.Now()
  156. d.ctx, d.cancel = context.WithCancel(context.Background())
  157. d.wg.Add(2)
  158. go d.readNodes(it)
  159. go d.loop(it)
  160. return d
  161. }
  162. // stop shuts down the dialer, canceling all current dial tasks.
  163. func (d *dialScheduler) stop() {
  164. d.cancel()
  165. d.wg.Wait()
  166. }
  167. // addStatic adds a static dial candidate.
  168. func (d *dialScheduler) addStatic(n *enode.Node) {
  169. select {
  170. case d.addStaticCh <- n:
  171. case <-d.ctx.Done():
  172. }
  173. }
  174. // removeStatic removes a static dial candidate.
  175. func (d *dialScheduler) removeStatic(n *enode.Node) {
  176. select {
  177. case d.remStaticCh <- n:
  178. case <-d.ctx.Done():
  179. }
  180. }
  181. // peerAdded updates the peer set.
  182. func (d *dialScheduler) peerAdded(c *conn) {
  183. select {
  184. case d.addPeerCh <- c:
  185. case <-d.ctx.Done():
  186. }
  187. }
  188. // peerRemoved updates the peer set.
  189. func (d *dialScheduler) peerRemoved(c *conn) {
  190. select {
  191. case d.remPeerCh <- c:
  192. case <-d.ctx.Done():
  193. }
  194. }
// loop is the main loop of the dialer. It is the only goroutine that touches
// the scheduler state marked as belonging to loop. Each iteration first fills
// free dial slots (static tasks take priority), re-arms the history timer and
// maybe logs stats, then handles exactly one event. The possible events are a
// dynamic candidate, a finished dial, a peer added/removed, a static node
// added/removed, history expiry, or shutdown.
//
// On shutdown it closes the iterator and stops the history timer. It then
// waits until every task still in d.dialing has reported on doneCh before
// marking itself done.
func (d *dialScheduler) loop(it enode.Iterator) {
	var (
		nodesCh chan *enode.Node
		// Buffered so the timer callback's send does not have to wait for
		// the loop; stopHistoryTimer drains it when needed.
		historyExp = make(chan struct{}, 1)
	)

loop:
	for {
		// Launch new dials if slots are available.
		slots := d.freeDialSlots()
		slots -= d.startStaticDials(slots)
		// Accept dynamic candidates only while slots remain. Receiving from a
		// nil channel blocks forever, which disables that select case.
		if slots > 0 {
			nodesCh = d.nodesIn
		} else {
			nodesCh = nil
		}
		d.rearmHistoryTimer(historyExp)
		d.logStats()

		select {
		case node := <-nodesCh:
			if err := d.checkDial(node); err != nil {
				d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
			} else {
				d.startDial(newDialTask(node, dynDialedConn))
			}

		case task := <-d.doneCh:
			id := task.dest.ID()
			delete(d.dialing, id)
			// A static task goes back to the pool only if checkDial passes.
			// The dial history entry added by startDial normally blocks that
			// until it expires.
			d.updateStaticPool(id)
			d.doneSinceLastLog++

		case c := <-d.addPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers++
			}
			id := c.node.ID()
			d.peers[id] = c.flags
			// Remove from static pool because the node is now connected.
			task := d.static[id]
			if task != nil && task.staticPoolIndex >= 0 {
				d.removeFromStaticPool(task.staticPoolIndex)
			}
			// TODO: cancel dials to connected peers

		case c := <-d.remPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers--
			}
			delete(d.peers, c.node.ID())
			// If this was a static node, it may be dialable again.
			d.updateStaticPool(c.node.ID())

		case node := <-d.addStaticCh:
			id := node.ID()
			_, exists := d.static[id]
			d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
			if exists {
				continue loop
			}
			task := newDialTask(node, staticDialedConn)
			d.static[id] = task
			// Only pool it right away if it is currently dialable.
			if d.checkDial(node) == nil {
				d.addToStaticPool(task)
			}

		case node := <-d.remStaticCh:
			id := node.ID()
			task := d.static[id]
			d.log.Trace("Removing static node", "id", id, "ok", task != nil)
			if task != nil {
				delete(d.static, id)
				if task.staticPoolIndex >= 0 {
					d.removeFromStaticPool(task.staticPoolIndex)
				}
			}

		case <-historyExp:
			d.expireHistory()

		case <-d.ctx.Done():
			// NOTE(review): closing the iterator presumably makes it.Next in
			// readNodes return false; readNodes also watches ctx.Done().
			it.Close()
			break loop
		}
	}

	d.stopHistoryTimer(historyExp)
	// Every running task sends on doneCh exactly once (see startDial). Wait
	// for them all so none is left blocked on the send.
	for range d.dialing {
		<-d.doneCh
	}
	d.wg.Done()
}
  278. // readNodes runs in its own goroutine and delivers nodes from
  279. // the input iterator to the nodesIn channel.
  280. func (d *dialScheduler) readNodes(it enode.Iterator) {
  281. defer d.wg.Done()
  282. for it.Next() {
  283. select {
  284. case d.nodesIn <- it.Node():
  285. case <-d.ctx.Done():
  286. }
  287. }
  288. }
  289. // logStats prints dialer statistics to the log. The message is suppressed when enough
  290. // peers are connected because users should only see it while their client is starting up
  291. // or comes back online.
  292. func (d *dialScheduler) logStats() {
  293. now := d.clock.Now()
  294. if d.lastStatsLog.Add(dialStatsLogInterval) > now {
  295. return
  296. }
  297. if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
  298. d.log.Info("Looking for peers", "peercount", len(d.peers), "tried", d.doneSinceLastLog, "static", len(d.static))
  299. }
  300. d.doneSinceLastLog = 0
  301. d.lastStatsLog = now
  302. }
  303. // rearmHistoryTimer configures d.historyTimer to fire when the
  304. // next item in d.history expires.
  305. func (d *dialScheduler) rearmHistoryTimer(ch chan struct{}) {
  306. if len(d.history) == 0 || d.historyTimerTime == d.history.nextExpiry() {
  307. return
  308. }
  309. d.stopHistoryTimer(ch)
  310. d.historyTimerTime = d.history.nextExpiry()
  311. timeout := time.Duration(d.historyTimerTime - d.clock.Now())
  312. d.historyTimer = d.clock.AfterFunc(timeout, func() { ch <- struct{}{} })
  313. }
  314. // stopHistoryTimer stops the timer and drains the channel it sends on.
  315. func (d *dialScheduler) stopHistoryTimer(ch chan struct{}) {
  316. if d.historyTimer != nil && !d.historyTimer.Stop() {
  317. <-ch
  318. }
  319. }
  320. // expireHistory removes expired items from d.history.
  321. func (d *dialScheduler) expireHistory() {
  322. d.historyTimer.Stop()
  323. d.historyTimer = nil
  324. d.historyTimerTime = 0
  325. d.history.expire(d.clock.Now(), func(hkey string) {
  326. var id enode.ID
  327. copy(id[:], hkey)
  328. d.updateStaticPool(id)
  329. })
  330. }
  331. // freeDialSlots returns the number of free dial slots. The result can be negative
  332. // when peers are connected while their task is still running.
  333. func (d *dialScheduler) freeDialSlots() int {
  334. slots := (d.maxDialPeers - d.dialPeers) * 2
  335. if slots > d.maxActiveDials {
  336. slots = d.maxActiveDials
  337. }
  338. free := slots - len(d.dialing)
  339. return free
  340. }
  341. // checkDial returns an error if node n should not be dialed.
  342. func (d *dialScheduler) checkDial(n *enode.Node) error {
  343. if n.ID() == d.self {
  344. return errSelf
  345. }
  346. if _, ok := d.dialing[n.ID()]; ok {
  347. return errAlreadyDialing
  348. }
  349. if _, ok := d.peers[n.ID()]; ok {
  350. return errAlreadyConnected
  351. }
  352. if d.netRestrict != nil && !d.netRestrict.Contains(n.IP()) {
  353. return errNotWhitelisted
  354. }
  355. if d.history.contains(string(n.ID().Bytes())) {
  356. return errRecentlyDialed
  357. }
  358. return nil
  359. }
  360. // startStaticDials starts n static dial tasks.
  361. func (d *dialScheduler) startStaticDials(n int) (started int) {
  362. for started = 0; started < n && len(d.staticPool) > 0; started++ {
  363. idx := d.rand.Intn(len(d.staticPool))
  364. task := d.staticPool[idx]
  365. d.startDial(task)
  366. d.removeFromStaticPool(idx)
  367. }
  368. return started
  369. }
  370. // updateStaticPool attempts to move the given static dial back into staticPool.
  371. func (d *dialScheduler) updateStaticPool(id enode.ID) {
  372. task, ok := d.static[id]
  373. if ok && task.staticPoolIndex < 0 && d.checkDial(task.dest) == nil {
  374. d.addToStaticPool(task)
  375. }
  376. }
  377. func (d *dialScheduler) addToStaticPool(task *dialTask) {
  378. if task.staticPoolIndex >= 0 {
  379. panic("attempt to add task to staticPool twice")
  380. }
  381. d.staticPool = append(d.staticPool, task)
  382. task.staticPoolIndex = len(d.staticPool) - 1
  383. }
  384. // removeFromStaticPool removes the task at idx from staticPool. It does that by moving the
  385. // current last element of the pool to idx and then shortening the pool by one.
  386. func (d *dialScheduler) removeFromStaticPool(idx int) {
  387. task := d.staticPool[idx]
  388. end := len(d.staticPool) - 1
  389. d.staticPool[idx] = d.staticPool[end]
  390. d.staticPool[idx].staticPoolIndex = idx
  391. d.staticPool[end] = nil
  392. d.staticPool = d.staticPool[:end]
  393. task.staticPoolIndex = -1
  394. }
  395. // startDial runs the given dial task in a separate goroutine.
  396. func (d *dialScheduler) startDial(task *dialTask) {
  397. d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
  398. hkey := string(task.dest.ID().Bytes())
  399. d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
  400. d.dialing[task.dest.ID()] = task
  401. go func() {
  402. task.run(d)
  403. d.doneCh <- task
  404. }()
  405. }
  406. // A dialTask generated for each node that is dialed.
  407. type dialTask struct {
  408. staticPoolIndex int
  409. flags connFlag
  410. // These fields are private to the task and should not be
  411. // accessed by dialScheduler while the task is running.
  412. dest *enode.Node
  413. lastResolved mclock.AbsTime
  414. resolveDelay time.Duration
  415. }
  416. func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
  417. return &dialTask{dest: dest, flags: flags, staticPoolIndex: -1}
  418. }
  419. type dialError struct {
  420. error
  421. }
  422. func (t *dialTask) run(d *dialScheduler) {
  423. if t.dest.Incomplete() {
  424. if !t.resolve(d) {
  425. return
  426. }
  427. }
  428. err := t.dial(d, t.dest)
  429. if err != nil {
  430. // Try resolving the ID of static nodes if dialing failed.
  431. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
  432. if t.resolve(d) {
  433. t.dial(d, t.dest)
  434. }
  435. }
  436. }
  437. }
  438. // resolve attempts to find the current endpoint for the destination
  439. // using discovery.
  440. //
  441. // Resolve operations are throttled with backoff to avoid flooding the
  442. // discovery network with useless queries for nodes that don't exist.
  443. // The backoff delay resets when the node is found.
  444. func (t *dialTask) resolve(d *dialScheduler) bool {
  445. if d.resolver == nil {
  446. return false
  447. }
  448. if t.resolveDelay == 0 {
  449. t.resolveDelay = initialResolveDelay
  450. }
  451. if t.lastResolved > 0 && time.Duration(d.clock.Now()-t.lastResolved) < t.resolveDelay {
  452. return false
  453. }
  454. resolved := d.resolver.Resolve(t.dest)
  455. t.lastResolved = d.clock.Now()
  456. if resolved == nil {
  457. t.resolveDelay *= 2
  458. if t.resolveDelay > maxResolveDelay {
  459. t.resolveDelay = maxResolveDelay
  460. }
  461. d.log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay)
  462. return false
  463. }
  464. // The node was found.
  465. t.resolveDelay = initialResolveDelay
  466. t.dest = resolved
  467. d.log.Debug("Resolved node", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()})
  468. return true
  469. }
  470. // dial performs the actual connection attempt.
  471. func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
  472. fd, err := d.dialer.Dial(d.ctx, t.dest)
  473. if err != nil {
  474. d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
  475. return &dialError{err}
  476. }
  477. mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
  478. return d.setupFunc(mfd, t.flags, dest)
  479. }
  480. func (t *dialTask) String() string {
  481. id := t.dest.ID()
  482. return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], t.dest.IP(), t.dest.TCP())
  483. }
  484. func cleanupDialErr(err error) error {
  485. if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
  486. return netErr.Err
  487. }
  488. return err
  489. }