Explorar el Código

eth, les: fix time sensitive unit tests (#20741)

gary rong hace 5 años
padre
commit
92f3405dae
Se han modificado 8 ficheros con 52 adiciones y 39 borrados
  1. 1 1
      eth/handler_test.go
  2. 5 0
      les/client_handler.go
  3. 0 2
      les/odr_test.go
  4. 7 7
      les/peer.go
  5. 0 1
      les/request_test.go
  6. 3 0
      les/server_handler.go
  7. 6 17
      les/sync_test.go
  8. 30 11
      les/test_helper.go

+ 1 - 1
eth/handler_test.go

@@ -534,7 +534,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
 		}
 	}
 	// Wait until the test timeout passes to ensure proper cleanup
-	time.Sleep(syncChallengeTimeout + 100*time.Millisecond)
+	time.Sleep(syncChallengeTimeout + 300*time.Millisecond)
 
 	// Verify that the remote peer is maintained or dropped
 	if drop {

+ 5 - 0
les/client_handler.go

@@ -19,6 +19,7 @@ package les
 import (
 	"math/big"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -132,6 +133,10 @@ func (h *clientHandler) handle(p *serverPeer) error {
 	if p.poolEntry != nil {
 		h.backend.serverPool.registered(p.poolEntry)
 	}
+	// Mark the peer starts to be served.
+	atomic.StoreUint32(&p.serving, 1)
+	defer atomic.StoreUint32(&p.serving, 0)
+
 	// Spawn a main loop to handle all incoming messages.
 	for {
 		if err := h.handleMsg(p); err != nil {

+ 0 - 2
les/odr_test.go

@@ -186,8 +186,6 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od
 	server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
 	defer tearDown()
 
-	client.handler.synchronise(client.peer.speer)
-
 	// Ensure the client has synced all necessary data.
 	clientHead := client.handler.backend.blockchain.CurrentHeader()
 	if clientHead.Number.Uint64() != 4 {

+ 7 - 7
les/peer.go

@@ -131,6 +131,7 @@ type peerCommons struct {
 	network      uint64    // Network ID being on.
 	frozen       uint32    // Flag whether the peer is frozen.
 	announceType uint64    // New block announcement type.
+	serving      uint32    // The status indicates the peer is served.
 	headInfo     blockInfo // Latest block information.
 
 	// Background task queue for caching peer tasks and executing in order.
@@ -636,13 +637,12 @@ type clientPeer struct {
 
 	// responseLock ensures that responses are queued in the same order as
 	// RequestProcessed is called
-	responseLock   sync.Mutex
-	server         bool
-	invalidCount   uint32 // Counter the invalid request the client peer has made.
-	responseCount  uint64 // Counter to generate an unique id for request processing.
-	errCh          chan error
-	fcClient       *flowcontrol.ClientNode // Server side mirror token bucket.
-	balanceTracker *balanceTracker         // set by clientPool.connect, used and removed by serverHandler
+	responseLock  sync.Mutex
+	server        bool
+	invalidCount  uint32 // Counter the invalid request the client peer has made.
+	responseCount uint64 // Counter to generate an unique id for request processing.
+	errCh         chan error
+	fcClient      *flowcontrol.ClientNode // Server side mirror token bucket.
 }
 
 func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer {

+ 0 - 1
les/request_test.go

@@ -81,7 +81,6 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
 	// Assemble the test environment
 	server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
 	defer tearDown()
-	client.handler.synchronise(client.peer.speer)
 
 	// Ensure the client has synced all necessary data.
 	clientHead := client.handler.backend.blockchain.CurrentHeader()

+ 3 - 0
les/server_handler.go

@@ -157,6 +157,9 @@ func (h *serverHandler) handle(p *clientPeer) error {
 		clientConnectionGauge.Update(int64(h.server.peers.len()))
 		connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
 	}()
+	// Mark the peer starts to be served.
+	atomic.StoreUint32(&p.serving, 1)
+	defer atomic.StoreUint32(&p.serving, 0)
 
 	// Spawn a main loop to handle all incoming messages.
 	for {

+ 6 - 17
les/sync_test.go

@@ -109,16 +109,12 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
 	}
 
 	// Create connected peer pair.
-	peer1, err1, peer2, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler)
+	peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler)
+	if err != nil {
+		t.Fatalf("Failed to connect testing peers %v", err)
+	}
 	defer peer1.close()
 	defer peer2.close()
-	select {
-	case <-time.After(time.Millisecond * 100):
-	case err := <-err1:
-		t.Fatalf("peer 1 handshake error: %v", err)
-	case err := <-err2:
-		t.Fatalf("peer 2 handshake error: %v", err)
-	}
 
 	select {
 	case err := <-done:
@@ -208,17 +204,10 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
 			done <- fmt.Errorf("blockchain length mismatch, want %d, got %d", expected, header.Number)
 		}
 	}
-
 	// Create connected peer pair.
-	_, err1, _, err2 := newTestPeerPair("peer", 2, server.handler, client.handler)
-	select {
-	case <-time.After(time.Millisecond * 100):
-	case err := <-err1:
-		t.Fatalf("peer 1 handshake error: %v", err)
-	case err := <-err2:
-		t.Fatalf("peer 2 handshake error: %v", err)
+	if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil {
+		t.Fatalf("Failed to connect testing peers %v", err)
 	}
-
 	select {
 	case err := <-done:
 		if err != nil {

+ 30 - 11
les/test_helper.go

@@ -22,7 +22,9 @@ package les
 import (
 	"context"
 	"crypto/rand"
+	"fmt"
 	"math/big"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -347,7 +349,7 @@ func (p *testPeer) close() {
 	p.app.Close()
 }
 
-func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, <-chan error, *testPeer, <-chan error) {
+func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) {
 	// Create a message pipe to communicate through
 	app, net := p2p.MsgPipe()
 
@@ -371,11 +373,25 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
 	go func() {
 		select {
 		case <-client.closeCh:
-			errc1 <- p2p.DiscQuitting
-		case errc1 <- client.handle(peer2):
+			errc2 <- p2p.DiscQuitting
+		case errc2 <- client.handle(peer2):
 		}
 	}()
-	return &testPeer{cpeer: peer1, net: net, app: app}, errc1, &testPeer{speer: peer2, net: app, app: net}, errc2
+	// Ensure the connection is established or exits when any error occurs
+	for {
+		select {
+		case err := <-errc1:
+			return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
+		case err := <-errc2:
+			return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
+		default:
+		}
+		if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 {
+			break
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+	return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil
 }
 
 // handshake simulates a trivial handshake that expects the same state from the
@@ -514,17 +530,20 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
 		callback(scIndexer, sbIndexer, sbtIndexer)
 	}
 	var (
+		err          error
 		speer, cpeer *testPeer
-		err1, err2   <-chan error
 	)
 	if connect {
-		cpeer, err1, speer, err2 = newTestPeerPair("peer", protocol, server, client)
+		done := make(chan struct{})
+		client.syncDone = func() { close(done) }
+		cpeer, speer, err = newTestPeerPair("peer", protocol, server, client)
+		if err != nil {
+			t.Fatalf("Failed to connect testing peers %v", err)
+		}
 		select {
-		case <-time.After(time.Millisecond * 300):
-		case err := <-err1:
-			t.Fatalf("peer 1 handshake error: %v", err)
-		case err := <-err2:
-			t.Fatalf("peer 2 handshake error: %v", err)
+		case <-done:
+		case <-time.After(3 * time.Second):
+			t.Fatal("test peer did not connect and sync within 3s")
 		}
 	}
 	s := &testServer{