client_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "fmt"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/http/httptest"
  24. "os"
  25. "reflect"
  26. "runtime"
  27. "sync"
  28. "testing"
  29. "time"
  30. "github.com/davecgh/go-spew/spew"
  31. "github.com/ethereum/go-ethereum/log"
  32. )
  33. func TestClientRequest(t *testing.T) {
  34. server := newTestServer()
  35. defer server.Stop()
  36. client := DialInProc(server)
  37. defer client.Close()
  38. var resp echoResult
  39. if err := client.Call(&resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  40. t.Fatal(err)
  41. }
  42. if !reflect.DeepEqual(resp, echoResult{"hello", 10, &echoArgs{"world"}}) {
  43. t.Errorf("incorrect result %#v", resp)
  44. }
  45. }
  46. func TestClientResponseType(t *testing.T) {
  47. server := newTestServer()
  48. defer server.Stop()
  49. client := DialInProc(server)
  50. defer client.Close()
  51. if err := client.Call(nil, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  52. t.Errorf("Passing nil as result should be fine, but got an error: %v", err)
  53. }
  54. var resultVar echoResult
  55. // Note: passing the var, not a ref
  56. err := client.Call(resultVar, "test_echo", "hello", 10, &echoArgs{"world"})
  57. if err == nil {
  58. t.Error("Passing a var as result should be an error")
  59. }
  60. }
  61. // This test checks that server-returned errors with code and data come out of Client.Call.
  62. func TestClientErrorData(t *testing.T) {
  63. server := newTestServer()
  64. defer server.Stop()
  65. client := DialInProc(server)
  66. defer client.Close()
  67. var resp interface{}
  68. err := client.Call(&resp, "test_returnError")
  69. if err == nil {
  70. t.Fatal("expected error")
  71. }
  72. // Check code.
  73. if e, ok := err.(Error); !ok {
  74. t.Fatalf("client did not return rpc.Error, got %#v", e)
  75. } else if e.ErrorCode() != (testError{}.ErrorCode()) {
  76. t.Fatalf("wrong error code %d, want %d", e.ErrorCode(), testError{}.ErrorCode())
  77. }
  78. // Check data.
  79. if e, ok := err.(DataError); !ok {
  80. t.Fatalf("client did not return rpc.DataError, got %#v", e)
  81. } else if e.ErrorData() != (testError{}.ErrorData()) {
  82. t.Fatalf("wrong error data %#v, want %#v", e.ErrorData(), testError{}.ErrorData())
  83. }
  84. }
  85. func TestClientBatchRequest(t *testing.T) {
  86. server := newTestServer()
  87. defer server.Stop()
  88. client := DialInProc(server)
  89. defer client.Close()
  90. batch := []BatchElem{
  91. {
  92. Method: "test_echo",
  93. Args: []interface{}{"hello", 10, &echoArgs{"world"}},
  94. Result: new(echoResult),
  95. },
  96. {
  97. Method: "test_echo",
  98. Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
  99. Result: new(echoResult),
  100. },
  101. {
  102. Method: "no_such_method",
  103. Args: []interface{}{1, 2, 3},
  104. Result: new(int),
  105. },
  106. }
  107. if err := client.BatchCall(batch); err != nil {
  108. t.Fatal(err)
  109. }
  110. wantResult := []BatchElem{
  111. {
  112. Method: "test_echo",
  113. Args: []interface{}{"hello", 10, &echoArgs{"world"}},
  114. Result: &echoResult{"hello", 10, &echoArgs{"world"}},
  115. },
  116. {
  117. Method: "test_echo",
  118. Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
  119. Result: &echoResult{"hello2", 11, &echoArgs{"world"}},
  120. },
  121. {
  122. Method: "no_such_method",
  123. Args: []interface{}{1, 2, 3},
  124. Result: new(int),
  125. Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"},
  126. },
  127. }
  128. if !reflect.DeepEqual(batch, wantResult) {
  129. t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
  130. }
  131. }
  132. func TestClientNotify(t *testing.T) {
  133. server := newTestServer()
  134. defer server.Stop()
  135. client := DialInProc(server)
  136. defer client.Close()
  137. if err := client.Notify(context.Background(), "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  138. t.Fatal(err)
  139. }
  140. }
  141. // func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
  142. func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
  143. func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
  144. func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
  145. // This test checks that requests made through CallContext can be canceled by canceling
  146. // the context.
  147. func testClientCancel(transport string, t *testing.T) {
  148. // These tests take a lot of time, run them all at once.
  149. // You probably want to run with -parallel 1 or comment out
  150. // the call to t.Parallel if you enable the logging.
  151. t.Parallel()
  152. server := newTestServer()
  153. defer server.Stop()
  154. // What we want to achieve is that the context gets canceled
  155. // at various stages of request processing. The interesting cases
  156. // are:
  157. // - cancel during dial
  158. // - cancel while performing a HTTP request
  159. // - cancel while waiting for a response
  160. //
  161. // To trigger those, the times are chosen such that connections
  162. // are killed within the deadline for every other call (maxKillTimeout
  163. // is 2x maxCancelTimeout).
  164. //
  165. // Once a connection is dead, there is a fair chance it won't connect
  166. // successfully because the accept is delayed by 1s.
  167. maxContextCancelTimeout := 300 * time.Millisecond
  168. fl := &flakeyListener{
  169. maxAcceptDelay: 1 * time.Second,
  170. maxKillTimeout: 600 * time.Millisecond,
  171. }
  172. var client *Client
  173. switch transport {
  174. case "ws", "http":
  175. c, hs := httpTestClient(server, transport, fl)
  176. defer hs.Close()
  177. client = c
  178. case "ipc":
  179. c, l := ipcTestClient(server, fl)
  180. defer l.Close()
  181. client = c
  182. default:
  183. panic("unknown transport: " + transport)
  184. }
  185. // The actual test starts here.
  186. var (
  187. wg sync.WaitGroup
  188. nreqs = 10
  189. ncallers = 10
  190. )
  191. caller := func(index int) {
  192. defer wg.Done()
  193. for i := 0; i < nreqs; i++ {
  194. var (
  195. ctx context.Context
  196. cancel func()
  197. timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
  198. )
  199. if index < ncallers/2 {
  200. // For half of the callers, create a context without deadline
  201. // and cancel it later.
  202. ctx, cancel = context.WithCancel(context.Background())
  203. time.AfterFunc(timeout, cancel)
  204. } else {
  205. // For the other half, create a context with a deadline instead. This is
  206. // different because the context deadline is used to set the socket write
  207. // deadline.
  208. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  209. }
  210. // Now perform a call with the context.
  211. // The key thing here is that no call will ever complete successfully.
  212. err := client.CallContext(ctx, nil, "test_block")
  213. switch {
  214. case err == nil:
  215. _, hasDeadline := ctx.Deadline()
  216. t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline)
  217. // default:
  218. // t.Logf("got expected error with %v wait time: %v", timeout, err)
  219. }
  220. cancel()
  221. }
  222. }
  223. wg.Add(ncallers)
  224. for i := 0; i < ncallers; i++ {
  225. go caller(i)
  226. }
  227. wg.Wait()
  228. }
  229. func TestClientSubscribeInvalidArg(t *testing.T) {
  230. server := newTestServer()
  231. defer server.Stop()
  232. client := DialInProc(server)
  233. defer client.Close()
  234. check := func(shouldPanic bool, arg interface{}) {
  235. defer func() {
  236. err := recover()
  237. if shouldPanic && err == nil {
  238. t.Errorf("EthSubscribe should've panicked for %#v", arg)
  239. }
  240. if !shouldPanic && err != nil {
  241. t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
  242. buf := make([]byte, 1024*1024)
  243. buf = buf[:runtime.Stack(buf, false)]
  244. t.Error(err)
  245. t.Error(string(buf))
  246. }
  247. }()
  248. client.EthSubscribe(context.Background(), arg, "foo_bar")
  249. }
  250. check(true, nil)
  251. check(true, 1)
  252. check(true, (chan int)(nil))
  253. check(true, make(<-chan int))
  254. check(false, make(chan int))
  255. check(false, make(chan<- int))
  256. }
  257. func TestClientSubscribe(t *testing.T) {
  258. server := newTestServer()
  259. defer server.Stop()
  260. client := DialInProc(server)
  261. defer client.Close()
  262. nc := make(chan int)
  263. count := 10
  264. sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0)
  265. if err != nil {
  266. t.Fatal("can't subscribe:", err)
  267. }
  268. for i := 0; i < count; i++ {
  269. if val := <-nc; val != i {
  270. t.Fatalf("value mismatch: got %d, want %d", val, i)
  271. }
  272. }
  273. sub.Unsubscribe()
  274. select {
  275. case v := <-nc:
  276. t.Fatal("received value after unsubscribe:", v)
  277. case err := <-sub.Err():
  278. if err != nil {
  279. t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
  280. }
  281. case <-time.After(1 * time.Second):
  282. t.Fatalf("subscription not closed within 1s after unsubscribe")
  283. }
  284. }
  285. // In this test, the connection drops while Subscribe is waiting for a response.
  286. func TestClientSubscribeClose(t *testing.T) {
  287. server := newTestServer()
  288. service := &notificationTestService{
  289. gotHangSubscriptionReq: make(chan struct{}),
  290. unblockHangSubscription: make(chan struct{}),
  291. }
  292. if err := server.RegisterName("nftest2", service); err != nil {
  293. t.Fatal(err)
  294. }
  295. defer server.Stop()
  296. client := DialInProc(server)
  297. defer client.Close()
  298. var (
  299. nc = make(chan int)
  300. errc = make(chan error, 1)
  301. sub *ClientSubscription
  302. err error
  303. )
  304. go func() {
  305. sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999)
  306. errc <- err
  307. }()
  308. <-service.gotHangSubscriptionReq
  309. client.Close()
  310. service.unblockHangSubscription <- struct{}{}
  311. select {
  312. case err := <-errc:
  313. if err == nil {
  314. t.Errorf("Subscribe returned nil error after Close")
  315. }
  316. if sub != nil {
  317. t.Error("Subscribe returned non-nil subscription after Close")
  318. }
  319. case <-time.After(1 * time.Second):
  320. t.Fatalf("Subscribe did not return within 1s after Close")
  321. }
  322. }
  323. // This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the
  324. // client hangs during shutdown when Unsubscribe races with Client.Close.
  325. func TestClientCloseUnsubscribeRace(t *testing.T) {
  326. server := newTestServer()
  327. defer server.Stop()
  328. for i := 0; i < 20; i++ {
  329. client := DialInProc(server)
  330. nc := make(chan int)
  331. sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1)
  332. if err != nil {
  333. t.Fatal(err)
  334. }
  335. go client.Close()
  336. go sub.Unsubscribe()
  337. select {
  338. case <-sub.Err():
  339. case <-time.After(5 * time.Second):
  340. t.Fatal("subscription not closed within timeout")
  341. }
  342. }
  343. }
  344. // This test checks that Client doesn't lock up when a single subscriber
  345. // doesn't read subscription events.
  346. func TestClientNotificationStorm(t *testing.T) {
  347. server := newTestServer()
  348. defer server.Stop()
  349. doTest := func(count int, wantError bool) {
  350. client := DialInProc(server)
  351. defer client.Close()
  352. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  353. defer cancel()
  354. // Subscribe on the server. It will start sending many notifications
  355. // very quickly.
  356. nc := make(chan int)
  357. sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0)
  358. if err != nil {
  359. t.Fatal("can't subscribe:", err)
  360. }
  361. defer sub.Unsubscribe()
  362. // Process each notification, try to run a call in between each of them.
  363. for i := 0; i < count; i++ {
  364. select {
  365. case val := <-nc:
  366. if val != i {
  367. t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
  368. }
  369. case err := <-sub.Err():
  370. if wantError && err != ErrSubscriptionQueueOverflow {
  371. t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
  372. } else if !wantError {
  373. t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
  374. }
  375. return
  376. }
  377. var r int
  378. err := client.CallContext(ctx, &r, "nftest_echo", i)
  379. if err != nil {
  380. if !wantError {
  381. t.Fatalf("(%d/%d) call error: %v", i, count, err)
  382. }
  383. return
  384. }
  385. }
  386. if wantError {
  387. t.Fatalf("didn't get expected error")
  388. }
  389. }
  390. doTest(8000, false)
  391. doTest(23000, true)
  392. }
  393. func TestClientHTTP(t *testing.T) {
  394. server := newTestServer()
  395. defer server.Stop()
  396. client, hs := httpTestClient(server, "http", nil)
  397. defer hs.Close()
  398. defer client.Close()
  399. // Launch concurrent requests.
  400. var (
  401. results = make([]echoResult, 100)
  402. errc = make(chan error, len(results))
  403. wantResult = echoResult{"a", 1, new(echoArgs)}
  404. )
  405. defer client.Close()
  406. for i := range results {
  407. i := i
  408. go func() {
  409. errc <- client.Call(&results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
  410. }()
  411. }
  412. // Wait for all of them to complete.
  413. timeout := time.NewTimer(5 * time.Second)
  414. defer timeout.Stop()
  415. for i := range results {
  416. select {
  417. case err := <-errc:
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. case <-timeout.C:
  422. t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
  423. }
  424. }
  425. // Check results.
  426. for i := range results {
  427. if !reflect.DeepEqual(results[i], wantResult) {
  428. t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
  429. }
  430. }
  431. }
  432. func TestClientReconnect(t *testing.T) {
  433. startServer := func(addr string) (*Server, net.Listener) {
  434. srv := newTestServer()
  435. l, err := net.Listen("tcp", addr)
  436. if err != nil {
  437. t.Fatal("can't listen:", err)
  438. }
  439. go http.Serve(l, srv.WebsocketHandler([]string{"*"}))
  440. return srv, l
  441. }
  442. ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
  443. defer cancel()
  444. // Start a server and corresponding client.
  445. s1, l1 := startServer("127.0.0.1:0")
  446. client, err := DialContext(ctx, "ws://"+l1.Addr().String())
  447. if err != nil {
  448. t.Fatal("can't dial", err)
  449. }
  450. // Perform a call. This should work because the server is up.
  451. var resp echoResult
  452. if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil {
  453. t.Fatal(err)
  454. }
  455. // Shut down the server and allow for some cool down time so we can listen on the same
  456. // address again.
  457. l1.Close()
  458. s1.Stop()
  459. time.Sleep(2 * time.Second)
  460. // Try calling again. It shouldn't work.
  461. if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil {
  462. t.Error("successful call while the server is down")
  463. t.Logf("resp: %#v", resp)
  464. }
  465. // Start it up again and call again. The connection should be reestablished.
  466. // We spawn multiple calls here to check whether this hangs somehow.
  467. s2, l2 := startServer(l1.Addr().String())
  468. defer l2.Close()
  469. defer s2.Stop()
  470. start := make(chan struct{})
  471. errors := make(chan error, 20)
  472. for i := 0; i < cap(errors); i++ {
  473. go func() {
  474. <-start
  475. var resp echoResult
  476. errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil)
  477. }()
  478. }
  479. close(start)
  480. errcount := 0
  481. for i := 0; i < cap(errors); i++ {
  482. if err = <-errors; err != nil {
  483. errcount++
  484. }
  485. }
  486. t.Logf("%d errors, last error: %v", errcount, err)
  487. if errcount > 1 {
  488. t.Errorf("expected one error after disconnect, got %d", errcount)
  489. }
  490. }
  491. func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
  492. // Create the HTTP server.
  493. var hs *httptest.Server
  494. switch transport {
  495. case "ws":
  496. hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}))
  497. case "http":
  498. hs = httptest.NewUnstartedServer(srv)
  499. default:
  500. panic("unknown HTTP transport: " + transport)
  501. }
  502. // Wrap the listener if required.
  503. if fl != nil {
  504. fl.Listener = hs.Listener
  505. hs.Listener = fl
  506. }
  507. // Connect the client.
  508. hs.Start()
  509. client, err := Dial(transport + "://" + hs.Listener.Addr().String())
  510. if err != nil {
  511. panic(err)
  512. }
  513. return client, hs
  514. }
  515. func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
  516. // Listen on a random endpoint.
  517. endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
  518. if runtime.GOOS == "windows" {
  519. endpoint = `\\.\pipe\` + endpoint
  520. } else {
  521. endpoint = os.TempDir() + "/" + endpoint
  522. }
  523. l, err := ipcListen(endpoint)
  524. if err != nil {
  525. panic(err)
  526. }
  527. // Connect the listener to the server.
  528. if fl != nil {
  529. fl.Listener = l
  530. l = fl
  531. }
  532. go srv.ServeListener(l)
  533. // Connect the client.
  534. client, err := Dial(endpoint)
  535. if err != nil {
  536. panic(err)
  537. }
  538. return client, l
  539. }
// flakeyListener kills accepted connections after a random timeout.
// It wraps another net.Listener and additionally delays each Accept call by
// a random amount of time.
type flakeyListener struct {
	// Listener is the wrapped listener that actually accepts connections.
	net.Listener
	// maxKillTimeout is the exclusive upper bound of the random time after
	// which an accepted connection is closed.
	maxKillTimeout time.Duration
	// maxAcceptDelay is the exclusive upper bound of the random sleep that
	// precedes every Accept on the wrapped listener.
	maxAcceptDelay time.Duration
}
  546. func (l *flakeyListener) Accept() (net.Conn, error) {
  547. delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
  548. time.Sleep(delay)
  549. c, err := l.Listener.Accept()
  550. if err == nil {
  551. timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
  552. time.AfterFunc(timeout, func() {
  553. log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout))
  554. c.Close()
  555. })
  556. }
  557. return c, err
  558. }