serverpool_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package client
  17. import (
  18. "math/rand"
  19. "strconv"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/ethdb/memorydb"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/p2p/enr"
  29. )
  30. const (
  31. spTestNodes = 1000
  32. spTestTarget = 5
  33. spTestLength = 10000
  34. spMinTotal = 40000
  35. spMaxTotal = 50000
  36. )
  37. func testNodeID(i int) enode.ID {
  38. return enode.ID{42, byte(i % 256), byte(i / 256)}
  39. }
  40. func testNodeIndex(id enode.ID) int {
  41. if id[0] != 42 {
  42. return -1
  43. }
  44. return int(id[1]) + int(id[2])*256
  45. }
  46. type ServerPoolTest struct {
  47. db ethdb.KeyValueStore
  48. clock *mclock.Simulated
  49. quit chan chan struct{}
  50. preNeg, preNegFail bool
  51. sp *ServerPool
  52. spi enode.Iterator
  53. input enode.Iterator
  54. testNodes []spTestNode
  55. trusted []string
  56. waitCount, waitEnded int32
  57. // preNegLock protects the cycle counter, testNodes list and its connected field
  58. // (accessed from both the main thread and the preNeg callback)
  59. preNegLock sync.Mutex
  60. queryWg *sync.WaitGroup // a new wait group is created each time the simulation is started
  61. stopping bool // stopping avoid calling queryWg.Add after queryWg.Wait
  62. cycle, conn, servedConn int
  63. serviceCycles, dialCount int
  64. disconnect map[int][]int
  65. }
  66. type spTestNode struct {
  67. connectCycles, waitCycles int
  68. nextConnCycle, totalConn int
  69. connected, service bool
  70. node *enode.Node
  71. }
  72. func newServerPoolTest(preNeg, preNegFail bool) *ServerPoolTest {
  73. nodes := make([]*enode.Node, spTestNodes)
  74. for i := range nodes {
  75. nodes[i] = enode.SignNull(&enr.Record{}, testNodeID(i))
  76. }
  77. return &ServerPoolTest{
  78. clock: &mclock.Simulated{},
  79. db: memorydb.New(),
  80. input: enode.CycleNodes(nodes),
  81. testNodes: make([]spTestNode, spTestNodes),
  82. preNeg: preNeg,
  83. preNegFail: preNegFail,
  84. }
  85. }
  86. func (s *ServerPoolTest) beginWait() {
  87. // ensure that dialIterator and the maximal number of pre-neg queries are not all stuck in a waiting state
  88. for atomic.AddInt32(&s.waitCount, 1) > preNegLimit {
  89. atomic.AddInt32(&s.waitCount, -1)
  90. s.clock.Run(time.Second)
  91. }
  92. }
  93. func (s *ServerPoolTest) endWait() {
  94. atomic.AddInt32(&s.waitCount, -1)
  95. atomic.AddInt32(&s.waitEnded, 1)
  96. }
  97. func (s *ServerPoolTest) addTrusted(i int) {
  98. s.trusted = append(s.trusted, enode.SignNull(&enr.Record{}, testNodeID(i)).String())
  99. }
  100. func (s *ServerPoolTest) start() {
  101. var testQuery QueryFunc
  102. s.queryWg = new(sync.WaitGroup)
  103. if s.preNeg {
  104. testQuery = func(node *enode.Node) int {
  105. s.preNegLock.Lock()
  106. if s.stopping {
  107. s.preNegLock.Unlock()
  108. return 0
  109. }
  110. s.queryWg.Add(1)
  111. idx := testNodeIndex(node.ID())
  112. n := &s.testNodes[idx]
  113. canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle
  114. s.preNegLock.Unlock()
  115. defer s.queryWg.Done()
  116. if s.preNegFail {
  117. // simulate a scenario where UDP queries never work
  118. s.beginWait()
  119. s.clock.Sleep(time.Second * 5)
  120. s.endWait()
  121. return -1
  122. }
  123. switch idx % 3 {
  124. case 0:
  125. // pre-neg returns true only if connection is possible
  126. if canConnect {
  127. return 1
  128. }
  129. return 0
  130. case 1:
  131. // pre-neg returns true but connection might still fail
  132. return 1
  133. case 2:
  134. // pre-neg returns true if connection is possible, otherwise timeout (node unresponsive)
  135. if canConnect {
  136. return 1
  137. }
  138. s.beginWait()
  139. s.clock.Sleep(time.Second * 5)
  140. s.endWait()
  141. return -1
  142. }
  143. return -1
  144. }
  145. }
  146. requestList := make([]RequestInfo, testReqTypes)
  147. for i := range requestList {
  148. requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
  149. }
  150. s.sp, s.spi = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
  151. s.sp.AddSource(s.input)
  152. s.sp.validSchemes = enode.ValidSchemesForTesting
  153. s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
  154. s.disconnect = make(map[int][]int)
  155. s.sp.Start()
  156. s.quit = make(chan chan struct{})
  157. go func() {
  158. last := int32(-1)
  159. for {
  160. select {
  161. case <-time.After(time.Millisecond * 100):
  162. c := atomic.LoadInt32(&s.waitEnded)
  163. if c == last {
  164. // advance clock if test is stuck (might happen in rare cases)
  165. s.clock.Run(time.Second)
  166. }
  167. last = c
  168. case quit := <-s.quit:
  169. close(quit)
  170. return
  171. }
  172. }
  173. }()
  174. }
  175. func (s *ServerPoolTest) stop() {
  176. // disable further queries and wait if one is currently running
  177. s.preNegLock.Lock()
  178. s.stopping = true
  179. s.preNegLock.Unlock()
  180. s.queryWg.Wait()
  181. quit := make(chan struct{})
  182. s.quit <- quit
  183. <-quit
  184. s.sp.Stop()
  185. s.spi.Close()
  186. s.preNegLock.Lock()
  187. s.stopping = false
  188. s.preNegLock.Unlock()
  189. for i := range s.testNodes {
  190. n := &s.testNodes[i]
  191. if n.connected {
  192. n.totalConn += s.cycle
  193. }
  194. n.connected = false
  195. n.node = nil
  196. n.nextConnCycle = 0
  197. }
  198. s.conn, s.servedConn = 0, 0
  199. }
  200. func (s *ServerPoolTest) run() {
  201. for count := spTestLength; count > 0; count-- {
  202. if dcList := s.disconnect[s.cycle]; dcList != nil {
  203. for _, idx := range dcList {
  204. n := &s.testNodes[idx]
  205. s.sp.UnregisterNode(n.node)
  206. n.totalConn += s.cycle
  207. s.preNegLock.Lock()
  208. n.connected = false
  209. s.preNegLock.Unlock()
  210. n.node = nil
  211. s.conn--
  212. if n.service {
  213. s.servedConn--
  214. }
  215. n.nextConnCycle = s.cycle + n.waitCycles
  216. }
  217. delete(s.disconnect, s.cycle)
  218. }
  219. if s.conn < spTestTarget {
  220. s.dialCount++
  221. s.beginWait()
  222. s.spi.Next()
  223. s.endWait()
  224. dial := s.spi.Node()
  225. id := dial.ID()
  226. idx := testNodeIndex(id)
  227. n := &s.testNodes[idx]
  228. if !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle {
  229. s.conn++
  230. if n.service {
  231. s.servedConn++
  232. }
  233. n.totalConn -= s.cycle
  234. s.preNegLock.Lock()
  235. n.connected = true
  236. s.preNegLock.Unlock()
  237. dc := s.cycle + n.connectCycles
  238. s.disconnect[dc] = append(s.disconnect[dc], idx)
  239. n.node = dial
  240. nv, _ := s.sp.RegisterNode(n.node)
  241. if n.service {
  242. nv.Served([]ServedRequest{{ReqType: 0, Amount: 100}}, 0)
  243. }
  244. }
  245. }
  246. s.serviceCycles += s.servedConn
  247. s.clock.Run(time.Second)
  248. s.preNegLock.Lock()
  249. s.cycle++
  250. s.preNegLock.Unlock()
  251. }
  252. }
  253. func (s *ServerPoolTest) setNodes(count, conn, wait int, service, trusted bool) (res []int) {
  254. for ; count > 0; count-- {
  255. idx := rand.Intn(spTestNodes)
  256. for s.testNodes[idx].connectCycles != 0 || s.testNodes[idx].connected {
  257. idx = rand.Intn(spTestNodes)
  258. }
  259. res = append(res, idx)
  260. s.preNegLock.Lock()
  261. s.testNodes[idx] = spTestNode{
  262. connectCycles: conn,
  263. waitCycles: wait,
  264. service: service,
  265. }
  266. s.preNegLock.Unlock()
  267. if trusted {
  268. s.addTrusted(idx)
  269. }
  270. }
  271. return
  272. }
  273. func (s *ServerPoolTest) resetNodes() {
  274. for i, n := range s.testNodes {
  275. if n.connected {
  276. n.totalConn += s.cycle
  277. s.sp.UnregisterNode(n.node)
  278. }
  279. s.preNegLock.Lock()
  280. s.testNodes[i] = spTestNode{totalConn: n.totalConn}
  281. s.preNegLock.Unlock()
  282. }
  283. s.conn, s.servedConn = 0, 0
  284. s.disconnect = make(map[int][]int)
  285. s.trusted = nil
  286. }
  287. func (s *ServerPoolTest) checkNodes(t *testing.T, nodes []int) {
  288. var sum int
  289. for _, idx := range nodes {
  290. n := &s.testNodes[idx]
  291. if n.connected {
  292. n.totalConn += s.cycle
  293. }
  294. sum += n.totalConn
  295. n.totalConn = 0
  296. if n.connected {
  297. n.totalConn -= s.cycle
  298. }
  299. }
  300. if sum < spMinTotal || sum > spMaxTotal {
  301. t.Errorf("Total connection amount %d outside expected range %d to %d", sum, spMinTotal, spMaxTotal)
  302. }
  303. }
  304. func TestServerPool(t *testing.T) { testServerPool(t, false, false) }
  305. func TestServerPoolWithPreNeg(t *testing.T) { testServerPool(t, true, false) }
  306. func TestServerPoolWithPreNegFail(t *testing.T) { testServerPool(t, true, true) }
  307. func testServerPool(t *testing.T, preNeg, fail bool) {
  308. s := newServerPoolTest(preNeg, fail)
  309. nodes := s.setNodes(100, 200, 200, true, false)
  310. s.setNodes(100, 20, 20, false, false)
  311. s.start()
  312. s.run()
  313. s.stop()
  314. s.checkNodes(t, nodes)
  315. }
  316. func TestServerPoolChangedNodes(t *testing.T) { testServerPoolChangedNodes(t, false) }
  317. func TestServerPoolChangedNodesWithPreNeg(t *testing.T) { testServerPoolChangedNodes(t, true) }
  318. func testServerPoolChangedNodes(t *testing.T, preNeg bool) {
  319. s := newServerPoolTest(preNeg, false)
  320. nodes := s.setNodes(100, 200, 200, true, false)
  321. s.setNodes(100, 20, 20, false, false)
  322. s.start()
  323. s.run()
  324. s.checkNodes(t, nodes)
  325. for i := 0; i < 3; i++ {
  326. s.resetNodes()
  327. nodes := s.setNodes(100, 200, 200, true, false)
  328. s.setNodes(100, 20, 20, false, false)
  329. s.run()
  330. s.checkNodes(t, nodes)
  331. }
  332. s.stop()
  333. }
  334. func TestServerPoolRestartNoDiscovery(t *testing.T) { testServerPoolRestartNoDiscovery(t, false) }
  335. func TestServerPoolRestartNoDiscoveryWithPreNeg(t *testing.T) {
  336. testServerPoolRestartNoDiscovery(t, true)
  337. }
  338. func testServerPoolRestartNoDiscovery(t *testing.T, preNeg bool) {
  339. s := newServerPoolTest(preNeg, false)
  340. nodes := s.setNodes(100, 200, 200, true, false)
  341. s.setNodes(100, 20, 20, false, false)
  342. s.start()
  343. s.run()
  344. s.stop()
  345. s.checkNodes(t, nodes)
  346. s.input = nil
  347. s.start()
  348. s.run()
  349. s.stop()
  350. s.checkNodes(t, nodes)
  351. }
  352. func TestServerPoolTrustedNoDiscovery(t *testing.T) { testServerPoolTrustedNoDiscovery(t, false) }
  353. func TestServerPoolTrustedNoDiscoveryWithPreNeg(t *testing.T) {
  354. testServerPoolTrustedNoDiscovery(t, true)
  355. }
  356. func testServerPoolTrustedNoDiscovery(t *testing.T, preNeg bool) {
  357. s := newServerPoolTest(preNeg, false)
  358. trusted := s.setNodes(200, 200, 200, true, true)
  359. s.input = nil
  360. s.start()
  361. s.run()
  362. s.stop()
  363. s.checkNodes(t, trusted)
  364. }