client_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "fmt"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/http/httptest"
  24. "os"
  25. "reflect"
  26. "runtime"
  27. "strings"
  28. "sync"
  29. "testing"
  30. "time"
  31. "github.com/davecgh/go-spew/spew"
  32. "github.com/ethereum/go-ethereum/log"
  33. )
  34. func TestClientRequest(t *testing.T) {
  35. server := newTestServer()
  36. defer server.Stop()
  37. client := DialInProc(server)
  38. defer client.Close()
  39. var resp echoResult
  40. if err := client.Call(&resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  41. t.Fatal(err)
  42. }
  43. if !reflect.DeepEqual(resp, echoResult{"hello", 10, &echoArgs{"world"}}) {
  44. t.Errorf("incorrect result %#v", resp)
  45. }
  46. }
  47. func TestClientResponseType(t *testing.T) {
  48. server := newTestServer()
  49. defer server.Stop()
  50. client := DialInProc(server)
  51. defer client.Close()
  52. if err := client.Call(nil, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  53. t.Errorf("Passing nil as result should be fine, but got an error: %v", err)
  54. }
  55. var resultVar echoResult
  56. // Note: passing the var, not a ref
  57. err := client.Call(resultVar, "test_echo", "hello", 10, &echoArgs{"world"})
  58. if err == nil {
  59. t.Error("Passing a var as result should be an error")
  60. }
  61. }
  62. // This test checks that server-returned errors with code and data come out of Client.Call.
  63. func TestClientErrorData(t *testing.T) {
  64. server := newTestServer()
  65. defer server.Stop()
  66. client := DialInProc(server)
  67. defer client.Close()
  68. var resp interface{}
  69. err := client.Call(&resp, "test_returnError")
  70. if err == nil {
  71. t.Fatal("expected error")
  72. }
  73. // Check code.
  74. if e, ok := err.(Error); !ok {
  75. t.Fatalf("client did not return rpc.Error, got %#v", e)
  76. } else if e.ErrorCode() != (testError{}.ErrorCode()) {
  77. t.Fatalf("wrong error code %d, want %d", e.ErrorCode(), testError{}.ErrorCode())
  78. }
  79. // Check data.
  80. if e, ok := err.(DataError); !ok {
  81. t.Fatalf("client did not return rpc.DataError, got %#v", e)
  82. } else if e.ErrorData() != (testError{}.ErrorData()) {
  83. t.Fatalf("wrong error data %#v, want %#v", e.ErrorData(), testError{}.ErrorData())
  84. }
  85. }
  86. func TestClientBatchRequest(t *testing.T) {
  87. server := newTestServer()
  88. defer server.Stop()
  89. client := DialInProc(server)
  90. defer client.Close()
  91. batch := []BatchElem{
  92. {
  93. Method: "test_echo",
  94. Args: []interface{}{"hello", 10, &echoArgs{"world"}},
  95. Result: new(echoResult),
  96. },
  97. {
  98. Method: "test_echo",
  99. Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
  100. Result: new(echoResult),
  101. },
  102. {
  103. Method: "no_such_method",
  104. Args: []interface{}{1, 2, 3},
  105. Result: new(int),
  106. },
  107. }
  108. if err := client.BatchCall(batch); err != nil {
  109. t.Fatal(err)
  110. }
  111. wantResult := []BatchElem{
  112. {
  113. Method: "test_echo",
  114. Args: []interface{}{"hello", 10, &echoArgs{"world"}},
  115. Result: &echoResult{"hello", 10, &echoArgs{"world"}},
  116. },
  117. {
  118. Method: "test_echo",
  119. Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
  120. Result: &echoResult{"hello2", 11, &echoArgs{"world"}},
  121. },
  122. {
  123. Method: "no_such_method",
  124. Args: []interface{}{1, 2, 3},
  125. Result: new(int),
  126. Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"},
  127. },
  128. }
  129. if !reflect.DeepEqual(batch, wantResult) {
  130. t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
  131. }
  132. }
  133. func TestClientNotify(t *testing.T) {
  134. server := newTestServer()
  135. defer server.Stop()
  136. client := DialInProc(server)
  137. defer client.Close()
  138. if err := client.Notify(context.Background(), "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  139. t.Fatal(err)
  140. }
  141. }
  142. // func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
  143. func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
  144. func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
  145. func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
  146. // This test checks that requests made through CallContext can be canceled by canceling
  147. // the context.
  148. func testClientCancel(transport string, t *testing.T) {
  149. // These tests take a lot of time, run them all at once.
  150. // You probably want to run with -parallel 1 or comment out
  151. // the call to t.Parallel if you enable the logging.
  152. t.Parallel()
  153. server := newTestServer()
  154. defer server.Stop()
  155. // What we want to achieve is that the context gets canceled
  156. // at various stages of request processing. The interesting cases
  157. // are:
  158. // - cancel during dial
  159. // - cancel while performing a HTTP request
  160. // - cancel while waiting for a response
  161. //
  162. // To trigger those, the times are chosen such that connections
  163. // are killed within the deadline for every other call (maxKillTimeout
  164. // is 2x maxCancelTimeout).
  165. //
  166. // Once a connection is dead, there is a fair chance it won't connect
  167. // successfully because the accept is delayed by 1s.
  168. maxContextCancelTimeout := 300 * time.Millisecond
  169. fl := &flakeyListener{
  170. maxAcceptDelay: 1 * time.Second,
  171. maxKillTimeout: 600 * time.Millisecond,
  172. }
  173. var client *Client
  174. switch transport {
  175. case "ws", "http":
  176. c, hs := httpTestClient(server, transport, fl)
  177. defer hs.Close()
  178. client = c
  179. case "ipc":
  180. c, l := ipcTestClient(server, fl)
  181. defer l.Close()
  182. client = c
  183. default:
  184. panic("unknown transport: " + transport)
  185. }
  186. // The actual test starts here.
  187. var (
  188. wg sync.WaitGroup
  189. nreqs = 10
  190. ncallers = 10
  191. )
  192. caller := func(index int) {
  193. defer wg.Done()
  194. for i := 0; i < nreqs; i++ {
  195. var (
  196. ctx context.Context
  197. cancel func()
  198. timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
  199. )
  200. if index < ncallers/2 {
  201. // For half of the callers, create a context without deadline
  202. // and cancel it later.
  203. ctx, cancel = context.WithCancel(context.Background())
  204. time.AfterFunc(timeout, cancel)
  205. } else {
  206. // For the other half, create a context with a deadline instead. This is
  207. // different because the context deadline is used to set the socket write
  208. // deadline.
  209. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  210. }
  211. // Now perform a call with the context.
  212. // The key thing here is that no call will ever complete successfully.
  213. err := client.CallContext(ctx, nil, "test_block")
  214. switch {
  215. case err == nil:
  216. _, hasDeadline := ctx.Deadline()
  217. t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline)
  218. // default:
  219. // t.Logf("got expected error with %v wait time: %v", timeout, err)
  220. }
  221. cancel()
  222. }
  223. }
  224. wg.Add(ncallers)
  225. for i := 0; i < ncallers; i++ {
  226. go caller(i)
  227. }
  228. wg.Wait()
  229. }
  230. func TestClientSubscribeInvalidArg(t *testing.T) {
  231. server := newTestServer()
  232. defer server.Stop()
  233. client := DialInProc(server)
  234. defer client.Close()
  235. check := func(shouldPanic bool, arg interface{}) {
  236. defer func() {
  237. err := recover()
  238. if shouldPanic && err == nil {
  239. t.Errorf("EthSubscribe should've panicked for %#v", arg)
  240. }
  241. if !shouldPanic && err != nil {
  242. t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
  243. buf := make([]byte, 1024*1024)
  244. buf = buf[:runtime.Stack(buf, false)]
  245. t.Error(err)
  246. t.Error(string(buf))
  247. }
  248. }()
  249. client.EthSubscribe(context.Background(), arg, "foo_bar")
  250. }
  251. check(true, nil)
  252. check(true, 1)
  253. check(true, (chan int)(nil))
  254. check(true, make(<-chan int))
  255. check(false, make(chan int))
  256. check(false, make(chan<- int))
  257. }
  258. func TestClientSubscribe(t *testing.T) {
  259. server := newTestServer()
  260. defer server.Stop()
  261. client := DialInProc(server)
  262. defer client.Close()
  263. nc := make(chan int)
  264. count := 10
  265. sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0)
  266. if err != nil {
  267. t.Fatal("can't subscribe:", err)
  268. }
  269. for i := 0; i < count; i++ {
  270. if val := <-nc; val != i {
  271. t.Fatalf("value mismatch: got %d, want %d", val, i)
  272. }
  273. }
  274. sub.Unsubscribe()
  275. select {
  276. case v := <-nc:
  277. t.Fatal("received value after unsubscribe:", v)
  278. case err := <-sub.Err():
  279. if err != nil {
  280. t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
  281. }
  282. case <-time.After(1 * time.Second):
  283. t.Fatalf("subscription not closed within 1s after unsubscribe")
  284. }
  285. }
  286. // In this test, the connection drops while Subscribe is waiting for a response.
  287. func TestClientSubscribeClose(t *testing.T) {
  288. server := newTestServer()
  289. service := &notificationTestService{
  290. gotHangSubscriptionReq: make(chan struct{}),
  291. unblockHangSubscription: make(chan struct{}),
  292. }
  293. if err := server.RegisterName("nftest2", service); err != nil {
  294. t.Fatal(err)
  295. }
  296. defer server.Stop()
  297. client := DialInProc(server)
  298. defer client.Close()
  299. var (
  300. nc = make(chan int)
  301. errc = make(chan error, 1)
  302. sub *ClientSubscription
  303. err error
  304. )
  305. go func() {
  306. sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999)
  307. errc <- err
  308. }()
  309. <-service.gotHangSubscriptionReq
  310. client.Close()
  311. service.unblockHangSubscription <- struct{}{}
  312. select {
  313. case err := <-errc:
  314. if err == nil {
  315. t.Errorf("Subscribe returned nil error after Close")
  316. }
  317. if sub != nil {
  318. t.Error("Subscribe returned non-nil subscription after Close")
  319. }
  320. case <-time.After(1 * time.Second):
  321. t.Fatalf("Subscribe did not return within 1s after Close")
  322. }
  323. }
  324. // This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the
  325. // client hangs during shutdown when Unsubscribe races with Client.Close.
  326. func TestClientCloseUnsubscribeRace(t *testing.T) {
  327. server := newTestServer()
  328. defer server.Stop()
  329. for i := 0; i < 20; i++ {
  330. client := DialInProc(server)
  331. nc := make(chan int)
  332. sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1)
  333. if err != nil {
  334. t.Fatal(err)
  335. }
  336. go client.Close()
  337. go sub.Unsubscribe()
  338. select {
  339. case <-sub.Err():
  340. case <-time.After(5 * time.Second):
  341. t.Fatal("subscription not closed within timeout")
  342. }
  343. }
  344. }
  345. // This test checks that Client doesn't lock up when a single subscriber
  346. // doesn't read subscription events.
  347. func TestClientNotificationStorm(t *testing.T) {
  348. server := newTestServer()
  349. defer server.Stop()
  350. doTest := func(count int, wantError bool) {
  351. client := DialInProc(server)
  352. defer client.Close()
  353. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  354. defer cancel()
  355. // Subscribe on the server. It will start sending many notifications
  356. // very quickly.
  357. nc := make(chan int)
  358. sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0)
  359. if err != nil {
  360. t.Fatal("can't subscribe:", err)
  361. }
  362. defer sub.Unsubscribe()
  363. // Process each notification, try to run a call in between each of them.
  364. for i := 0; i < count; i++ {
  365. select {
  366. case val := <-nc:
  367. if val != i {
  368. t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
  369. }
  370. case err := <-sub.Err():
  371. if wantError && err != ErrSubscriptionQueueOverflow {
  372. t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
  373. } else if !wantError {
  374. t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
  375. }
  376. return
  377. }
  378. var r int
  379. err := client.CallContext(ctx, &r, "nftest_echo", i)
  380. if err != nil {
  381. if !wantError {
  382. t.Fatalf("(%d/%d) call error: %v", i, count, err)
  383. }
  384. return
  385. }
  386. }
  387. if wantError {
  388. t.Fatalf("didn't get expected error")
  389. }
  390. }
  391. doTest(8000, false)
  392. doTest(24000, true)
  393. }
  394. func TestClientSetHeader(t *testing.T) {
  395. var gotHeader bool
  396. srv := newTestServer()
  397. httpsrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  398. if r.Header.Get("test") == "ok" {
  399. gotHeader = true
  400. }
  401. srv.ServeHTTP(w, r)
  402. }))
  403. defer httpsrv.Close()
  404. defer srv.Stop()
  405. client, err := Dial(httpsrv.URL)
  406. if err != nil {
  407. t.Fatal(err)
  408. }
  409. defer client.Close()
  410. client.SetHeader("test", "ok")
  411. if _, err := client.SupportedModules(); err != nil {
  412. t.Fatal(err)
  413. }
  414. if !gotHeader {
  415. t.Fatal("client did not set custom header")
  416. }
  417. // Check that Content-Type can be replaced.
  418. client.SetHeader("content-type", "application/x-garbage")
  419. _, err = client.SupportedModules()
  420. if err == nil {
  421. t.Fatal("no error for invalid content-type header")
  422. } else if !strings.Contains(err.Error(), "Unsupported Media Type") {
  423. t.Fatalf("error is not related to content-type: %q", err)
  424. }
  425. }
  426. func TestClientHTTP(t *testing.T) {
  427. server := newTestServer()
  428. defer server.Stop()
  429. client, hs := httpTestClient(server, "http", nil)
  430. defer hs.Close()
  431. defer client.Close()
  432. // Launch concurrent requests.
  433. var (
  434. results = make([]echoResult, 100)
  435. errc = make(chan error, len(results))
  436. wantResult = echoResult{"a", 1, new(echoArgs)}
  437. )
  438. defer client.Close()
  439. for i := range results {
  440. i := i
  441. go func() {
  442. errc <- client.Call(&results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
  443. }()
  444. }
  445. // Wait for all of them to complete.
  446. timeout := time.NewTimer(5 * time.Second)
  447. defer timeout.Stop()
  448. for i := range results {
  449. select {
  450. case err := <-errc:
  451. if err != nil {
  452. t.Fatal(err)
  453. }
  454. case <-timeout.C:
  455. t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
  456. }
  457. }
  458. // Check results.
  459. for i := range results {
  460. if !reflect.DeepEqual(results[i], wantResult) {
  461. t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
  462. }
  463. }
  464. }
  465. func TestClientReconnect(t *testing.T) {
  466. startServer := func(addr string) (*Server, net.Listener) {
  467. srv := newTestServer()
  468. l, err := net.Listen("tcp", addr)
  469. if err != nil {
  470. t.Fatal("can't listen:", err)
  471. }
  472. go http.Serve(l, srv.WebsocketHandler([]string{"*"}))
  473. return srv, l
  474. }
  475. ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
  476. defer cancel()
  477. // Start a server and corresponding client.
  478. s1, l1 := startServer("127.0.0.1:0")
  479. client, err := DialContext(ctx, "ws://"+l1.Addr().String())
  480. if err != nil {
  481. t.Fatal("can't dial", err)
  482. }
  483. // Perform a call. This should work because the server is up.
  484. var resp echoResult
  485. if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil {
  486. t.Fatal(err)
  487. }
  488. // Shut down the server and allow for some cool down time so we can listen on the same
  489. // address again.
  490. l1.Close()
  491. s1.Stop()
  492. time.Sleep(2 * time.Second)
  493. // Try calling again. It shouldn't work.
  494. if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil {
  495. t.Error("successful call while the server is down")
  496. t.Logf("resp: %#v", resp)
  497. }
  498. // Start it up again and call again. The connection should be reestablished.
  499. // We spawn multiple calls here to check whether this hangs somehow.
  500. s2, l2 := startServer(l1.Addr().String())
  501. defer l2.Close()
  502. defer s2.Stop()
  503. start := make(chan struct{})
  504. errors := make(chan error, 20)
  505. for i := 0; i < cap(errors); i++ {
  506. go func() {
  507. <-start
  508. var resp echoResult
  509. errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil)
  510. }()
  511. }
  512. close(start)
  513. errcount := 0
  514. for i := 0; i < cap(errors); i++ {
  515. if err = <-errors; err != nil {
  516. errcount++
  517. }
  518. }
  519. t.Logf("%d errors, last error: %v", errcount, err)
  520. if errcount > 1 {
  521. t.Errorf("expected one error after disconnect, got %d", errcount)
  522. }
  523. }
  524. func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
  525. // Create the HTTP server.
  526. var hs *httptest.Server
  527. switch transport {
  528. case "ws":
  529. hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}))
  530. case "http":
  531. hs = httptest.NewUnstartedServer(srv)
  532. default:
  533. panic("unknown HTTP transport: " + transport)
  534. }
  535. // Wrap the listener if required.
  536. if fl != nil {
  537. fl.Listener = hs.Listener
  538. hs.Listener = fl
  539. }
  540. // Connect the client.
  541. hs.Start()
  542. client, err := Dial(transport + "://" + hs.Listener.Addr().String())
  543. if err != nil {
  544. panic(err)
  545. }
  546. return client, hs
  547. }
  548. func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
  549. // Listen on a random endpoint.
  550. endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
  551. if runtime.GOOS == "windows" {
  552. endpoint = `\\.\pipe\` + endpoint
  553. } else {
  554. endpoint = os.TempDir() + "/" + endpoint
  555. }
  556. l, err := ipcListen(endpoint)
  557. if err != nil {
  558. panic(err)
  559. }
  560. // Connect the listener to the server.
  561. if fl != nil {
  562. fl.Listener = l
  563. l = fl
  564. }
  565. go srv.ServeListener(l)
  566. // Connect the client.
  567. client, err := Dial(endpoint)
  568. if err != nil {
  569. panic(err)
  570. }
  571. return client, l
  572. }
// flakeyListener kills accepted connections after a random timeout.
// It embeds the wrapped net.Listener and overrides Accept.
type flakeyListener struct {
	net.Listener
	// maxKillTimeout is the exclusive upper bound of the random delay after
	// which an accepted connection is closed.
	maxKillTimeout time.Duration
	// maxAcceptDelay is the exclusive upper bound of the random sleep
	// performed before each Accept on the wrapped listener.
	maxAcceptDelay time.Duration
}
  579. func (l *flakeyListener) Accept() (net.Conn, error) {
  580. delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
  581. time.Sleep(delay)
  582. c, err := l.Listener.Accept()
  583. if err == nil {
  584. timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
  585. time.AfterFunc(timeout, func() {
  586. log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout))
  587. c.Close()
  588. })
  589. }
  590. return c, err
  591. }