client_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "fmt"
  19. "math/rand"
  20. "net"
  21. "net/http"
  22. "net/http/httptest"
  23. "os"
  24. "reflect"
  25. "runtime"
  26. "sync"
  27. "testing"
  28. "time"
  29. "github.com/davecgh/go-spew/spew"
  30. "github.com/ethereum/go-ethereum/logger"
  31. "github.com/ethereum/go-ethereum/logger/glog"
  32. "golang.org/x/net/context"
  33. )
  34. func TestClientRequest(t *testing.T) {
  35. server := newTestServer("service", new(Service))
  36. defer server.Stop()
  37. client := DialInProc(server)
  38. defer client.Close()
  39. var resp Result
  40. if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
  41. t.Fatal(err)
  42. }
  43. if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
  44. t.Errorf("incorrect result %#v", resp)
  45. }
  46. }
  47. func TestClientBatchRequest(t *testing.T) {
  48. server := newTestServer("service", new(Service))
  49. defer server.Stop()
  50. client := DialInProc(server)
  51. defer client.Close()
  52. batch := []BatchElem{
  53. {
  54. Method: "service_echo",
  55. Args: []interface{}{"hello", 10, &Args{"world"}},
  56. Result: new(Result),
  57. },
  58. {
  59. Method: "service_echo",
  60. Args: []interface{}{"hello2", 11, &Args{"world"}},
  61. Result: new(Result),
  62. },
  63. {
  64. Method: "no_such_method",
  65. Args: []interface{}{1, 2, 3},
  66. Result: new(int),
  67. },
  68. }
  69. if err := client.BatchCall(batch); err != nil {
  70. t.Fatal(err)
  71. }
  72. wantResult := []BatchElem{
  73. {
  74. Method: "service_echo",
  75. Args: []interface{}{"hello", 10, &Args{"world"}},
  76. Result: &Result{"hello", 10, &Args{"world"}},
  77. },
  78. {
  79. Method: "service_echo",
  80. Args: []interface{}{"hello2", 11, &Args{"world"}},
  81. Result: &Result{"hello2", 11, &Args{"world"}},
  82. },
  83. {
  84. Method: "no_such_method",
  85. Args: []interface{}{1, 2, 3},
  86. Result: new(int),
  87. Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
  88. },
  89. }
  90. if !reflect.DeepEqual(batch, wantResult) {
  91. t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
  92. }
  93. }
  94. // func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
  95. func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
  96. func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
  97. func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
  98. // This test checks that requests made through CallContext can be canceled by canceling
  99. // the context.
  100. func testClientCancel(transport string, t *testing.T) {
  101. server := newTestServer("service", new(Service))
  102. defer server.Stop()
  103. // What we want to achieve is that the context gets canceled
  104. // at various stages of request processing. The interesting cases
  105. // are:
  106. // - cancel during dial
  107. // - cancel while performing a HTTP request
  108. // - cancel while waiting for a response
  109. //
  110. // To trigger those, the times are chosen such that connections
  111. // are killed within the deadline for every other call (maxKillTimeout
  112. // is 2x maxCancelTimeout).
  113. //
  114. // Once a connection is dead, there is a fair chance it won't connect
  115. // successfully because the accept is delayed by 1s.
  116. maxContextCancelTimeout := 300 * time.Millisecond
  117. fl := &flakeyListener{
  118. maxAcceptDelay: 1 * time.Second,
  119. maxKillTimeout: 600 * time.Millisecond,
  120. }
  121. var client *Client
  122. switch transport {
  123. case "ws", "http":
  124. c, hs := httpTestClient(server, transport, fl)
  125. defer hs.Close()
  126. client = c
  127. case "ipc":
  128. c, l := ipcTestClient(server, fl)
  129. defer l.Close()
  130. client = c
  131. default:
  132. panic("unknown transport: " + transport)
  133. }
  134. // These tests take a lot of time, run them all at once.
  135. // You probably want to run with -parallel 1 or comment out
  136. // the call to t.Parallel if you enable the logging.
  137. t.Parallel()
  138. // glog.SetV(6)
  139. // glog.SetToStderr(true)
  140. // defer glog.SetToStderr(false)
  141. // glog.Infoln("testing ", transport)
  142. // The actual test starts here.
  143. var (
  144. wg sync.WaitGroup
  145. nreqs = 10
  146. ncallers = 6
  147. )
  148. caller := func(index int) {
  149. defer wg.Done()
  150. for i := 0; i < nreqs; i++ {
  151. var (
  152. ctx context.Context
  153. cancel func()
  154. timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
  155. )
  156. if index < ncallers/2 {
  157. // For half of the callers, create a context without deadline
  158. // and cancel it later.
  159. ctx, cancel = context.WithCancel(context.Background())
  160. time.AfterFunc(timeout, cancel)
  161. } else {
  162. // For the other half, create a context with a deadline instead. This is
  163. // different because the context deadline is used to set the socket write
  164. // deadline.
  165. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  166. }
  167. // Now perform a call with the context.
  168. // The key thing here is that no call will ever complete successfully.
  169. err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
  170. if err != nil {
  171. glog.V(logger.Debug).Infoln("got expected error:", err)
  172. } else {
  173. t.Errorf("no error for call with %v wait time", timeout)
  174. }
  175. cancel()
  176. }
  177. }
  178. wg.Add(ncallers)
  179. for i := 0; i < ncallers; i++ {
  180. go caller(i)
  181. }
  182. wg.Wait()
  183. }
  184. func TestClientSubscribeInvalidArg(t *testing.T) {
  185. server := newTestServer("service", new(Service))
  186. defer server.Stop()
  187. client := DialInProc(server)
  188. defer client.Close()
  189. check := func(shouldPanic bool, arg interface{}) {
  190. defer func() {
  191. err := recover()
  192. if shouldPanic && err == nil {
  193. t.Errorf("EthSubscribe should've panicked for %#v", arg)
  194. }
  195. if !shouldPanic && err != nil {
  196. t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
  197. buf := make([]byte, 1024*1024)
  198. buf = buf[:runtime.Stack(buf, false)]
  199. t.Error(err)
  200. t.Error(string(buf))
  201. }
  202. }()
  203. client.EthSubscribe(context.Background(), arg, "foo_bar")
  204. }
  205. check(true, nil)
  206. check(true, 1)
  207. check(true, (chan int)(nil))
  208. check(true, make(<-chan int))
  209. check(false, make(chan int))
  210. check(false, make(chan<- int))
  211. }
  212. func TestClientSubscribe(t *testing.T) {
  213. server := newTestServer("eth", new(NotificationTestService))
  214. defer server.Stop()
  215. client := DialInProc(server)
  216. defer client.Close()
  217. nc := make(chan int)
  218. count := 10
  219. sub, err := client.EthSubscribe(context.Background(), nc, "someSubscription", count, 0)
  220. if err != nil {
  221. t.Fatal("can't subscribe:", err)
  222. }
  223. for i := 0; i < count; i++ {
  224. if val := <-nc; val != i {
  225. t.Fatalf("value mismatch: got %d, want %d", val, i)
  226. }
  227. }
  228. sub.Unsubscribe()
  229. select {
  230. case v := <-nc:
  231. t.Fatal("received value after unsubscribe:", v)
  232. case err := <-sub.Err():
  233. if err != nil {
  234. t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
  235. }
  236. case <-time.After(1 * time.Second):
  237. t.Fatalf("subscription not closed within 1s after unsubscribe")
  238. }
  239. }
  240. // In this test, the connection drops while EthSubscribe is
  241. // waiting for a response.
  242. func TestClientSubscribeClose(t *testing.T) {
  243. service := &NotificationTestService{
  244. gotHangSubscriptionReq: make(chan struct{}),
  245. unblockHangSubscription: make(chan struct{}),
  246. }
  247. server := newTestServer("eth", service)
  248. defer server.Stop()
  249. client := DialInProc(server)
  250. defer client.Close()
  251. var (
  252. nc = make(chan int)
  253. errc = make(chan error)
  254. sub *ClientSubscription
  255. err error
  256. )
  257. go func() {
  258. sub, err = client.EthSubscribe(context.Background(), nc, "hangSubscription", 999)
  259. errc <- err
  260. }()
  261. <-service.gotHangSubscriptionReq
  262. client.Close()
  263. service.unblockHangSubscription <- struct{}{}
  264. select {
  265. case err := <-errc:
  266. if err == nil {
  267. t.Errorf("EthSubscribe returned nil error after Close")
  268. }
  269. if sub != nil {
  270. t.Error("EthSubscribe returned non-nil subscription after Close")
  271. }
  272. case <-time.After(1 * time.Second):
  273. t.Fatalf("EthSubscribe did not return within 1s after Close")
  274. }
  275. }
  276. // This test checks that Client doesn't lock up when a single subscriber
  277. // doesn't read subscription events.
  278. func TestClientNotificationStorm(t *testing.T) {
  279. server := newTestServer("eth", new(NotificationTestService))
  280. defer server.Stop()
  281. doTest := func(count int, wantError bool) {
  282. client := DialInProc(server)
  283. defer client.Close()
  284. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  285. defer cancel()
  286. // Subscribe on the server. It will start sending many notifications
  287. // very quickly.
  288. nc := make(chan int)
  289. sub, err := client.EthSubscribe(ctx, nc, "someSubscription", count, 0)
  290. if err != nil {
  291. t.Fatal("can't subscribe:", err)
  292. }
  293. defer sub.Unsubscribe()
  294. // Process each notification, try to run a call in between each of them.
  295. for i := 0; i < count; i++ {
  296. select {
  297. case val := <-nc:
  298. if val != i {
  299. t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
  300. }
  301. case err := <-sub.Err():
  302. if wantError && err != ErrSubscriptionQueueOverflow {
  303. t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
  304. } else if !wantError {
  305. t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
  306. }
  307. return
  308. }
  309. var r int
  310. err := client.CallContext(ctx, &r, "eth_echo", i)
  311. if err != nil {
  312. if !wantError {
  313. t.Fatalf("(%d/%d) call error: %v", i, count, err)
  314. }
  315. return
  316. }
  317. }
  318. }
  319. doTest(8000, false)
  320. doTest(10000, true)
  321. }
  322. func TestClientHTTP(t *testing.T) {
  323. server := newTestServer("service", new(Service))
  324. defer server.Stop()
  325. client, hs := httpTestClient(server, "http", nil)
  326. defer hs.Close()
  327. defer client.Close()
  328. // Launch concurrent requests.
  329. var (
  330. results = make([]Result, 100)
  331. errc = make(chan error)
  332. wantResult = Result{"a", 1, new(Args)}
  333. )
  334. defer client.Close()
  335. for i := range results {
  336. i := i
  337. go func() {
  338. errc <- client.Call(&results[i], "service_echo",
  339. wantResult.String, wantResult.Int, wantResult.Args)
  340. }()
  341. }
  342. // Wait for all of them to complete.
  343. timeout := time.NewTimer(5 * time.Second)
  344. defer timeout.Stop()
  345. for i := range results {
  346. select {
  347. case err := <-errc:
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. case <-timeout.C:
  352. t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
  353. }
  354. }
  355. // Check results.
  356. for i := range results {
  357. if !reflect.DeepEqual(results[i], wantResult) {
  358. t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
  359. }
  360. }
  361. }
  362. func TestClientReconnect(t *testing.T) {
  363. startServer := func(addr string) (*Server, net.Listener) {
  364. srv := newTestServer("service", new(Service))
  365. l, err := net.Listen("tcp", addr)
  366. if err != nil {
  367. t.Fatal(err)
  368. }
  369. go http.Serve(l, srv.WebsocketHandler("*"))
  370. return srv, l
  371. }
  372. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  373. defer cancel()
  374. // Start a server and corresponding client.
  375. s1, l1 := startServer("127.0.0.1:0")
  376. client, err := DialContext(ctx, "ws://"+l1.Addr().String())
  377. if err != nil {
  378. t.Fatal("can't dial", err)
  379. }
  380. // Perform a call. This should work because the server is up.
  381. var resp Result
  382. if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
  383. t.Fatal(err)
  384. }
  385. // Shut down the server and try calling again. It shouldn't work.
  386. l1.Close()
  387. s1.Stop()
  388. if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
  389. t.Error("successful call while the server is down")
  390. t.Logf("resp: %#v", resp)
  391. }
  392. // Allow for some cool down time so we can listen on the same address again.
  393. time.Sleep(2 * time.Second)
  394. // Start it up again and call again. The connection should be reestablished.
  395. // We spawn multiple calls here to check whether this hangs somehow.
  396. s2, l2 := startServer(l1.Addr().String())
  397. defer l2.Close()
  398. defer s2.Stop()
  399. start := make(chan struct{})
  400. errors := make(chan error, 20)
  401. for i := 0; i < cap(errors); i++ {
  402. go func() {
  403. <-start
  404. var resp Result
  405. errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
  406. }()
  407. }
  408. close(start)
  409. errcount := 0
  410. for i := 0; i < cap(errors); i++ {
  411. if err = <-errors; err != nil {
  412. errcount++
  413. }
  414. }
  415. t.Log("err:", err)
  416. if errcount > 1 {
  417. t.Errorf("expected one error after disconnect, got %d", errcount)
  418. }
  419. }
  420. func newTestServer(serviceName string, service interface{}) *Server {
  421. server := NewServer()
  422. if err := server.RegisterName(serviceName, service); err != nil {
  423. panic(err)
  424. }
  425. return server
  426. }
  427. func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
  428. // Create the HTTP server.
  429. var hs *httptest.Server
  430. switch transport {
  431. case "ws":
  432. hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*"))
  433. case "http":
  434. hs = httptest.NewUnstartedServer(srv)
  435. default:
  436. panic("unknown HTTP transport: " + transport)
  437. }
  438. // Wrap the listener if required.
  439. if fl != nil {
  440. fl.Listener = hs.Listener
  441. hs.Listener = fl
  442. }
  443. // Connect the client.
  444. hs.Start()
  445. client, err := Dial(transport + "://" + hs.Listener.Addr().String())
  446. if err != nil {
  447. panic(err)
  448. }
  449. return client, hs
  450. }
  451. func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
  452. // Listen on a random endpoint.
  453. endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
  454. if runtime.GOOS == "windows" {
  455. endpoint = `\\.\pipe\` + endpoint
  456. } else {
  457. endpoint = os.TempDir() + "/" + endpoint
  458. }
  459. l, err := ipcListen(endpoint)
  460. if err != nil {
  461. panic(err)
  462. }
  463. // Connect the listener to the server.
  464. if fl != nil {
  465. fl.Listener = l
  466. l = fl
  467. }
  468. go srv.ServeListener(l)
  469. // Connect the client.
  470. client, err := Dial(endpoint)
  471. if err != nil {
  472. panic(err)
  473. }
  474. return client, l
  475. }
  476. // flakeyListener kills accepted connections after a random timeout.
  477. type flakeyListener struct {
  478. net.Listener
  479. maxKillTimeout time.Duration
  480. maxAcceptDelay time.Duration
  481. }
  482. func (l *flakeyListener) Accept() (net.Conn, error) {
  483. delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
  484. time.Sleep(delay)
  485. c, err := l.Listener.Accept()
  486. if err == nil {
  487. timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
  488. time.AfterFunc(timeout, func() {
  489. glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout)
  490. c.Close()
  491. })
  492. }
  493. return c, err
  494. }