les_test.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package main
  2. import (
  3. "context"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "strings"
  8. "testing"
  9. "time"
  10. "github.com/ethereum/go-ethereum/p2p"
  11. "github.com/ethereum/go-ethereum/rpc"
  12. )
  13. type gethrpc struct {
  14. name string
  15. rpc *rpc.Client
  16. geth *testgeth
  17. nodeInfo *p2p.NodeInfo
  18. }
  19. func (g *gethrpc) killAndWait() {
  20. g.geth.Kill()
  21. g.geth.WaitExit()
  22. }
  23. func (g *gethrpc) callRPC(result interface{}, method string, args ...interface{}) {
  24. if err := g.rpc.Call(&result, method, args...); err != nil {
  25. g.geth.Fatalf("callRPC %v: %v", method, err)
  26. }
  27. }
  28. func (g *gethrpc) addPeer(peer *gethrpc) {
  29. g.geth.Logf("%v.addPeer(%v)", g.name, peer.name)
  30. enode := peer.getNodeInfo().Enode
  31. peerCh := make(chan *p2p.PeerEvent)
  32. sub, err := g.rpc.Subscribe(context.Background(), "admin", peerCh, "peerEvents")
  33. if err != nil {
  34. g.geth.Fatalf("subscribe %v: %v", g.name, err)
  35. }
  36. defer sub.Unsubscribe()
  37. g.callRPC(nil, "admin_addPeer", enode)
  38. dur := 14 * time.Second
  39. timeout := time.After(dur)
  40. select {
  41. case ev := <-peerCh:
  42. g.geth.Logf("%v received event: type=%v, peer=%v", g.name, ev.Type, ev.Peer)
  43. case err := <-sub.Err():
  44. g.geth.Fatalf("%v sub error: %v", g.name, err)
  45. case <-timeout:
  46. g.geth.Error("timeout adding peer after", dur)
  47. }
  48. }
  49. // Use this function instead of `g.nodeInfo` directly
  50. func (g *gethrpc) getNodeInfo() *p2p.NodeInfo {
  51. if g.nodeInfo != nil {
  52. return g.nodeInfo
  53. }
  54. g.nodeInfo = &p2p.NodeInfo{}
  55. g.callRPC(&g.nodeInfo, "admin_nodeInfo")
  56. return g.nodeInfo
  57. }
  58. func (g *gethrpc) waitSynced() {
  59. // Check if it's synced now
  60. var result interface{}
  61. g.callRPC(&result, "eth_syncing")
  62. syncing, ok := result.(bool)
  63. if ok && !syncing {
  64. g.geth.Logf("%v already synced", g.name)
  65. return
  66. }
  67. // Actually wait, subscribe to the event
  68. ch := make(chan interface{})
  69. sub, err := g.rpc.Subscribe(context.Background(), "eth", ch, "syncing")
  70. if err != nil {
  71. g.geth.Fatalf("%v syncing: %v", g.name, err)
  72. }
  73. defer sub.Unsubscribe()
  74. timeout := time.After(4 * time.Second)
  75. select {
  76. case ev := <-ch:
  77. g.geth.Log("'syncing' event", ev)
  78. syncing, ok := ev.(bool)
  79. if ok && !syncing {
  80. break
  81. }
  82. g.geth.Log("Other 'syncing' event", ev)
  83. case err := <-sub.Err():
  84. g.geth.Fatalf("%v notification: %v", g.name, err)
  85. break
  86. case <-timeout:
  87. g.geth.Fatalf("%v timeout syncing", g.name)
  88. break
  89. }
  90. }
  91. // ipcEndpoint resolves an IPC endpoint based on a configured value, taking into
  92. // account the set data folders as well as the designated platform we're currently
  93. // running on.
  94. func ipcEndpoint(ipcPath, datadir string) string {
  95. // On windows we can only use plain top-level pipes
  96. if runtime.GOOS == "windows" {
  97. if strings.HasPrefix(ipcPath, `\\.\pipe\`) {
  98. return ipcPath
  99. }
  100. return `\\.\pipe\` + ipcPath
  101. }
  102. // Resolve names into the data directory full paths otherwise
  103. if filepath.Base(ipcPath) == ipcPath {
  104. if datadir == "" {
  105. return filepath.Join(os.TempDir(), ipcPath)
  106. }
  107. return filepath.Join(datadir, ipcPath)
  108. }
  109. return ipcPath
  110. }
  111. func startGethWithIpc(t *testing.T, name string, args ...string) *gethrpc {
  112. g := &gethrpc{name: name}
  113. args = append([]string{"--networkid=42", "--port=0", "--nousb"}, args...)
  114. t.Logf("Starting %v with rpc: %v", name, args)
  115. g.geth = runGeth(t, args...)
  116. // wait before we can attach to it. TODO: probe for it properly
  117. time.Sleep(1 * time.Second)
  118. var err error
  119. ipcpath := ipcEndpoint("geth.ipc", g.geth.Datadir)
  120. g.rpc, err = rpc.Dial(ipcpath)
  121. if err != nil {
  122. t.Fatalf("%v rpc connect to %v: %v", name, ipcpath, err)
  123. }
  124. return g
  125. }
  126. func initGeth(t *testing.T) string {
  127. g := runGeth(t, "--nousb", "--networkid=42", "init", "./testdata/clique.json")
  128. datadir := g.Datadir
  129. g.WaitExit()
  130. return datadir
  131. }
  132. func startLightServer(t *testing.T) *gethrpc {
  133. datadir := initGeth(t)
  134. runGeth(t, "--nousb", "--datadir", datadir, "--password", "./testdata/password.txt", "account", "import", "./testdata/key.prv").WaitExit()
  135. account := "0x02f0d131f1f97aef08aec6e3291b957d9efe7105"
  136. server := startGethWithIpc(t, "lightserver", "--allow-insecure-unlock", "--datadir", datadir, "--password", "./testdata/password.txt", "--unlock", account, "--mine", "--light.serve=100", "--light.maxpeers=1", "--nodiscover", "--nat=extip:127.0.0.1")
  137. return server
  138. }
  139. func startClient(t *testing.T, name string) *gethrpc {
  140. datadir := initGeth(t)
  141. return startGethWithIpc(t, name, "--datadir", datadir, "--nodiscover", "--syncmode=light", "--nat=extip:127.0.0.1")
  142. }
  143. func TestPriorityClient(t *testing.T) {
  144. lightServer := startLightServer(t)
  145. defer lightServer.killAndWait()
  146. // Start client and add lightServer as peer
  147. freeCli := startClient(t, "freeCli")
  148. defer freeCli.killAndWait()
  149. freeCli.addPeer(lightServer)
  150. var peers []*p2p.PeerInfo
  151. freeCli.callRPC(&peers, "admin_peers")
  152. if len(peers) != 1 {
  153. t.Errorf("Expected: # of client peers == 1, actual: %v", len(peers))
  154. return
  155. }
  156. // Set up priority client, get its nodeID, increase its balance on the lightServer
  157. prioCli := startClient(t, "prioCli")
  158. defer prioCli.killAndWait()
  159. // 3_000_000_000 once we move to Go 1.13
  160. tokens := 3000000000
  161. lightServer.callRPC(nil, "les_addBalance", prioCli.getNodeInfo().ID, tokens)
  162. prioCli.addPeer(lightServer)
  163. // Check if priority client is actually syncing and the regular client got kicked out
  164. prioCli.callRPC(&peers, "admin_peers")
  165. if len(peers) != 1 {
  166. t.Errorf("Expected: # of prio peers == 1, actual: %v", len(peers))
  167. }
  168. nodes := map[string]*gethrpc{
  169. lightServer.getNodeInfo().ID: lightServer,
  170. freeCli.getNodeInfo().ID: freeCli,
  171. prioCli.getNodeInfo().ID: prioCli,
  172. }
  173. time.Sleep(1 * time.Second)
  174. lightServer.callRPC(&peers, "admin_peers")
  175. peersWithNames := make(map[string]string)
  176. for _, p := range peers {
  177. peersWithNames[nodes[p.ID].name] = p.ID
  178. }
  179. if _, freeClientFound := peersWithNames[freeCli.name]; freeClientFound {
  180. t.Error("client is still a peer of lightServer", peersWithNames)
  181. }
  182. if _, prioClientFound := peersWithNames[prioCli.name]; !prioClientFound {
  183. t.Error("prio client is not among lightServer peers", peersWithNames)
  184. }
  185. }