dial.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package p2p
  2. import (
  3. "container/heap"
  4. "crypto/rand"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/ethereum/go-ethereum/logger"
  9. "github.com/ethereum/go-ethereum/logger/glog"
  10. "github.com/ethereum/go-ethereum/p2p/discover"
  11. )
  12. const (
  13. // This is the amount of time spent waiting in between
  14. // redialing a certain node.
  15. dialHistoryExpiration = 30 * time.Second
  16. // Discovery lookup tasks will wait for this long when
  17. // no results are returned. This can happen if the table
  18. // becomes empty (i.e. not often).
  19. emptyLookupDelay = 10 * time.Second
  20. )
  21. // dialstate schedules dials and discovery lookups.
  22. // it get's a chance to compute new tasks on every iteration
  23. // of the main loop in Server.run.
  24. type dialstate struct {
  25. maxDynDials int
  26. ntab discoverTable
  27. lookupRunning bool
  28. bootstrapped bool
  29. dialing map[discover.NodeID]connFlag
  30. lookupBuf []*discover.Node // current discovery lookup results
  31. randomNodes []*discover.Node // filled from Table
  32. static map[discover.NodeID]*discover.Node
  33. hist *dialHistory
  34. }
  35. type discoverTable interface {
  36. Self() *discover.Node
  37. Close()
  38. Bootstrap([]*discover.Node)
  39. Lookup(target discover.NodeID) []*discover.Node
  40. ReadRandomNodes([]*discover.Node) int
  41. }
  42. // the dial history remembers recent dials.
  43. type dialHistory []pastDial
  44. // pastDial is an entry in the dial history.
  45. type pastDial struct {
  46. id discover.NodeID
  47. exp time.Time
  48. }
  49. type task interface {
  50. Do(*Server)
  51. }
  52. // A dialTask is generated for each node that is dialed.
  53. type dialTask struct {
  54. flags connFlag
  55. dest *discover.Node
  56. }
  57. // discoverTask runs discovery table operations.
  58. // Only one discoverTask is active at any time.
  59. //
  60. // If bootstrap is true, the task runs Table.Bootstrap,
  61. // otherwise it performs a random lookup and leaves the
  62. // results in the task.
  63. type discoverTask struct {
  64. bootstrap bool
  65. results []*discover.Node
  66. }
  67. // A waitExpireTask is generated if there are no other tasks
  68. // to keep the loop in Server.run ticking.
  69. type waitExpireTask struct {
  70. time.Duration
  71. }
  72. func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate {
  73. s := &dialstate{
  74. maxDynDials: maxdyn,
  75. ntab: ntab,
  76. static: make(map[discover.NodeID]*discover.Node),
  77. dialing: make(map[discover.NodeID]connFlag),
  78. randomNodes: make([]*discover.Node, maxdyn/2),
  79. hist: new(dialHistory),
  80. }
  81. for _, n := range static {
  82. s.static[n.ID] = n
  83. }
  84. return s
  85. }
  86. func (s *dialstate) addStatic(n *discover.Node) {
  87. s.static[n.ID] = n
  88. }
  89. func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
  90. var newtasks []task
  91. addDial := func(flag connFlag, n *discover.Node) bool {
  92. _, dialing := s.dialing[n.ID]
  93. if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
  94. return false
  95. }
  96. s.dialing[n.ID] = flag
  97. newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
  98. return true
  99. }
  100. // Compute number of dynamic dials necessary at this point.
  101. needDynDials := s.maxDynDials
  102. for _, p := range peers {
  103. if p.rw.is(dynDialedConn) {
  104. needDynDials--
  105. }
  106. }
  107. for _, flag := range s.dialing {
  108. if flag&dynDialedConn != 0 {
  109. needDynDials--
  110. }
  111. }
  112. // Expire the dial history on every invocation.
  113. s.hist.expire(now)
  114. // Create dials for static nodes if they are not connected.
  115. for _, n := range s.static {
  116. addDial(staticDialedConn, n)
  117. }
  118. // Use random nodes from the table for half of the necessary
  119. // dynamic dials.
  120. randomCandidates := needDynDials / 2
  121. if randomCandidates > 0 && s.bootstrapped {
  122. n := s.ntab.ReadRandomNodes(s.randomNodes)
  123. for i := 0; i < randomCandidates && i < n; i++ {
  124. if addDial(dynDialedConn, s.randomNodes[i]) {
  125. needDynDials--
  126. }
  127. }
  128. }
  129. // Create dynamic dials from random lookup results, removing tried
  130. // items from the result buffer.
  131. i := 0
  132. for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
  133. if addDial(dynDialedConn, s.lookupBuf[i]) {
  134. needDynDials--
  135. }
  136. }
  137. s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
  138. // Launch a discovery lookup if more candidates are needed. The
  139. // first discoverTask bootstraps the table and won't return any
  140. // results.
  141. if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
  142. s.lookupRunning = true
  143. newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped})
  144. }
  145. // Launch a timer to wait for the next node to expire if all
  146. // candidates have been tried and no task is currently active.
  147. // This should prevent cases where the dialer logic is not ticked
  148. // because there are no pending events.
  149. if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
  150. t := &waitExpireTask{s.hist.min().exp.Sub(now)}
  151. newtasks = append(newtasks, t)
  152. }
  153. return newtasks
  154. }
  155. func (s *dialstate) taskDone(t task, now time.Time) {
  156. switch t := t.(type) {
  157. case *dialTask:
  158. s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
  159. delete(s.dialing, t.dest.ID)
  160. case *discoverTask:
  161. if t.bootstrap {
  162. s.bootstrapped = true
  163. }
  164. s.lookupRunning = false
  165. s.lookupBuf = append(s.lookupBuf, t.results...)
  166. }
  167. }
  168. func (t *dialTask) Do(srv *Server) {
  169. addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
  170. glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
  171. fd, err := srv.Dialer.Dial("tcp", addr.String())
  172. if err != nil {
  173. glog.V(logger.Detail).Infof("dial error: %v", err)
  174. return
  175. }
  176. srv.setupConn(fd, t.flags, t.dest)
  177. }
  178. func (t *dialTask) String() string {
  179. return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
  180. }
  181. func (t *discoverTask) Do(srv *Server) {
  182. if t.bootstrap {
  183. srv.ntab.Bootstrap(srv.BootstrapNodes)
  184. } else {
  185. var target discover.NodeID
  186. rand.Read(target[:])
  187. t.results = srv.ntab.Lookup(target)
  188. // newTasks generates a lookup task whenever dynamic dials are
  189. // necessary. Lookups need to take some time, otherwise the
  190. // event loop spins too fast. An empty result can only be
  191. // returned if the table is empty.
  192. if len(t.results) == 0 {
  193. time.Sleep(emptyLookupDelay)
  194. }
  195. }
  196. }
  197. func (t *discoverTask) String() (s string) {
  198. if t.bootstrap {
  199. s = "discovery bootstrap"
  200. } else {
  201. s = "discovery lookup"
  202. }
  203. if len(t.results) > 0 {
  204. s += fmt.Sprintf(" (%d results)", len(t.results))
  205. }
  206. return s
  207. }
  208. func (t waitExpireTask) Do(*Server) {
  209. time.Sleep(t.Duration)
  210. }
  211. func (t waitExpireTask) String() string {
  212. return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration)
  213. }
  214. // Use only these methods to access or modify dialHistory.
  215. func (h dialHistory) min() pastDial {
  216. return h[0]
  217. }
  218. func (h *dialHistory) add(id discover.NodeID, exp time.Time) {
  219. heap.Push(h, pastDial{id, exp})
  220. }
  221. func (h dialHistory) contains(id discover.NodeID) bool {
  222. for _, v := range h {
  223. if v.id == id {
  224. return true
  225. }
  226. }
  227. return false
  228. }
  229. func (h *dialHistory) expire(now time.Time) {
  230. for h.Len() > 0 && h.min().exp.Before(now) {
  231. heap.Pop(h)
  232. }
  233. }
  234. // heap.Interface boilerplate
  235. func (h dialHistory) Len() int { return len(h) }
  236. func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) }
  237. func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  238. func (h *dialHistory) Push(x interface{}) {
  239. *h = append(*h, x.(pastDial))
  240. }
  241. func (h *dialHistory) Pop() interface{} {
  242. old := *h
  243. n := len(old)
  244. x := old[n-1]
  245. *h = old[0 : n-1]
  246. return x
  247. }