dial.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. mrand "math/rand"
  24. "net"
  25. "sync"
  26. "time"
  27. "blockchain-go/common/gopool"
  28. "blockchain-go/common/mclock"
  29. "blockchain-go/log"
  30. "blockchain-go/p2p/enode"
  31. "blockchain-go/p2p/netutil"
  32. )
  33. const (
  34. // This is the amount of time spent waiting in between redialing a certain node. The
  35. // limit is a bit higher than inboundThrottleTime to prevent failing dials in small
  36. // private networks.
  37. dialHistoryExpiration = inboundThrottleTime + 5*time.Second
  38. // Config for the "Looking for peers" message.
  39. dialStatsLogInterval = 10 * time.Second // printed at most this often
  40. dialStatsPeerLimit = 3 // but not if more than this many dialed peers
  41. // Endpoint resolution is throttled with bounded backoff.
  42. initialResolveDelay = 60 * time.Second
  43. maxResolveDelay = time.Hour
  44. )
  45. // NodeDialer is used to connect to nodes in the network, typically by using
  46. // an underlying net.Dialer but also using net.Pipe in tests.
  47. type NodeDialer interface {
  48. Dial(context.Context, *enode.Node) (net.Conn, error)
  49. }
  50. type nodeResolver interface {
  51. Resolve(*enode.Node) *enode.Node
  52. }
  53. // tcpDialer implements NodeDialer using real TCP connections.
  54. type tcpDialer struct {
  55. d *net.Dialer
  56. }
  57. func (t tcpDialer) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
  58. return t.d.DialContext(ctx, "tcp", nodeAddr(dest).String())
  59. }
  60. func nodeAddr(n *enode.Node) net.Addr {
  61. return &net.TCPAddr{IP: n.IP(), Port: n.TCP()}
  62. }
  63. // checkDial errors:
  64. var (
  65. errSelf = errors.New("is self")
  66. errAlreadyDialing = errors.New("already dialing")
  67. errAlreadyConnected = errors.New("already connected")
  68. errRecentlyDialed = errors.New("recently dialed")
  69. errNotWhitelisted = errors.New("not contained in netrestrict whitelist")
  70. errNoPort = errors.New("node does not provide TCP port")
  71. )
  72. // dialer creates outbound connections and submits them into Server.
  73. // Two types of peer connections can be created:
  74. //
  75. // - static dials are pre-configured connections. The dialer attempts
  76. // keep these nodes connected at all times.
  77. //
  78. // - dynamic dials are created from node discovery results. The dialer
  79. // continuously reads candidate nodes from its input iterator and attempts
  80. // to create peer connections to nodes arriving through the iterator.
  81. type dialScheduler struct {
  82. dialConfig
  83. setupFunc dialSetupFunc
  84. wg sync.WaitGroup
  85. cancel context.CancelFunc
  86. ctx context.Context
  87. nodesIn chan *enode.Node
  88. doneCh chan *dialTask
  89. addStaticCh chan *enode.Node
  90. remStaticCh chan *enode.Node
  91. addPeerCh chan *conn
  92. remPeerCh chan *conn
  93. // Everything below here belongs to loop and
  94. // should only be accessed by code on the loop goroutine.
  95. dialing map[enode.ID]*dialTask // active tasks
  96. peers map[enode.ID]connFlag // all connected peers
  97. dialPeers int // current number of dialed peers
  98. // The static map tracks all static dial tasks. The subset of usable static dial tasks
  99. // (i.e. those passing checkDial) is kept in staticPool. The scheduler prefers
  100. // launching random static tasks from the pool over launching dynamic dials from the
  101. // iterator.
  102. static map[enode.ID]*dialTask
  103. staticPool []*dialTask
  104. // The dial history keeps recently dialed nodes. Members of history are not dialed.
  105. history expHeap
  106. historyTimer mclock.Timer
  107. historyTimerTime mclock.AbsTime
  108. // for logStats
  109. lastStatsLog mclock.AbsTime
  110. doneSinceLastLog int
  111. }
  112. type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error
  113. type dialConfig struct {
  114. self enode.ID // our own ID
  115. maxDialPeers int // maximum number of dialed peers
  116. maxActiveDials int // maximum number of active dials
  117. netRestrict *netutil.Netlist // IP whitelist, disabled if nil
  118. resolver nodeResolver
  119. dialer NodeDialer
  120. log log.Logger
  121. clock mclock.Clock
  122. rand *mrand.Rand
  123. }
  124. func (cfg dialConfig) withDefaults() dialConfig {
  125. if cfg.maxActiveDials == 0 {
  126. cfg.maxActiveDials = defaultMaxPendingPeers
  127. }
  128. if cfg.log == nil {
  129. cfg.log = log.Root()
  130. }
  131. if cfg.clock == nil {
  132. cfg.clock = mclock.System{}
  133. }
  134. if cfg.rand == nil {
  135. seedb := make([]byte, 8)
  136. crand.Read(seedb)
  137. seed := int64(binary.BigEndian.Uint64(seedb))
  138. cfg.rand = mrand.New(mrand.NewSource(seed))
  139. }
  140. return cfg
  141. }
  142. func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
  143. d := &dialScheduler{
  144. dialConfig: config.withDefaults(),
  145. setupFunc: setupFunc,
  146. dialing: make(map[enode.ID]*dialTask),
  147. static: make(map[enode.ID]*dialTask),
  148. peers: make(map[enode.ID]connFlag),
  149. doneCh: make(chan *dialTask),
  150. nodesIn: make(chan *enode.Node),
  151. addStaticCh: make(chan *enode.Node),
  152. remStaticCh: make(chan *enode.Node),
  153. addPeerCh: make(chan *conn),
  154. remPeerCh: make(chan *conn),
  155. }
  156. d.lastStatsLog = d.clock.Now()
  157. d.ctx, d.cancel = context.WithCancel(context.Background())
  158. d.wg.Add(2)
  159. gopool.Submit(func() {
  160. d.readNodes(it)
  161. })
  162. gopool.Submit(
  163. func() {
  164. d.loop(it)
  165. })
  166. return d
  167. }
  168. // stop shuts down the dialer, canceling all current dial tasks.
  169. func (d *dialScheduler) stop() {
  170. d.cancel()
  171. d.wg.Wait()
  172. }
  173. // addStatic adds a static dial candidate.
  174. func (d *dialScheduler) addStatic(n *enode.Node) {
  175. select {
  176. case d.addStaticCh <- n:
  177. case <-d.ctx.Done():
  178. }
  179. }
  180. // removeStatic removes a static dial candidate.
  181. func (d *dialScheduler) removeStatic(n *enode.Node) {
  182. select {
  183. case d.remStaticCh <- n:
  184. case <-d.ctx.Done():
  185. }
  186. }
  187. // peerAdded updates the peer set.
  188. func (d *dialScheduler) peerAdded(c *conn) {
  189. select {
  190. case d.addPeerCh <- c:
  191. case <-d.ctx.Done():
  192. }
  193. }
  194. // peerRemoved updates the peer set.
  195. func (d *dialScheduler) peerRemoved(c *conn) {
  196. select {
  197. case d.remPeerCh <- c:
  198. case <-d.ctx.Done():
  199. }
  200. }
  201. // loop is the main loop of the dialer.
  202. func (d *dialScheduler) loop(it enode.Iterator) {
  203. var (
  204. nodesCh chan *enode.Node
  205. historyExp = make(chan struct{}, 1)
  206. )
  207. loop:
  208. for {
  209. // Launch new dials if slots are available.
  210. slots := d.freeDialSlots()
  211. slots -= d.startStaticDials(slots)
  212. if slots > 0 {
  213. nodesCh = d.nodesIn
  214. } else {
  215. nodesCh = nil
  216. }
  217. d.rearmHistoryTimer(historyExp)
  218. d.logStats()
  219. select {
  220. case node := <-nodesCh:
  221. if err := d.checkDial(node); err != nil {
  222. d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
  223. } else {
  224. d.startDial(newDialTask(node, dynDialedConn))
  225. }
  226. case task := <-d.doneCh:
  227. id := task.dest.ID()
  228. delete(d.dialing, id)
  229. d.updateStaticPool(id)
  230. d.doneSinceLastLog++
  231. case c := <-d.addPeerCh:
  232. if c.is(dynDialedConn) || c.is(staticDialedConn) {
  233. d.dialPeers++
  234. }
  235. id := c.node.ID()
  236. d.peers[id] = c.flags
  237. // Remove from static pool because the node is now connected.
  238. task := d.static[id]
  239. if task != nil && task.staticPoolIndex >= 0 {
  240. d.removeFromStaticPool(task.staticPoolIndex)
  241. }
  242. // TODO: cancel dials to connected peers
  243. case c := <-d.remPeerCh:
  244. if c.is(dynDialedConn) || c.is(staticDialedConn) {
  245. d.dialPeers--
  246. }
  247. delete(d.peers, c.node.ID())
  248. d.updateStaticPool(c.node.ID())
  249. case node := <-d.addStaticCh:
  250. id := node.ID()
  251. _, exists := d.static[id]
  252. d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
  253. if exists {
  254. continue loop
  255. }
  256. task := newDialTask(node, staticDialedConn)
  257. d.static[id] = task
  258. if d.checkDial(node) == nil {
  259. d.addToStaticPool(task)
  260. }
  261. case node := <-d.remStaticCh:
  262. id := node.ID()
  263. task := d.static[id]
  264. d.log.Trace("Removing static node", "id", id, "ok", task != nil)
  265. if task != nil {
  266. delete(d.static, id)
  267. if task.staticPoolIndex >= 0 {
  268. d.removeFromStaticPool(task.staticPoolIndex)
  269. }
  270. }
  271. case <-historyExp:
  272. d.expireHistory()
  273. case <-d.ctx.Done():
  274. it.Close()
  275. break loop
  276. }
  277. }
  278. d.stopHistoryTimer(historyExp)
  279. for range d.dialing {
  280. <-d.doneCh
  281. }
  282. d.wg.Done()
  283. }
  284. // readNodes runs in its own goroutine and delivers nodes from
  285. // the input iterator to the nodesIn channel.
  286. func (d *dialScheduler) readNodes(it enode.Iterator) {
  287. defer d.wg.Done()
  288. for it.Next() {
  289. select {
  290. case d.nodesIn <- it.Node():
  291. case <-d.ctx.Done():
  292. }
  293. }
  294. }
  295. // logStats prints dialer statistics to the log. The message is suppressed when enough
  296. // peers are connected because users should only see it while their client is starting up
  297. // or comes back online.
  298. func (d *dialScheduler) logStats() {
  299. now := d.clock.Now()
  300. if d.lastStatsLog.Add(dialStatsLogInterval) > now {
  301. return
  302. }
  303. if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
  304. d.log.Info("Looking for peers", "peercount", len(d.peers), "tried", d.doneSinceLastLog, "static", len(d.static))
  305. }
  306. d.doneSinceLastLog = 0
  307. d.lastStatsLog = now
  308. }
  309. // rearmHistoryTimer configures d.historyTimer to fire when the
  310. // next item in d.history expires.
  311. func (d *dialScheduler) rearmHistoryTimer(ch chan struct{}) {
  312. if len(d.history) == 0 || d.historyTimerTime == d.history.nextExpiry() {
  313. return
  314. }
  315. d.stopHistoryTimer(ch)
  316. d.historyTimerTime = d.history.nextExpiry()
  317. timeout := time.Duration(d.historyTimerTime - d.clock.Now())
  318. d.historyTimer = d.clock.AfterFunc(timeout, func() { ch <- struct{}{} })
  319. }
  320. // stopHistoryTimer stops the timer and drains the channel it sends on.
  321. func (d *dialScheduler) stopHistoryTimer(ch chan struct{}) {
  322. if d.historyTimer != nil && !d.historyTimer.Stop() {
  323. <-ch
  324. }
  325. }
  326. // expireHistory removes expired items from d.history.
  327. func (d *dialScheduler) expireHistory() {
  328. d.historyTimer.Stop()
  329. d.historyTimer = nil
  330. d.historyTimerTime = 0
  331. d.history.expire(d.clock.Now(), func(hkey string) {
  332. var id enode.ID
  333. copy(id[:], hkey)
  334. d.updateStaticPool(id)
  335. })
  336. }
  337. // freeDialSlots returns the number of free dial slots. The result can be negative
  338. // when peers are connected while their task is still running.
  339. func (d *dialScheduler) freeDialSlots() int {
  340. slots := (d.maxDialPeers - d.dialPeers) * 2
  341. if slots > d.maxActiveDials {
  342. slots = d.maxActiveDials
  343. }
  344. free := slots - len(d.dialing)
  345. return free
  346. }
  347. // checkDial returns an error if node n should not be dialed.
  348. func (d *dialScheduler) checkDial(n *enode.Node) error {
  349. if n.ID() == d.self {
  350. return errSelf
  351. }
  352. if n.IP() != nil && n.TCP() == 0 {
  353. // This check can trigger if a non-TCP node is found
  354. // by discovery. If there is no IP, the node is a static
  355. // node and the actual endpoint will be resolved later in dialTask.
  356. return errNoPort
  357. }
  358. if _, ok := d.dialing[n.ID()]; ok {
  359. return errAlreadyDialing
  360. }
  361. if _, ok := d.peers[n.ID()]; ok {
  362. return errAlreadyConnected
  363. }
  364. if d.netRestrict != nil && !d.netRestrict.Contains(n.IP()) {
  365. return errNotWhitelisted
  366. }
  367. if d.history.contains(string(n.ID().Bytes())) {
  368. return errRecentlyDialed
  369. }
  370. return nil
  371. }
  372. // startStaticDials starts n static dial tasks.
  373. func (d *dialScheduler) startStaticDials(n int) (started int) {
  374. for started = 0; started < n && len(d.staticPool) > 0; started++ {
  375. idx := d.rand.Intn(len(d.staticPool))
  376. task := d.staticPool[idx]
  377. d.startDial(task)
  378. d.removeFromStaticPool(idx)
  379. }
  380. return started
  381. }
  382. // updateStaticPool attempts to move the given static dial back into staticPool.
  383. func (d *dialScheduler) updateStaticPool(id enode.ID) {
  384. task, ok := d.static[id]
  385. if ok && task.staticPoolIndex < 0 && d.checkDial(task.dest) == nil {
  386. d.addToStaticPool(task)
  387. }
  388. }
  389. func (d *dialScheduler) addToStaticPool(task *dialTask) {
  390. if task.staticPoolIndex >= 0 {
  391. panic("attempt to add task to staticPool twice")
  392. }
  393. d.staticPool = append(d.staticPool, task)
  394. task.staticPoolIndex = len(d.staticPool) - 1
  395. }
  396. // removeFromStaticPool removes the task at idx from staticPool. It does that by moving the
  397. // current last element of the pool to idx and then shortening the pool by one.
  398. func (d *dialScheduler) removeFromStaticPool(idx int) {
  399. task := d.staticPool[idx]
  400. end := len(d.staticPool) - 1
  401. d.staticPool[idx] = d.staticPool[end]
  402. d.staticPool[idx].staticPoolIndex = idx
  403. d.staticPool[end] = nil
  404. d.staticPool = d.staticPool[:end]
  405. task.staticPoolIndex = -1
  406. }
  407. // startDial runs the given dial task in a separate goroutine.
  408. func (d *dialScheduler) startDial(task *dialTask) {
  409. d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
  410. hkey := string(task.dest.ID().Bytes())
  411. d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
  412. d.dialing[task.dest.ID()] = task
  413. gopool.Submit(func() {
  414. task.run(d)
  415. d.doneCh <- task
  416. })
  417. }
  418. // A dialTask generated for each node that is dialed.
  419. type dialTask struct {
  420. staticPoolIndex int
  421. flags connFlag
  422. // These fields are private to the task and should not be
  423. // accessed by dialScheduler while the task is running.
  424. dest *enode.Node
  425. lastResolved mclock.AbsTime
  426. resolveDelay time.Duration
  427. }
  428. func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
  429. return &dialTask{dest: dest, flags: flags, staticPoolIndex: -1}
  430. }
  431. type dialError struct {
  432. error
  433. }
  434. func (t *dialTask) run(d *dialScheduler) {
  435. if t.needResolve() && !t.resolve(d) {
  436. return
  437. }
  438. err := t.dial(d, t.dest)
  439. if err != nil {
  440. // For static nodes, resolve one more time if dialing fails.
  441. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
  442. if t.resolve(d) {
  443. t.dial(d, t.dest)
  444. }
  445. }
  446. }
  447. }
  448. func (t *dialTask) needResolve() bool {
  449. return t.flags&staticDialedConn != 0 && t.dest.IP() == nil
  450. }
  451. // resolve attempts to find the current endpoint for the destination
  452. // using discovery.
  453. //
  454. // Resolve operations are throttled with backoff to avoid flooding the
  455. // discovery network with useless queries for nodes that don't exist.
  456. // The backoff delay resets when the node is found.
  457. func (t *dialTask) resolve(d *dialScheduler) bool {
  458. if d.resolver == nil {
  459. return false
  460. }
  461. if t.resolveDelay == 0 {
  462. t.resolveDelay = initialResolveDelay
  463. }
  464. if t.lastResolved > 0 && time.Duration(d.clock.Now()-t.lastResolved) < t.resolveDelay {
  465. return false
  466. }
  467. resolved := d.resolver.Resolve(t.dest)
  468. t.lastResolved = d.clock.Now()
  469. if resolved == nil {
  470. t.resolveDelay *= 2
  471. if t.resolveDelay > maxResolveDelay {
  472. t.resolveDelay = maxResolveDelay
  473. }
  474. d.log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay)
  475. return false
  476. }
  477. // The node was found.
  478. t.resolveDelay = initialResolveDelay
  479. t.dest = resolved
  480. d.log.Debug("Resolved node", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()})
  481. return true
  482. }
  483. // dial performs the actual connection attempt.
  484. func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
  485. fd, err := d.dialer.Dial(d.ctx, t.dest)
  486. if err != nil {
  487. d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
  488. return &dialError{err}
  489. }
  490. mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
  491. return d.setupFunc(mfd, t.flags, dest)
  492. }
  493. func (t *dialTask) String() string {
  494. id := t.dest.ID()
  495. return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], t.dest.IP(), t.dest.TCP())
  496. }
  497. func cleanupDialErr(err error) error {
  498. if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
  499. return netErr.Err
  500. }
  501. return err
  502. }