sim_test.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "crypto/ecdsa"
  19. "encoding/binary"
  20. "fmt"
  21. "math/rand"
  22. "net"
  23. "strconv"
  24. "sync"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. )
  30. // In this test, nodes try to randomly resolve each other.
  31. func TestSimRandomResolve(t *testing.T) {
  32. t.Skip("boring")
  33. if runWithPlaygroundTime(t) {
  34. return
  35. }
  36. sim := newSimulation()
  37. bootnode := sim.launchNode(false)
  38. // A new node joins every 10s.
  39. launcher := time.NewTicker(10 * time.Second)
  40. defer launcher.Stop()
  41. go func() {
  42. for range launcher.C {
  43. net := sim.launchNode(false)
  44. go randomResolves(t, sim, net)
  45. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  46. panic(err)
  47. }
  48. t.Logf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
  49. }
  50. }()
  51. time.Sleep(3 * time.Hour)
  52. sim.shutdown()
  53. sim.printStats()
  54. }
  55. func TestSimTopics(t *testing.T) {
  56. t.Skip("NaCl test")
  57. if runWithPlaygroundTime(t) {
  58. return
  59. }
  60. sim := newSimulation()
  61. bootnode := sim.launchNode(false)
  62. go func() {
  63. nets := make([]*Network, 1024)
  64. for i := range nets {
  65. net := sim.launchNode(false)
  66. nets[i] = net
  67. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  68. panic(err)
  69. }
  70. time.Sleep(time.Second * 5)
  71. }
  72. for i, net := range nets {
  73. if i < 256 {
  74. stop := make(chan struct{})
  75. go net.RegisterTopic(testTopic, stop)
  76. go func() {
  77. //time.Sleep(time.Second * 36000)
  78. time.Sleep(time.Second * 40000)
  79. close(stop)
  80. }()
  81. time.Sleep(time.Millisecond * 100)
  82. }
  83. // time.Sleep(time.Second * 10)
  84. //time.Sleep(time.Second)
  85. /*if i%500 == 499 {
  86. time.Sleep(time.Second * 9501)
  87. } else {
  88. time.Sleep(time.Second)
  89. }*/
  90. }
  91. }()
  92. // A new node joins every 10s.
  93. /* launcher := time.NewTicker(5 * time.Second)
  94. cnt := 0
  95. var printNet *Network
  96. go func() {
  97. for range launcher.C {
  98. cnt++
  99. if cnt <= 1000 {
  100. log := false //(cnt == 500)
  101. net := sim.launchNode(log)
  102. if log {
  103. printNet = net
  104. }
  105. if cnt > 500 {
  106. go net.RegisterTopic(testTopic, nil)
  107. }
  108. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  109. panic(err)
  110. }
  111. }
  112. //fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
  113. }
  114. }()
  115. */
  116. time.Sleep(55000 * time.Second)
  117. //launcher.Stop()
  118. sim.shutdown()
  119. //sim.printStats()
  120. //printNet.log.printLogs()
  121. }
  122. /*func testHierarchicalTopics(i int) []Topic {
  123. digits := strconv.FormatInt(int64(256+i/4), 4)
  124. res := make([]Topic, 5)
  125. for i, _ := range res {
  126. res[i] = Topic("foo" + digits[1:i+1])
  127. }
  128. return res
  129. }*/
  130. func testHierarchicalTopics(i int) []Topic {
  131. digits := strconv.FormatInt(int64(128+i/8), 2)
  132. res := make([]Topic, 8)
  133. for i := range res {
  134. res[i] = Topic("foo" + digits[1:i+1])
  135. }
  136. return res
  137. }
  138. func TestSimTopicHierarchy(t *testing.T) {
  139. t.Skip("NaCl test")
  140. if runWithPlaygroundTime(t) {
  141. return
  142. }
  143. sim := newSimulation()
  144. bootnode := sim.launchNode(false)
  145. go func() {
  146. nets := make([]*Network, 1024)
  147. for i := range nets {
  148. net := sim.launchNode(false)
  149. nets[i] = net
  150. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  151. panic(err)
  152. }
  153. time.Sleep(time.Second * 5)
  154. }
  155. stop := make(chan struct{})
  156. for i, net := range nets {
  157. //if i < 256 {
  158. for _, topic := range testHierarchicalTopics(i)[:5] {
  159. //fmt.Println("reg", topic)
  160. go net.RegisterTopic(topic, stop)
  161. }
  162. time.Sleep(time.Millisecond * 100)
  163. //}
  164. }
  165. time.Sleep(time.Second * 90000)
  166. close(stop)
  167. }()
  168. time.Sleep(100000 * time.Second)
  169. sim.shutdown()
  170. }
  171. func randomResolves(t *testing.T, s *simulation, net *Network) {
  172. randtime := func() time.Duration {
  173. return time.Duration(rand.Intn(50)+20) * time.Second
  174. }
  175. lookup := func(target NodeID) bool {
  176. result := net.Resolve(target)
  177. return result != nil && result.ID == target
  178. }
  179. timer := time.NewTimer(randtime())
  180. defer timer.Stop()
  181. for {
  182. select {
  183. case <-timer.C:
  184. target := s.randomNode().Self().ID
  185. if !lookup(target) {
  186. t.Errorf("node %x: target %x not found", net.Self().ID[:8], target[:8])
  187. }
  188. timer.Reset(randtime())
  189. case <-net.closed:
  190. return
  191. }
  192. }
  193. }
  194. type simulation struct {
  195. mu sync.RWMutex
  196. nodes map[NodeID]*Network
  197. nodectr uint32
  198. }
  199. func newSimulation() *simulation {
  200. return &simulation{nodes: make(map[NodeID]*Network)}
  201. }
  202. func (s *simulation) shutdown() {
  203. s.mu.RLock()
  204. alive := make([]*Network, 0, len(s.nodes))
  205. for _, n := range s.nodes {
  206. alive = append(alive, n)
  207. }
  208. defer s.mu.RUnlock()
  209. for _, n := range alive {
  210. n.Close()
  211. }
  212. }
  213. func (s *simulation) printStats() {
  214. s.mu.Lock()
  215. defer s.mu.Unlock()
  216. fmt.Println("node counter:", s.nodectr)
  217. fmt.Println("alive nodes:", len(s.nodes))
  218. // for _, n := range s.nodes {
  219. // fmt.Printf("%x\n", n.tab.self.ID[:8])
  220. // transport := n.conn.(*simTransport)
  221. // fmt.Println(" joined:", transport.joinTime)
  222. // fmt.Println(" sends:", transport.hashctr)
  223. // fmt.Println(" table size:", n.tab.count)
  224. // }
  225. /*for _, n := range s.nodes {
  226. fmt.Println()
  227. fmt.Printf("*** Node %x\n", n.tab.self.ID[:8])
  228. n.log.printLogs()
  229. }*/
  230. }
  231. func (s *simulation) randomNode() *Network {
  232. s.mu.Lock()
  233. defer s.mu.Unlock()
  234. n := rand.Intn(len(s.nodes))
  235. for _, net := range s.nodes {
  236. if n == 0 {
  237. return net
  238. }
  239. n--
  240. }
  241. return nil
  242. }
  243. func (s *simulation) launchNode(log bool) *Network {
  244. var (
  245. num = s.nodectr
  246. key = newkey()
  247. id = PubkeyID(&key.PublicKey)
  248. ip = make(net.IP, 4)
  249. )
  250. s.nodectr++
  251. binary.BigEndian.PutUint32(ip, num)
  252. ip[0] = 10
  253. addr := &net.UDPAddr{IP: ip, Port: 30303}
  254. transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key}
  255. net, err := newNetwork(transport, key.PublicKey, "<no database>", nil)
  256. if err != nil {
  257. panic("cannot launch new node: " + err.Error())
  258. }
  259. s.mu.Lock()
  260. s.nodes[id] = net
  261. s.mu.Unlock()
  262. return net
  263. }
  264. type simTransport struct {
  265. joinTime time.Time
  266. sender NodeID
  267. senderAddr *net.UDPAddr
  268. sim *simulation
  269. hashctr uint64
  270. priv *ecdsa.PrivateKey
  271. }
  272. func (st *simTransport) localAddr() *net.UDPAddr {
  273. return st.senderAddr
  274. }
  275. func (st *simTransport) Close() {}
  276. func (st *simTransport) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
  277. hash = st.nextHash()
  278. var raw []byte
  279. if ptype == pongPacket {
  280. var err error
  281. raw, _, err = encodePacket(st.priv, byte(ptype), data)
  282. if err != nil {
  283. panic(err)
  284. }
  285. }
  286. st.sendPacket(remote.ID, ingressPacket{
  287. remoteID: st.sender,
  288. remoteAddr: st.senderAddr,
  289. hash: hash,
  290. ev: ptype,
  291. data: data,
  292. rawData: raw,
  293. })
  294. return hash
  295. }
  296. func (st *simTransport) sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) []byte {
  297. hash := st.nextHash()
  298. st.sendPacket(remote.ID, ingressPacket{
  299. remoteID: st.sender,
  300. remoteAddr: st.senderAddr,
  301. hash: hash,
  302. ev: pingPacket,
  303. data: &ping{
  304. Version: 4,
  305. From: rpcEndpoint{IP: st.senderAddr.IP, UDP: uint16(st.senderAddr.Port), TCP: 30303},
  306. To: rpcEndpoint{IP: remoteAddr.IP, UDP: uint16(remoteAddr.Port), TCP: 30303},
  307. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  308. Topics: topics,
  309. },
  310. })
  311. return hash
  312. }
  313. func (st *simTransport) sendFindnodeHash(remote *Node, target common.Hash) {
  314. st.sendPacket(remote.ID, ingressPacket{
  315. remoteID: st.sender,
  316. remoteAddr: st.senderAddr,
  317. hash: st.nextHash(),
  318. ev: findnodeHashPacket,
  319. data: &findnodeHash{
  320. Target: target,
  321. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  322. },
  323. })
  324. }
  325. func (st *simTransport) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
  326. //fmt.Println("send", topics, pong)
  327. st.sendPacket(remote.ID, ingressPacket{
  328. remoteID: st.sender,
  329. remoteAddr: st.senderAddr,
  330. hash: st.nextHash(),
  331. ev: topicRegisterPacket,
  332. data: &topicRegister{
  333. Topics: topics,
  334. Idx: uint(idx),
  335. Pong: pong,
  336. },
  337. })
  338. }
  339. func (st *simTransport) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
  340. rnodes := make([]rpcNode, len(nodes))
  341. for i := range nodes {
  342. rnodes[i] = nodeToRPC(nodes[i])
  343. }
  344. st.sendPacket(remote.ID, ingressPacket{
  345. remoteID: st.sender,
  346. remoteAddr: st.senderAddr,
  347. hash: st.nextHash(),
  348. ev: topicNodesPacket,
  349. data: &topicNodes{Echo: queryHash, Nodes: rnodes},
  350. })
  351. }
  352. func (st *simTransport) sendNeighbours(remote *Node, nodes []*Node) {
  353. // TODO: send multiple packets
  354. rnodes := make([]rpcNode, len(nodes))
  355. for i := range nodes {
  356. rnodes[i] = nodeToRPC(nodes[i])
  357. }
  358. st.sendPacket(remote.ID, ingressPacket{
  359. remoteID: st.sender,
  360. remoteAddr: st.senderAddr,
  361. hash: st.nextHash(),
  362. ev: neighborsPacket,
  363. data: &neighbors{
  364. Nodes: rnodes,
  365. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  366. },
  367. })
  368. }
  369. func (st *simTransport) nextHash() []byte {
  370. v := atomic.AddUint64(&st.hashctr, 1)
  371. var hash common.Hash
  372. binary.BigEndian.PutUint64(hash[:], v)
  373. return hash[:]
  374. }
  375. const packetLoss = 0 // 1/1000
  376. func (st *simTransport) sendPacket(remote NodeID, p ingressPacket) {
  377. if rand.Int31n(1000) >= packetLoss {
  378. st.sim.mu.RLock()
  379. recipient := st.sim.nodes[remote]
  380. st.sim.mu.RUnlock()
  381. time.AfterFunc(200*time.Millisecond, func() {
  382. recipient.reqReadPacket(p)
  383. })
  384. }
  385. }