  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "math/rand"
  19. "sync/atomic"
  20. "testing"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/ethdb"
  24. "github.com/ethereum/go-ethereum/ethdb/memorydb"
  25. lpc "github.com/ethereum/go-ethereum/les/lespay/client"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/p2p/enr"
  29. )
const (
	spTestNodes  = 1000  // size of the synthetic test node set
	spTestTarget = 5     // desired number of simultaneously connected peers (see run)
	spTestLength = 10000 // number of one-second simulation cycles per run
	spMinTotal   = 40000 // lower bound accepted by checkNodes for total connection time
	spMaxTotal   = 50000 // upper bound accepted by checkNodes for total connection time
)
  37. func testNodeID(i int) enode.ID {
  38. return enode.ID{42, byte(i % 256), byte(i / 256)}
  39. }
  40. func testNodeIndex(id enode.ID) int {
  41. if id[0] != 42 {
  42. return -1
  43. }
  44. return int(id[1]) + int(id[2])*256
  45. }
// serverPoolTest bundles a simulated server pool environment: a virtual
// clock, an in-memory database, a set of synthetic test nodes and the
// counters used to evaluate pool behavior over many connection cycles.
type serverPoolTest struct {
	db                 ethdb.KeyValueStore
	clock              *mclock.Simulated
	quit               chan struct{}
	preNeg, preNegFail bool // enable pre-negotiation queries / make every query fail
	vt                 *lpc.ValueTracker
	sp                 *serverPool
	input              enode.Iterator // discovery input iterator handed to the pool (nil = no discovery)
	testNodes          []spTestNode
	trusted            []string // enode URLs passed to newServerPool as trusted nodes

	// waitCount/waitEnded are accessed with atomic operations; waitEnded
	// lets the watchdog goroutine in start() detect stalled progress.
	waitCount, waitEnded int32

	cycle, conn, servedConn  int
	serviceCycles, dialCount int
	disconnect               map[int][]int // cycle number -> indices of nodes to disconnect in that cycle
}
// spTestNode models the behavior of a single synthetic server node.
type spTestNode struct {
	connectCycles, waitCycles int // connection duration / cool-down after disconnect (0 connectCycles = never connectable)
	nextConnCycle, totalConn  int // earliest cycle of the next connection / accumulated connection time
	connected, service        bool
	peer                      *serverPeer
}
  67. func newServerPoolTest(preNeg, preNegFail bool) *serverPoolTest {
  68. nodes := make([]*enode.Node, spTestNodes)
  69. for i := range nodes {
  70. nodes[i] = enode.SignNull(&enr.Record{}, testNodeID(i))
  71. }
  72. return &serverPoolTest{
  73. clock: &mclock.Simulated{},
  74. db: memorydb.New(),
  75. input: enode.CycleNodes(nodes),
  76. testNodes: make([]spTestNode, spTestNodes),
  77. preNeg: preNeg,
  78. preNegFail: preNegFail,
  79. }
  80. }
  81. func (s *serverPoolTest) beginWait() {
  82. // ensure that dialIterator and the maximal number of pre-neg queries are not all stuck in a waiting state
  83. for atomic.AddInt32(&s.waitCount, 1) > preNegLimit {
  84. atomic.AddInt32(&s.waitCount, -1)
  85. s.clock.Run(time.Second)
  86. }
  87. }
// endWait signals the end of a simulated blocking operation started by
// beginWait. Incrementing waitEnded lets the watchdog goroutine in
// start() see that the test is still making progress.
func (s *serverPoolTest) endWait() {
	atomic.AddInt32(&s.waitCount, -1)
	atomic.AddInt32(&s.waitEnded, 1)
}
  92. func (s *serverPoolTest) addTrusted(i int) {
  93. s.trusted = append(s.trusted, enode.SignNull(&enr.Record{}, testNodeID(i)).String())
  94. }
// start creates and launches the value tracker and the server pool
// under test, plus a watchdog goroutine that advances the simulated
// clock when no progress is observed. If s.preNeg is set, the pool is
// given a query function emulating UDP pre-negotiation against the
// synthetic node set.
func (s *serverPoolTest) start() {
	var testQuery queryFunc
	if s.preNeg {
		testQuery = func(node *enode.Node) int {
			idx := testNodeIndex(node.ID())
			n := &s.testNodes[idx]
			// a dial would succeed only for a configured, currently
			// unconnected node whose cool-down period has passed
			canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle
			if s.preNegFail {
				// simulate a scenario where UDP queries never work
				s.beginWait()
				s.clock.Sleep(time.Second * 5)
				s.endWait()
				return -1
			} else {
				// three response behaviors, chosen by node index
				switch idx % 3 {
				case 0:
					// pre-neg returns true only if connection is possible
					if canConnect {
						return 1
					} else {
						return 0
					}
				case 1:
					// pre-neg returns true but connection might still fail
					return 1
				case 2:
					// pre-neg returns true if connection is possible, otherwise timeout (node unresponsive)
					if canConnect {
						return 1
					} else {
						s.beginWait()
						s.clock.Sleep(time.Second * 5)
						s.endWait()
						return -1
					}
				}
				return -1
			}
		}
	}
	s.vt = lpc.NewValueTracker(s.db, s.clock, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000))
	s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, s.input, 0, testQuery, s.clock, s.trusted)
	s.sp.validSchemes = enode.ValidSchemesForTesting
	s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
	s.disconnect = make(map[int][]int)
	s.sp.start()
	s.quit = make(chan struct{})
	go func() {
		// watchdog: if waitEnded has not moved within 100ms of real
		// time, run the simulated clock to unblock pending waiters
		last := int32(-1)
		for {
			select {
			case <-time.After(time.Millisecond * 100):
				c := atomic.LoadInt32(&s.waitEnded)
				if c == last {
					// advance clock if test is stuck (might happen in rare cases)
					s.clock.Run(time.Second)
				}
				last = c
			case <-s.quit:
				return
			}
		}
	}()
}
  159. func (s *serverPoolTest) stop() {
  160. close(s.quit)
  161. s.sp.stop()
  162. s.vt.Stop()
  163. for i := range s.testNodes {
  164. n := &s.testNodes[i]
  165. if n.connected {
  166. n.totalConn += s.cycle
  167. }
  168. n.connected = false
  169. n.peer = nil
  170. n.nextConnCycle = 0
  171. }
  172. s.conn, s.servedConn = 0, 0
  173. }
// run executes spTestLength one-second simulation cycles. Each cycle
// first performs the disconnections scheduled for it, then dials a new
// candidate from the pool's iterator while fewer than spTestTarget
// peers are connected, and finally advances the simulated clock.
func (s *serverPoolTest) run() {
	for count := spTestLength; count > 0; count-- {
		// disconnect peers whose connection time expires in this cycle
		if dcList := s.disconnect[s.cycle]; dcList != nil {
			for _, idx := range dcList {
				n := &s.testNodes[idx]
				s.sp.unregisterPeer(n.peer)
				// totalConn was decremented by the connect cycle at dial
				// time, so adding the current cycle yields the duration
				n.totalConn += s.cycle
				n.connected = false
				n.peer = nil
				s.conn--
				if n.service {
					s.servedConn--
				}
				n.nextConnCycle = s.cycle + n.waitCycles
			}
			delete(s.disconnect, s.cycle)
		}
		if s.conn < spTestTarget {
			s.dialCount++
			s.beginWait()
			s.sp.dialIterator.Next()
			s.endWait()
			dial := s.sp.dialIterator.Node()
			id := dial.ID()
			idx := testNodeIndex(id)
			n := &s.testNodes[idx]
			// the attempt succeeds only for a configured, unconnected
			// node whose cool-down has passed
			if !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle {
				s.conn++
				if n.service {
					s.servedConn++
				}
				n.totalConn -= s.cycle
				n.connected = true
				// schedule the disconnection and register the peer
				dc := s.cycle + n.connectCycles
				s.disconnect[dc] = append(s.disconnect[dc], idx)
				n.peer = &serverPeer{peerCommons: peerCommons{Peer: p2p.NewPeer(id, "", nil)}}
				s.sp.registerPeer(n.peer)
				if n.service {
					// service nodes report served requests to the value tracker
					s.vt.Served(s.vt.GetNode(id), []lpc.ServedRequest{{ReqType: 0, Amount: 100}}, 0)
				}
			}
		}
		s.serviceCycles += s.servedConn
		s.clock.Run(time.Second)
		s.cycle++
	}
}
  221. func (s *serverPoolTest) setNodes(count, conn, wait int, service, trusted bool) (res []int) {
  222. for ; count > 0; count-- {
  223. idx := rand.Intn(spTestNodes)
  224. for s.testNodes[idx].connectCycles != 0 || s.testNodes[idx].connected {
  225. idx = rand.Intn(spTestNodes)
  226. }
  227. res = append(res, idx)
  228. s.testNodes[idx] = spTestNode{
  229. connectCycles: conn,
  230. waitCycles: wait,
  231. service: service,
  232. }
  233. if trusted {
  234. s.addTrusted(idx)
  235. }
  236. }
  237. return
  238. }
  239. func (s *serverPoolTest) resetNodes() {
  240. for i, n := range s.testNodes {
  241. if n.connected {
  242. n.totalConn += s.cycle
  243. s.sp.unregisterPeer(n.peer)
  244. }
  245. s.testNodes[i] = spTestNode{totalConn: n.totalConn}
  246. }
  247. s.conn, s.servedConn = 0, 0
  248. s.disconnect = make(map[int][]int)
  249. s.trusted = nil
  250. }
  251. func (s *serverPoolTest) checkNodes(t *testing.T, nodes []int) {
  252. var sum int
  253. for _, idx := range nodes {
  254. n := &s.testNodes[idx]
  255. if n.connected {
  256. n.totalConn += s.cycle
  257. }
  258. sum += n.totalConn
  259. n.totalConn = 0
  260. if n.connected {
  261. n.totalConn -= s.cycle
  262. }
  263. }
  264. if sum < spMinTotal || sum > spMaxTotal {
  265. t.Errorf("Total connection amount %d outside expected range %d to %d", sum, spMinTotal, spMaxTotal)
  266. }
  267. }
// TestServerPool runs the basic simulation without pre-negotiation.
func TestServerPool(t *testing.T) { testServerPool(t, false, false) }

// TestServerPoolWithPreNeg runs the basic simulation with pre-negotiation queries enabled.
func TestServerPoolWithPreNeg(t *testing.T) { testServerPool(t, true, false) }

// TestServerPoolWithPreNegFail runs the simulation with every pre-negotiation query failing.
func TestServerPoolWithPreNegFail(t *testing.T) { testServerPool(t, true, true) }
  271. func testServerPool(t *testing.T, preNeg, fail bool) {
  272. s := newServerPoolTest(preNeg, fail)
  273. nodes := s.setNodes(100, 200, 200, true, false)
  274. s.setNodes(100, 20, 20, false, false)
  275. s.start()
  276. s.run()
  277. s.stop()
  278. s.checkNodes(t, nodes)
  279. }
// TestServerPoolChangedNodes checks adaptation to a repeatedly changing node set.
func TestServerPoolChangedNodes(t *testing.T) { testServerPoolChangedNodes(t, false) }

// TestServerPoolChangedNodesWithPreNeg does the same with pre-negotiation enabled.
func TestServerPoolChangedNodesWithPreNeg(t *testing.T) { testServerPoolChangedNodes(t, true) }
  282. func testServerPoolChangedNodes(t *testing.T, preNeg bool) {
  283. s := newServerPoolTest(preNeg, false)
  284. nodes := s.setNodes(100, 200, 200, true, false)
  285. s.setNodes(100, 20, 20, false, false)
  286. s.start()
  287. s.run()
  288. s.checkNodes(t, nodes)
  289. for i := 0; i < 3; i++ {
  290. s.resetNodes()
  291. nodes := s.setNodes(100, 200, 200, true, false)
  292. s.setNodes(100, 20, 20, false, false)
  293. s.run()
  294. s.checkNodes(t, nodes)
  295. }
  296. s.stop()
  297. }
// TestServerPoolRestartNoDiscovery checks a pool restart with the discovery input removed.
func TestServerPoolRestartNoDiscovery(t *testing.T) { testServerPoolRestartNoDiscovery(t, false) }

// TestServerPoolRestartNoDiscoveryWithPreNeg does the same with pre-negotiation enabled.
func TestServerPoolRestartNoDiscoveryWithPreNeg(t *testing.T) {
	testServerPoolRestartNoDiscovery(t, true)
}
  302. func testServerPoolRestartNoDiscovery(t *testing.T, preNeg bool) {
  303. s := newServerPoolTest(preNeg, false)
  304. nodes := s.setNodes(100, 200, 200, true, false)
  305. s.setNodes(100, 20, 20, false, false)
  306. s.start()
  307. s.run()
  308. s.stop()
  309. s.checkNodes(t, nodes)
  310. s.input = nil
  311. s.start()
  312. s.run()
  313. s.stop()
  314. s.checkNodes(t, nodes)
  315. }
// TestServerPoolTrustedNoDiscovery checks that trusted nodes are used without discovery.
func TestServerPoolTrustedNoDiscovery(t *testing.T) { testServerPoolTrustedNoDiscovery(t, false) }

// TestServerPoolTrustedNoDiscoveryWithPreNeg does the same with pre-negotiation enabled.
func TestServerPoolTrustedNoDiscoveryWithPreNeg(t *testing.T) {
	testServerPoolTrustedNoDiscovery(t, true)
}
  320. func testServerPoolTrustedNoDiscovery(t *testing.T, preNeg bool) {
  321. s := newServerPoolTest(preNeg, false)
  322. trusted := s.setNodes(200, 200, 200, true, true)
  323. s.input = nil
  324. s.start()
  325. s.run()
  326. s.stop()
  327. s.checkNodes(t, trusted)
  328. }