sim_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "crypto/ecdsa"
  19. "encoding/binary"
  20. "fmt"
  21. "math/rand"
  22. "net"
  23. "strconv"
  24. "sync"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. )
  30. // In this test, nodes try to randomly resolve each other.
  31. func TestSimRandomResolve(t *testing.T) {
  32. t.Skip("boring")
  33. if runWithPlaygroundTime(t) {
  34. return
  35. }
  36. sim := newSimulation()
  37. bootnode := sim.launchNode(false)
  38. // A new node joins every 10s.
  39. launcher := time.NewTicker(10 * time.Second)
  40. go func() {
  41. for range launcher.C {
  42. net := sim.launchNode(false)
  43. go randomResolves(t, sim, net)
  44. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  45. panic(err)
  46. }
  47. fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
  48. }
  49. }()
  50. time.Sleep(3 * time.Hour)
  51. launcher.Stop()
  52. sim.shutdown()
  53. sim.printStats()
  54. }
  55. func TestSimTopics(t *testing.T) {
  56. t.Skip("NaCl test")
  57. if runWithPlaygroundTime(t) {
  58. return
  59. }
  60. // glog.SetV(6)
  61. // glog.SetToStderr(true)
  62. sim := newSimulation()
  63. bootnode := sim.launchNode(false)
  64. go func() {
  65. nets := make([]*Network, 1024)
  66. for i, _ := range nets {
  67. net := sim.launchNode(false)
  68. nets[i] = net
  69. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  70. panic(err)
  71. }
  72. time.Sleep(time.Second * 5)
  73. }
  74. for i, net := range nets {
  75. if i < 256 {
  76. stop := make(chan struct{})
  77. go net.RegisterTopic(testTopic, stop)
  78. go func() {
  79. //time.Sleep(time.Second * 36000)
  80. time.Sleep(time.Second * 40000)
  81. close(stop)
  82. }()
  83. time.Sleep(time.Millisecond * 100)
  84. }
  85. // time.Sleep(time.Second * 10)
  86. //time.Sleep(time.Second)
  87. /*if i%500 == 499 {
  88. time.Sleep(time.Second * 9501)
  89. } else {
  90. time.Sleep(time.Second)
  91. }*/
  92. }
  93. }()
  94. // A new node joins every 10s.
  95. /* launcher := time.NewTicker(5 * time.Second)
  96. cnt := 0
  97. var printNet *Network
  98. go func() {
  99. for range launcher.C {
  100. cnt++
  101. if cnt <= 1000 {
  102. log := false //(cnt == 500)
  103. net := sim.launchNode(log)
  104. if log {
  105. printNet = net
  106. }
  107. if cnt > 500 {
  108. go net.RegisterTopic(testTopic, nil)
  109. }
  110. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  111. panic(err)
  112. }
  113. }
  114. //fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
  115. }
  116. }()
  117. */
  118. time.Sleep(55000 * time.Second)
  119. //launcher.Stop()
  120. sim.shutdown()
  121. //sim.printStats()
  122. //printNet.log.printLogs()
  123. }
  124. /*func testHierarchicalTopics(i int) []Topic {
  125. digits := strconv.FormatInt(int64(256+i/4), 4)
  126. res := make([]Topic, 5)
  127. for i, _ := range res {
  128. res[i] = Topic("foo" + digits[1:i+1])
  129. }
  130. return res
  131. }*/
  132. func testHierarchicalTopics(i int) []Topic {
  133. digits := strconv.FormatInt(int64(128+i/8), 2)
  134. res := make([]Topic, 8)
  135. for i, _ := range res {
  136. res[i] = Topic("foo" + digits[1:i+1])
  137. }
  138. return res
  139. }
  140. func TestSimTopicHierarchy(t *testing.T) {
  141. t.Skip("NaCl test")
  142. if runWithPlaygroundTime(t) {
  143. return
  144. }
  145. // glog.SetV(6)
  146. // glog.SetToStderr(true)
  147. sim := newSimulation()
  148. bootnode := sim.launchNode(false)
  149. go func() {
  150. nets := make([]*Network, 1024)
  151. for i, _ := range nets {
  152. net := sim.launchNode(false)
  153. nets[i] = net
  154. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  155. panic(err)
  156. }
  157. time.Sleep(time.Second * 5)
  158. }
  159. stop := make(chan struct{})
  160. for i, net := range nets {
  161. //if i < 256 {
  162. for _, topic := range testHierarchicalTopics(i)[:5] {
  163. //fmt.Println("reg", topic)
  164. go net.RegisterTopic(topic, stop)
  165. }
  166. time.Sleep(time.Millisecond * 100)
  167. //}
  168. }
  169. time.Sleep(time.Second * 90000)
  170. close(stop)
  171. }()
  172. time.Sleep(100000 * time.Second)
  173. sim.shutdown()
  174. }
  175. func randomResolves(t *testing.T, s *simulation, net *Network) {
  176. randtime := func() time.Duration {
  177. return time.Duration(rand.Intn(50)+20) * time.Second
  178. }
  179. lookup := func(target NodeID) bool {
  180. result := net.Resolve(target)
  181. return result != nil && result.ID == target
  182. }
  183. timer := time.NewTimer(randtime())
  184. for {
  185. select {
  186. case <-timer.C:
  187. target := s.randomNode().Self().ID
  188. if !lookup(target) {
  189. t.Errorf("node %x: target %x not found", net.Self().ID[:8], target[:8])
  190. }
  191. timer.Reset(randtime())
  192. case <-net.closed:
  193. return
  194. }
  195. }
  196. }
  197. type simulation struct {
  198. mu sync.RWMutex
  199. nodes map[NodeID]*Network
  200. nodectr uint32
  201. }
  202. func newSimulation() *simulation {
  203. return &simulation{nodes: make(map[NodeID]*Network)}
  204. }
  205. func (s *simulation) shutdown() {
  206. s.mu.RLock()
  207. alive := make([]*Network, 0, len(s.nodes))
  208. for _, n := range s.nodes {
  209. alive = append(alive, n)
  210. }
  211. defer s.mu.RUnlock()
  212. for _, n := range alive {
  213. n.Close()
  214. }
  215. }
  216. func (s *simulation) printStats() {
  217. s.mu.Lock()
  218. defer s.mu.Unlock()
  219. fmt.Println("node counter:", s.nodectr)
  220. fmt.Println("alive nodes:", len(s.nodes))
  221. // for _, n := range s.nodes {
  222. // fmt.Printf("%x\n", n.tab.self.ID[:8])
  223. // transport := n.conn.(*simTransport)
  224. // fmt.Println(" joined:", transport.joinTime)
  225. // fmt.Println(" sends:", transport.hashctr)
  226. // fmt.Println(" table size:", n.tab.count)
  227. // }
  228. /*for _, n := range s.nodes {
  229. fmt.Println()
  230. fmt.Printf("*** Node %x\n", n.tab.self.ID[:8])
  231. n.log.printLogs()
  232. }*/
  233. }
  234. func (s *simulation) randomNode() *Network {
  235. s.mu.Lock()
  236. defer s.mu.Unlock()
  237. n := rand.Intn(len(s.nodes))
  238. for _, net := range s.nodes {
  239. if n == 0 {
  240. return net
  241. }
  242. n--
  243. }
  244. return nil
  245. }
  246. func (s *simulation) launchNode(log bool) *Network {
  247. var (
  248. num = s.nodectr
  249. key = newkey()
  250. id = PubkeyID(&key.PublicKey)
  251. ip = make(net.IP, 4)
  252. )
  253. s.nodectr++
  254. binary.BigEndian.PutUint32(ip, num)
  255. ip[0] = 10
  256. addr := &net.UDPAddr{IP: ip, Port: 30303}
  257. transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key}
  258. net, err := newNetwork(transport, key.PublicKey, nil, "<no database>", nil)
  259. if err != nil {
  260. panic("cannot launch new node: " + err.Error())
  261. }
  262. s.mu.Lock()
  263. s.nodes[id] = net
  264. s.mu.Unlock()
  265. return net
  266. }
  267. func (s *simulation) dropNode(id NodeID) {
  268. s.mu.Lock()
  269. n := s.nodes[id]
  270. delete(s.nodes, id)
  271. s.mu.Unlock()
  272. n.Close()
  273. }
  274. type simTransport struct {
  275. joinTime time.Time
  276. sender NodeID
  277. senderAddr *net.UDPAddr
  278. sim *simulation
  279. hashctr uint64
  280. priv *ecdsa.PrivateKey
  281. }
  282. func (st *simTransport) localAddr() *net.UDPAddr {
  283. return st.senderAddr
  284. }
  285. func (st *simTransport) Close() {}
  286. func (st *simTransport) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
  287. hash = st.nextHash()
  288. var raw []byte
  289. if ptype == pongPacket {
  290. var err error
  291. raw, _, err = encodePacket(st.priv, byte(ptype), data)
  292. if err != nil {
  293. panic(err)
  294. }
  295. }
  296. st.sendPacket(remote.ID, ingressPacket{
  297. remoteID: st.sender,
  298. remoteAddr: st.senderAddr,
  299. hash: hash,
  300. ev: ptype,
  301. data: data,
  302. rawData: raw,
  303. })
  304. return hash
  305. }
  306. func (st *simTransport) sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) []byte {
  307. hash := st.nextHash()
  308. st.sendPacket(remote.ID, ingressPacket{
  309. remoteID: st.sender,
  310. remoteAddr: st.senderAddr,
  311. hash: hash,
  312. ev: pingPacket,
  313. data: &ping{
  314. Version: 4,
  315. From: rpcEndpoint{IP: st.senderAddr.IP, UDP: uint16(st.senderAddr.Port), TCP: 30303},
  316. To: rpcEndpoint{IP: remoteAddr.IP, UDP: uint16(remoteAddr.Port), TCP: 30303},
  317. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  318. Topics: topics,
  319. },
  320. })
  321. return hash
  322. }
  323. func (st *simTransport) sendPong(remote *Node, pingHash []byte) {
  324. raddr := remote.addr()
  325. st.sendPacket(remote.ID, ingressPacket{
  326. remoteID: st.sender,
  327. remoteAddr: st.senderAddr,
  328. hash: st.nextHash(),
  329. ev: pongPacket,
  330. data: &pong{
  331. To: rpcEndpoint{IP: raddr.IP, UDP: uint16(raddr.Port), TCP: 30303},
  332. ReplyTok: pingHash,
  333. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  334. },
  335. })
  336. }
  337. func (st *simTransport) sendFindnodeHash(remote *Node, target common.Hash) {
  338. st.sendPacket(remote.ID, ingressPacket{
  339. remoteID: st.sender,
  340. remoteAddr: st.senderAddr,
  341. hash: st.nextHash(),
  342. ev: findnodeHashPacket,
  343. data: &findnodeHash{
  344. Target: target,
  345. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  346. },
  347. })
  348. }
  349. func (st *simTransport) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
  350. //fmt.Println("send", topics, pong)
  351. st.sendPacket(remote.ID, ingressPacket{
  352. remoteID: st.sender,
  353. remoteAddr: st.senderAddr,
  354. hash: st.nextHash(),
  355. ev: topicRegisterPacket,
  356. data: &topicRegister{
  357. Topics: topics,
  358. Idx: uint(idx),
  359. Pong: pong,
  360. },
  361. })
  362. }
  363. func (st *simTransport) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
  364. rnodes := make([]rpcNode, len(nodes))
  365. for i := range nodes {
  366. rnodes[i] = nodeToRPC(nodes[i])
  367. }
  368. st.sendPacket(remote.ID, ingressPacket{
  369. remoteID: st.sender,
  370. remoteAddr: st.senderAddr,
  371. hash: st.nextHash(),
  372. ev: topicNodesPacket,
  373. data: &topicNodes{Echo: queryHash, Nodes: rnodes},
  374. })
  375. }
  376. func (st *simTransport) sendNeighbours(remote *Node, nodes []*Node) {
  377. // TODO: send multiple packets
  378. rnodes := make([]rpcNode, len(nodes))
  379. for i := range nodes {
  380. rnodes[i] = nodeToRPC(nodes[i])
  381. }
  382. st.sendPacket(remote.ID, ingressPacket{
  383. remoteID: st.sender,
  384. remoteAddr: st.senderAddr,
  385. hash: st.nextHash(),
  386. ev: neighborsPacket,
  387. data: &neighbors{
  388. Nodes: rnodes,
  389. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  390. },
  391. })
  392. }
  393. func (st *simTransport) nextHash() []byte {
  394. v := atomic.AddUint64(&st.hashctr, 1)
  395. var hash common.Hash
  396. binary.BigEndian.PutUint64(hash[:], v)
  397. return hash[:]
  398. }
  399. const packetLoss = 0 // 1/1000
  400. func (st *simTransport) sendPacket(remote NodeID, p ingressPacket) {
  401. if rand.Int31n(1000) >= packetLoss {
  402. st.sim.mu.RLock()
  403. recipient := st.sim.nodes[remote]
  404. st.sim.mu.RUnlock()
  405. time.AfterFunc(200*time.Millisecond, func() {
  406. recipient.reqReadPacket(p)
  407. })
  408. }
  409. }