crawl.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of go-ethereum.
  3. //
  4. // go-ethereum is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // go-ethereum is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU General Public License
  15. // along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "time"
  19. "github.com/ethereum/go-ethereum/log"
  20. "github.com/ethereum/go-ethereum/p2p/discover"
  21. "github.com/ethereum/go-ethereum/p2p/enode"
  22. )
  23. type crawler struct {
  24. input nodeSet
  25. output nodeSet
  26. disc *discover.UDPv4
  27. iters []enode.Iterator
  28. inputIter enode.Iterator
  29. ch chan *enode.Node
  30. closed chan struct{}
  31. // settings
  32. revalidateInterval time.Duration
  33. }
  34. func newCrawler(input nodeSet, disc *discover.UDPv4, iters ...enode.Iterator) *crawler {
  35. c := &crawler{
  36. input: input,
  37. output: make(nodeSet, len(input)),
  38. disc: disc,
  39. iters: iters,
  40. inputIter: enode.IterNodes(input.nodes()),
  41. ch: make(chan *enode.Node),
  42. closed: make(chan struct{}),
  43. }
  44. c.iters = append(c.iters, c.inputIter)
  45. // Copy input to output initially. Any nodes that fail validation
  46. // will be dropped from output during the run.
  47. for id, n := range input {
  48. c.output[id] = n
  49. }
  50. return c
  51. }
  52. func (c *crawler) run(timeout time.Duration) nodeSet {
  53. var (
  54. timeoutTimer = time.NewTimer(timeout)
  55. timeoutCh <-chan time.Time
  56. doneCh = make(chan enode.Iterator, len(c.iters))
  57. liveIters = len(c.iters)
  58. )
  59. for _, it := range c.iters {
  60. go c.runIterator(doneCh, it)
  61. }
  62. loop:
  63. for {
  64. select {
  65. case n := <-c.ch:
  66. c.updateNode(n)
  67. case it := <-doneCh:
  68. if it == c.inputIter {
  69. // Enable timeout when we're done revalidating the input nodes.
  70. log.Info("Revalidation of input set is done", "len", len(c.input))
  71. if timeout > 0 {
  72. timeoutCh = timeoutTimer.C
  73. }
  74. }
  75. if liveIters--; liveIters == 0 {
  76. break loop
  77. }
  78. case <-timeoutCh:
  79. break loop
  80. }
  81. }
  82. close(c.closed)
  83. for _, it := range c.iters {
  84. it.Close()
  85. }
  86. for ; liveIters > 0; liveIters-- {
  87. <-doneCh
  88. }
  89. return c.output
  90. }
  91. func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
  92. defer func() { done <- it }()
  93. for it.Next() {
  94. select {
  95. case c.ch <- it.Node():
  96. case <-c.closed:
  97. return
  98. }
  99. }
  100. }
  101. func (c *crawler) updateNode(n *enode.Node) {
  102. node, ok := c.output[n.ID()]
  103. // Skip validation of recently-seen nodes.
  104. if ok && time.Since(node.LastCheck) < c.revalidateInterval {
  105. return
  106. }
  107. // Request the node record.
  108. nn, err := c.disc.RequestENR(n)
  109. node.LastCheck = truncNow()
  110. if err != nil {
  111. if node.Score == 0 {
  112. // Node doesn't implement EIP-868.
  113. log.Debug("Skipping node", "id", n.ID())
  114. return
  115. }
  116. node.Score /= 2
  117. } else {
  118. node.N = nn
  119. node.Seq = nn.Seq()
  120. node.Score++
  121. if node.FirstResponse.IsZero() {
  122. node.FirstResponse = node.LastCheck
  123. }
  124. node.LastResponse = node.LastCheck
  125. }
  126. // Store/update node in output set.
  127. if node.Score <= 0 {
  128. log.Info("Removing node", "id", n.ID())
  129. delete(c.output, n.ID())
  130. } else {
  131. log.Info("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
  132. c.output[n.ID()] = node
  133. }
  134. }
  135. func truncNow() time.Time {
  136. return time.Now().UTC().Truncate(1 * time.Second)
  137. }