client_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "encoding/json"
  20. "fmt"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "runtime"
  28. "strings"
  29. "sync"
  30. "testing"
  31. "time"
  32. "github.com/davecgh/go-spew/spew"
  33. "github.com/ethereum/go-ethereum/log"
  34. )
  35. func TestClientRequest(t *testing.T) {
  36. server := newTestServer()
  37. defer server.Stop()
  38. client := DialInProc(server)
  39. defer client.Close()
  40. var resp echoResult
  41. if err := client.Call(&resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  42. t.Fatal(err)
  43. }
  44. if !reflect.DeepEqual(resp, echoResult{"hello", 10, &echoArgs{"world"}}) {
  45. t.Errorf("incorrect result %#v", resp)
  46. }
  47. }
  48. func TestClientResponseType(t *testing.T) {
  49. server := newTestServer()
  50. defer server.Stop()
  51. client := DialInProc(server)
  52. defer client.Close()
  53. if err := client.Call(nil, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  54. t.Errorf("Passing nil as result should be fine, but got an error: %v", err)
  55. }
  56. var resultVar echoResult
  57. // Note: passing the var, not a ref
  58. err := client.Call(resultVar, "test_echo", "hello", 10, &echoArgs{"world"})
  59. if err == nil {
  60. t.Error("Passing a var as result should be an error")
  61. }
  62. }
  63. // This test checks that server-returned errors with code and data come out of Client.Call.
  64. func TestClientErrorData(t *testing.T) {
  65. server := newTestServer()
  66. defer server.Stop()
  67. client := DialInProc(server)
  68. defer client.Close()
  69. var resp interface{}
  70. err := client.Call(&resp, "test_returnError")
  71. if err == nil {
  72. t.Fatal("expected error")
  73. }
  74. // Check code.
  75. if e, ok := err.(Error); !ok {
  76. t.Fatalf("client did not return rpc.Error, got %#v", e)
  77. } else if e.ErrorCode() != (testError{}.ErrorCode()) {
  78. t.Fatalf("wrong error code %d, want %d", e.ErrorCode(), testError{}.ErrorCode())
  79. }
  80. // Check data.
  81. if e, ok := err.(DataError); !ok {
  82. t.Fatalf("client did not return rpc.DataError, got %#v", e)
  83. } else if e.ErrorData() != (testError{}.ErrorData()) {
  84. t.Fatalf("wrong error data %#v, want %#v", e.ErrorData(), testError{}.ErrorData())
  85. }
  86. }
  87. func TestClientBatchRequest(t *testing.T) {
  88. server := newTestServer()
  89. defer server.Stop()
  90. client := DialInProc(server)
  91. defer client.Close()
  92. batch := []BatchElem{
  93. {
  94. Method: "test_echo",
  95. Args: []interface{}{"hello", 10, &echoArgs{"world"}},
  96. Result: new(echoResult),
  97. },
  98. {
  99. Method: "test_echo",
  100. Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
  101. Result: new(echoResult),
  102. },
  103. {
  104. Method: "no_such_method",
  105. Args: []interface{}{1, 2, 3},
  106. Result: new(int),
  107. },
  108. }
  109. if err := client.BatchCall(batch); err != nil {
  110. t.Fatal(err)
  111. }
  112. wantResult := []BatchElem{
  113. {
  114. Method: "test_echo",
  115. Args: []interface{}{"hello", 10, &echoArgs{"world"}},
  116. Result: &echoResult{"hello", 10, &echoArgs{"world"}},
  117. },
  118. {
  119. Method: "test_echo",
  120. Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
  121. Result: &echoResult{"hello2", 11, &echoArgs{"world"}},
  122. },
  123. {
  124. Method: "no_such_method",
  125. Args: []interface{}{1, 2, 3},
  126. Result: new(int),
  127. Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"},
  128. },
  129. }
  130. if !reflect.DeepEqual(batch, wantResult) {
  131. t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
  132. }
  133. }
  134. func TestClientNotify(t *testing.T) {
  135. server := newTestServer()
  136. defer server.Stop()
  137. client := DialInProc(server)
  138. defer client.Close()
  139. if err := client.Notify(context.Background(), "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
  140. t.Fatal(err)
  141. }
  142. }
  143. // func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
  144. func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
  145. func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
  146. func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
  147. // This test checks that requests made through CallContext can be canceled by canceling
  148. // the context.
  149. func testClientCancel(transport string, t *testing.T) {
  150. // These tests take a lot of time, run them all at once.
  151. // You probably want to run with -parallel 1 or comment out
  152. // the call to t.Parallel if you enable the logging.
  153. t.Parallel()
  154. server := newTestServer()
  155. defer server.Stop()
  156. // What we want to achieve is that the context gets canceled
  157. // at various stages of request processing. The interesting cases
  158. // are:
  159. // - cancel during dial
  160. // - cancel while performing a HTTP request
  161. // - cancel while waiting for a response
  162. //
  163. // To trigger those, the times are chosen such that connections
  164. // are killed within the deadline for every other call (maxKillTimeout
  165. // is 2x maxCancelTimeout).
  166. //
  167. // Once a connection is dead, there is a fair chance it won't connect
  168. // successfully because the accept is delayed by 1s.
  169. maxContextCancelTimeout := 300 * time.Millisecond
  170. fl := &flakeyListener{
  171. maxAcceptDelay: 1 * time.Second,
  172. maxKillTimeout: 600 * time.Millisecond,
  173. }
  174. var client *Client
  175. switch transport {
  176. case "ws", "http":
  177. c, hs := httpTestClient(server, transport, fl)
  178. defer hs.Close()
  179. client = c
  180. case "ipc":
  181. c, l := ipcTestClient(server, fl)
  182. defer l.Close()
  183. client = c
  184. default:
  185. panic("unknown transport: " + transport)
  186. }
  187. // The actual test starts here.
  188. var (
  189. wg sync.WaitGroup
  190. nreqs = 10
  191. ncallers = 10
  192. )
  193. caller := func(index int) {
  194. defer wg.Done()
  195. for i := 0; i < nreqs; i++ {
  196. var (
  197. ctx context.Context
  198. cancel func()
  199. timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
  200. )
  201. if index < ncallers/2 {
  202. // For half of the callers, create a context without deadline
  203. // and cancel it later.
  204. ctx, cancel = context.WithCancel(context.Background())
  205. time.AfterFunc(timeout, cancel)
  206. } else {
  207. // For the other half, create a context with a deadline instead. This is
  208. // different because the context deadline is used to set the socket write
  209. // deadline.
  210. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  211. }
  212. // Now perform a call with the context.
  213. // The key thing here is that no call will ever complete successfully.
  214. err := client.CallContext(ctx, nil, "test_block")
  215. switch {
  216. case err == nil:
  217. _, hasDeadline := ctx.Deadline()
  218. t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline)
  219. // default:
  220. // t.Logf("got expected error with %v wait time: %v", timeout, err)
  221. }
  222. cancel()
  223. }
  224. }
  225. wg.Add(ncallers)
  226. for i := 0; i < ncallers; i++ {
  227. go caller(i)
  228. }
  229. wg.Wait()
  230. }
  231. func TestClientSubscribeInvalidArg(t *testing.T) {
  232. server := newTestServer()
  233. defer server.Stop()
  234. client := DialInProc(server)
  235. defer client.Close()
  236. check := func(shouldPanic bool, arg interface{}) {
  237. defer func() {
  238. err := recover()
  239. if shouldPanic && err == nil {
  240. t.Errorf("EthSubscribe should've panicked for %#v", arg)
  241. }
  242. if !shouldPanic && err != nil {
  243. t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
  244. buf := make([]byte, 1024*1024)
  245. buf = buf[:runtime.Stack(buf, false)]
  246. t.Error(err)
  247. t.Error(string(buf))
  248. }
  249. }()
  250. client.EthSubscribe(context.Background(), arg, "foo_bar")
  251. }
  252. check(true, nil)
  253. check(true, 1)
  254. check(true, (chan int)(nil))
  255. check(true, make(<-chan int))
  256. check(false, make(chan int))
  257. check(false, make(chan<- int))
  258. }
  259. func TestClientSubscribe(t *testing.T) {
  260. server := newTestServer()
  261. defer server.Stop()
  262. client := DialInProc(server)
  263. defer client.Close()
  264. nc := make(chan int)
  265. count := 10
  266. sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0)
  267. if err != nil {
  268. t.Fatal("can't subscribe:", err)
  269. }
  270. for i := 0; i < count; i++ {
  271. if val := <-nc; val != i {
  272. t.Fatalf("value mismatch: got %d, want %d", val, i)
  273. }
  274. }
  275. sub.Unsubscribe()
  276. select {
  277. case v := <-nc:
  278. t.Fatal("received value after unsubscribe:", v)
  279. case err := <-sub.Err():
  280. if err != nil {
  281. t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
  282. }
  283. case <-time.After(1 * time.Second):
  284. t.Fatalf("subscription not closed within 1s after unsubscribe")
  285. }
  286. }
  287. // In this test, the connection drops while Subscribe is waiting for a response.
  288. func TestClientSubscribeClose(t *testing.T) {
  289. server := newTestServer()
  290. service := &notificationTestService{
  291. gotHangSubscriptionReq: make(chan struct{}),
  292. unblockHangSubscription: make(chan struct{}),
  293. }
  294. if err := server.RegisterName("nftest2", service); err != nil {
  295. t.Fatal(err)
  296. }
  297. defer server.Stop()
  298. client := DialInProc(server)
  299. defer client.Close()
  300. var (
  301. nc = make(chan int)
  302. errc = make(chan error, 1)
  303. sub *ClientSubscription
  304. err error
  305. )
  306. go func() {
  307. sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999)
  308. errc <- err
  309. }()
  310. <-service.gotHangSubscriptionReq
  311. client.Close()
  312. service.unblockHangSubscription <- struct{}{}
  313. select {
  314. case err := <-errc:
  315. if err == nil {
  316. t.Errorf("Subscribe returned nil error after Close")
  317. }
  318. if sub != nil {
  319. t.Error("Subscribe returned non-nil subscription after Close")
  320. }
  321. case <-time.After(1 * time.Second):
  322. t.Fatalf("Subscribe did not return within 1s after Close")
  323. }
  324. }
  325. // This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the
  326. // client hangs during shutdown when Unsubscribe races with Client.Close.
  327. func TestClientCloseUnsubscribeRace(t *testing.T) {
  328. server := newTestServer()
  329. defer server.Stop()
  330. for i := 0; i < 20; i++ {
  331. client := DialInProc(server)
  332. nc := make(chan int)
  333. sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1)
  334. if err != nil {
  335. t.Fatal(err)
  336. }
  337. go client.Close()
  338. go sub.Unsubscribe()
  339. select {
  340. case <-sub.Err():
  341. case <-time.After(5 * time.Second):
  342. t.Fatal("subscription not closed within timeout")
  343. }
  344. }
  345. }
  346. // unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls.
  347. type unsubscribeRecorder struct {
  348. ServerCodec
  349. unsubscribes map[string]bool
  350. }
  351. func (r *unsubscribeRecorder) readBatch() ([]*jsonrpcMessage, bool, error) {
  352. if r.unsubscribes == nil {
  353. r.unsubscribes = make(map[string]bool)
  354. }
  355. msgs, batch, err := r.ServerCodec.readBatch()
  356. for _, msg := range msgs {
  357. if msg.isUnsubscribe() {
  358. var params []string
  359. if err := json.Unmarshal(msg.Params, &params); err != nil {
  360. panic("unsubscribe decode error: " + err.Error())
  361. }
  362. r.unsubscribes[params[0]] = true
  363. }
  364. }
  365. return msgs, batch, err
  366. }
  367. // This checks that Client calls the _unsubscribe method on the server when Unsubscribe is
  368. // called on a subscription.
  369. func TestClientSubscriptionUnsubscribeServer(t *testing.T) {
  370. t.Parallel()
  371. // Create the server.
  372. srv := NewServer()
  373. srv.RegisterName("nftest", new(notificationTestService))
  374. p1, p2 := net.Pipe()
  375. recorder := &unsubscribeRecorder{ServerCodec: NewCodec(p1)}
  376. go srv.ServeCodec(recorder, OptionMethodInvocation|OptionSubscriptions)
  377. defer srv.Stop()
  378. // Create the client on the other end of the pipe.
  379. client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) {
  380. return NewCodec(p2), nil
  381. })
  382. defer client.Close()
  383. // Create the subscription.
  384. ch := make(chan int)
  385. sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", 1, 1)
  386. if err != nil {
  387. t.Fatal(err)
  388. }
  389. // Unsubscribe and check that unsubscribe was called.
  390. sub.Unsubscribe()
  391. if !recorder.unsubscribes[sub.subid] {
  392. t.Fatal("client did not call unsubscribe method")
  393. }
  394. if _, open := <-sub.Err(); open {
  395. t.Fatal("subscription error channel not closed after unsubscribe")
  396. }
  397. }
  398. // This checks that the subscribed channel can be closed after Unsubscribe.
  399. // It is the reproducer for https://github.com/ethereum/go-ethereum/issues/22322
  400. func TestClientSubscriptionChannelClose(t *testing.T) {
  401. t.Parallel()
  402. var (
  403. srv = NewServer()
  404. httpsrv = httptest.NewServer(srv.WebsocketHandler(nil))
  405. wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
  406. )
  407. defer srv.Stop()
  408. defer httpsrv.Close()
  409. srv.RegisterName("nftest", new(notificationTestService))
  410. client, _ := Dial(wsURL)
  411. for i := 0; i < 100; i++ {
  412. ch := make(chan int, 100)
  413. sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", maxClientSubscriptionBuffer-1, 1)
  414. if err != nil {
  415. t.Fatal(err)
  416. }
  417. sub.Unsubscribe()
  418. close(ch)
  419. }
  420. }
  421. // This test checks that Client doesn't lock up when a single subscriber
  422. // doesn't read subscription events.
  423. func TestClientNotificationStorm(t *testing.T) {
  424. server := newTestServer()
  425. defer server.Stop()
  426. doTest := func(count int, wantError bool) {
  427. client := DialInProc(server)
  428. defer client.Close()
  429. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  430. defer cancel()
  431. // Subscribe on the server. It will start sending many notifications
  432. // very quickly.
  433. nc := make(chan int)
  434. sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0)
  435. if err != nil {
  436. t.Fatal("can't subscribe:", err)
  437. }
  438. defer sub.Unsubscribe()
  439. // Process each notification, try to run a call in between each of them.
  440. for i := 0; i < count; i++ {
  441. select {
  442. case val := <-nc:
  443. if val != i {
  444. t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
  445. }
  446. case err := <-sub.Err():
  447. if wantError && err != ErrSubscriptionQueueOverflow {
  448. t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
  449. } else if !wantError {
  450. t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
  451. }
  452. return
  453. }
  454. var r int
  455. err := client.CallContext(ctx, &r, "nftest_echo", i)
  456. if err != nil {
  457. if !wantError {
  458. t.Fatalf("(%d/%d) call error: %v", i, count, err)
  459. }
  460. return
  461. }
  462. }
  463. if wantError {
  464. t.Fatalf("didn't get expected error")
  465. }
  466. }
  467. doTest(8000, false)
  468. doTest(24000, true)
  469. }
  470. func TestClientSetHeader(t *testing.T) {
  471. var gotHeader bool
  472. srv := newTestServer()
  473. httpsrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  474. if r.Header.Get("test") == "ok" {
  475. gotHeader = true
  476. }
  477. srv.ServeHTTP(w, r)
  478. }))
  479. defer httpsrv.Close()
  480. defer srv.Stop()
  481. client, err := Dial(httpsrv.URL)
  482. if err != nil {
  483. t.Fatal(err)
  484. }
  485. defer client.Close()
  486. client.SetHeader("test", "ok")
  487. if _, err := client.SupportedModules(); err != nil {
  488. t.Fatal(err)
  489. }
  490. if !gotHeader {
  491. t.Fatal("client did not set custom header")
  492. }
  493. // Check that Content-Type can be replaced.
  494. client.SetHeader("content-type", "application/x-garbage")
  495. _, err = client.SupportedModules()
  496. if err == nil {
  497. t.Fatal("no error for invalid content-type header")
  498. } else if !strings.Contains(err.Error(), "Unsupported Media Type") {
  499. t.Fatalf("error is not related to content-type: %q", err)
  500. }
  501. }
  502. func TestClientHTTP(t *testing.T) {
  503. server := newTestServer()
  504. defer server.Stop()
  505. client, hs := httpTestClient(server, "http", nil)
  506. defer hs.Close()
  507. defer client.Close()
  508. // Launch concurrent requests.
  509. var (
  510. results = make([]echoResult, 100)
  511. errc = make(chan error, len(results))
  512. wantResult = echoResult{"a", 1, new(echoArgs)}
  513. )
  514. defer client.Close()
  515. for i := range results {
  516. i := i
  517. go func() {
  518. errc <- client.Call(&results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
  519. }()
  520. }
  521. // Wait for all of them to complete.
  522. timeout := time.NewTimer(5 * time.Second)
  523. defer timeout.Stop()
  524. for i := range results {
  525. select {
  526. case err := <-errc:
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. case <-timeout.C:
  531. t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
  532. }
  533. }
  534. // Check results.
  535. for i := range results {
  536. if !reflect.DeepEqual(results[i], wantResult) {
  537. t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
  538. }
  539. }
  540. }
  541. func TestClientReconnect(t *testing.T) {
  542. startServer := func(addr string) (*Server, net.Listener) {
  543. srv := newTestServer()
  544. l, err := net.Listen("tcp", addr)
  545. if err != nil {
  546. t.Fatal("can't listen:", err)
  547. }
  548. go http.Serve(l, srv.WebsocketHandler([]string{"*"}))
  549. return srv, l
  550. }
  551. ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
  552. defer cancel()
  553. // Start a server and corresponding client.
  554. s1, l1 := startServer("127.0.0.1:0")
  555. client, err := DialContext(ctx, "ws://"+l1.Addr().String())
  556. if err != nil {
  557. t.Fatal("can't dial", err)
  558. }
  559. defer client.Close()
  560. // Perform a call. This should work because the server is up.
  561. var resp echoResult
  562. if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil {
  563. t.Fatal(err)
  564. }
  565. // Shut down the server and allow for some cool down time so we can listen on the same
  566. // address again.
  567. l1.Close()
  568. s1.Stop()
  569. time.Sleep(2 * time.Second)
  570. // Try calling again. It shouldn't work.
  571. if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil {
  572. t.Error("successful call while the server is down")
  573. t.Logf("resp: %#v", resp)
  574. }
  575. // Start it up again and call again. The connection should be reestablished.
  576. // We spawn multiple calls here to check whether this hangs somehow.
  577. s2, l2 := startServer(l1.Addr().String())
  578. defer l2.Close()
  579. defer s2.Stop()
  580. start := make(chan struct{})
  581. errors := make(chan error, 20)
  582. for i := 0; i < cap(errors); i++ {
  583. go func() {
  584. <-start
  585. var resp echoResult
  586. errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil)
  587. }()
  588. }
  589. close(start)
  590. errcount := 0
  591. for i := 0; i < cap(errors); i++ {
  592. if err = <-errors; err != nil {
  593. errcount++
  594. }
  595. }
  596. t.Logf("%d errors, last error: %v", errcount, err)
  597. if errcount > 1 {
  598. t.Errorf("expected one error after disconnect, got %d", errcount)
  599. }
  600. }
  601. func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
  602. // Create the HTTP server.
  603. var hs *httptest.Server
  604. switch transport {
  605. case "ws":
  606. hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}))
  607. case "http":
  608. hs = httptest.NewUnstartedServer(srv)
  609. default:
  610. panic("unknown HTTP transport: " + transport)
  611. }
  612. // Wrap the listener if required.
  613. if fl != nil {
  614. fl.Listener = hs.Listener
  615. hs.Listener = fl
  616. }
  617. // Connect the client.
  618. hs.Start()
  619. client, err := Dial(transport + "://" + hs.Listener.Addr().String())
  620. if err != nil {
  621. panic(err)
  622. }
  623. return client, hs
  624. }
  625. func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
  626. // Listen on a random endpoint.
  627. endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
  628. if runtime.GOOS == "windows" {
  629. endpoint = `\\.\pipe\` + endpoint
  630. } else {
  631. endpoint = os.TempDir() + "/" + endpoint
  632. }
  633. l, err := ipcListen(endpoint)
  634. if err != nil {
  635. panic(err)
  636. }
  637. // Connect the listener to the server.
  638. if fl != nil {
  639. fl.Listener = l
  640. l = fl
  641. }
  642. go srv.ServeListener(l)
  643. // Connect the client.
  644. client, err := Dial(endpoint)
  645. if err != nil {
  646. panic(err)
  647. }
  648. return client, l
  649. }
  650. // flakeyListener kills accepted connections after a random timeout.
  651. type flakeyListener struct {
  652. net.Listener
  653. maxKillTimeout time.Duration
  654. maxAcceptDelay time.Duration
  655. }
  656. func (l *flakeyListener) Accept() (net.Conn, error) {
  657. delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
  658. time.Sleep(delay)
  659. c, err := l.Listener.Accept()
  660. if err == nil {
  661. timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
  662. time.AfterFunc(timeout, func() {
  663. log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout))
  664. c.Close()
  665. })
  666. }
  667. return c, err
  668. }