Просмотр исходного кода

p2p/discv5: search and lookup improvement

Zsolt Felfoldi 9 лет назад
Родитель
Сommit
a6d3bf6fc3
2 измененных файлов с 99 добавлено и 64 удалено
  1. 85 34
      p2p/discv5/net.go
  2. 14 30
      p2p/discv5/ticket.go

+ 85 - 34
p2p/discv5/net.go

@@ -126,8 +126,15 @@ type topicRegisterReq struct {
 }
 
 type topicSearchReq struct {
-	topic Topic
-	found chan<- string
+	topic  Topic
+	found  chan<- *Node
+	lookup chan<- bool
+	delay  time.Duration
+}
+
+type topicSearchResult struct {
+	target lookupInfo
+	nodes  []*Node
 }
 
 type timeoutEvent struct {
@@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node {
 			break
 		}
 		// Wait for the next reply.
-		for _, n := range <-reply {
-			if n != nil && !seen[n.ID] {
-				seen[n.ID] = true
-				result.push(n, bucketSize)
-				if stopOnMatch && n.sha == target {
-					return result.entries
+		select {
+		case nodes := <-reply:
+			for _, n := range nodes {
+				if n != nil && !seen[n.ID] {
+					seen[n.ID] = true
+					result.push(n, bucketSize)
+					if stopOnMatch && n.sha == target {
+						return result.entries
+					}
 				}
 			}
+			pendingQueries--
+		case <-time.After(respTimeout):
+			// forget all pending requests, start new ones
+			pendingQueries = 0
+			reply = make(chan []*Node, alpha)
 		}
-		pendingQueries--
 	}
 	return result.entries
 }
@@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) {
 	}
 }
 
-func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) {
-	select {
-	case net.topicSearchReq <- topicSearchReq{topic, found}:
-	case <-net.closed:
-		return
-	}
-	select {
-	case <-net.closed:
-	case <-stop:
+func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) {
+	for {
 		select {
-		case net.topicSearchReq <- topicSearchReq{topic, nil}:
 		case <-net.closed:
+			return
+		case delay, ok := <-setPeriod:
+			select {
+			case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}:
+			case <-net.closed:
+				return
+			}
+			if !ok {
+				return
+			}
 		}
 	}
 }
@@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) {
 
 // TODO: external address handling.
 
+type topicSearchInfo struct {
+	lookupChn chan<- bool
+	period    time.Duration
+}
+
+const maxSearchCount = 5
+
 func (net *Network) loop() {
 	var (
 		refreshTimer       = time.NewTicker(autoRefreshInterval)
@@ -385,10 +408,12 @@ func (net *Network) loop() {
 		topicRegisterLookupTarget lookupInfo
 		topicRegisterLookupDone   chan []*Node
 		topicRegisterLookupTick   = time.NewTimer(0)
-		topicSearchLookupTarget   lookupInfo
 		searchReqWhenRefreshDone  []topicSearchReq
+		searchInfo                = make(map[Topic]topicSearchInfo)
+		activeSearchCount         int
 	)
-	topicSearchLookupDone := make(chan []*Node, 1)
+	topicSearchLookupDone := make(chan topicSearchResult, 100)
+	topicSearch := make(chan Topic, 100)
 	<-topicRegisterLookupTick.C
 
 	statsDump := time.NewTicker(10 * time.Second)
@@ -504,21 +529,52 @@ loop:
 		case req := <-net.topicSearchReq:
 			if refreshDone == nil {
 				debugLog("<-net.topicSearchReq")
-				if req.found == nil {
-					net.ticketStore.removeSearchTopic(req.topic)
+				info, ok := searchInfo[req.topic]
+				if ok {
+					if req.delay == time.Duration(0) {
+						delete(searchInfo, req.topic)
+						net.ticketStore.removeSearchTopic(req.topic)
+					} else {
+						info.period = req.delay
+						searchInfo[req.topic] = info
+					}
 					continue
 				}
-				net.ticketStore.addSearchTopic(req.topic, req.found)
-				if (topicSearchLookupTarget.target == common.Hash{}) {
-					topicSearchLookupDone <- nil
+				if req.delay != time.Duration(0) {
+					var info topicSearchInfo
+					info.period = req.delay
+					info.lookupChn = req.lookup
+					searchInfo[req.topic] = info
+					net.ticketStore.addSearchTopic(req.topic, req.found)
+					topicSearch <- req.topic
 				}
 			} else {
 				searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
 			}
 
-		case nodes := <-topicSearchLookupDone:
-			debugLog("<-topicSearchLookupDone")
-			net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte {
+		case topic := <-topicSearch:
+			if activeSearchCount < maxSearchCount {
+				activeSearchCount++
+				target := net.ticketStore.nextSearchLookup(topic)
+				go func() {
+					nodes := net.lookup(target.target, false)
+					topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes}
+				}()
+			}
+			period := searchInfo[topic].period
+			if period != time.Duration(0) {
+				go func() {
+					time.Sleep(period)
+					topicSearch <- topic
+				}()
+			}
+
+		case res := <-topicSearchLookupDone:
+			activeSearchCount--
+			if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil {
+				lookupChn <- net.ticketStore.radius[res.target.topic].converged
+			}
+			net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte {
 				net.ping(n, n.addr())
 				return n.pingEcho
 			}, func(n *Node, topic Topic) []byte {
@@ -531,11 +587,6 @@ loop:
 					return nil
 				}
 			})
-			topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
-			target := topicSearchLookupTarget.target
-			if (target != common.Hash{}) {
-				go func() { topicSearchLookupDone <- net.lookup(target, false) }()
-			}
 
 		case <-statsDump.C:
 			debugLog("<-statsDump.C")

+ 14 - 30
p2p/discv5/ticket.go

@@ -138,16 +138,12 @@ type ticketStore struct {
 	nextTicketReg     mclock.AbsTime
 
 	searchTopicMap        map[Topic]searchTopic
-	searchTopicList       []Topic
-	searchTopicPtr        int
 	nextTopicQueryCleanup mclock.AbsTime
 	queriesSent           map[*Node]map[common.Hash]sentQuery
-	radiusLookupCnt       int
 }
 
 type searchTopic struct {
-	foundChn chan<- string
-	listIdx  int
+	foundChn chan<- *Node
 }
 
 type sentQuery struct {
@@ -183,23 +179,15 @@ func (s *ticketStore) addTopic(t Topic, register bool) {
 	}
 }
 
-func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- string) {
+func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
 	s.addTopic(t, false)
 	if s.searchTopicMap[t].foundChn == nil {
-		s.searchTopicList = append(s.searchTopicList, t)
-		s.searchTopicMap[t] = searchTopic{foundChn: foundChn, listIdx: len(s.searchTopicList) - 1}
+		s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
 	}
 }
 
 func (s *ticketStore) removeSearchTopic(t Topic) {
 	if st := s.searchTopicMap[t]; st.foundChn != nil {
-		lastIdx := len(s.searchTopicList) - 1
-		lastTopic := s.searchTopicList[lastIdx]
-		s.searchTopicList[st.listIdx] = lastTopic
-		sl := s.searchTopicMap[lastTopic]
-		sl.listIdx = st.listIdx
-		s.searchTopicMap[lastTopic] = sl
-		s.searchTopicList = s.searchTopicList[:lastIdx]
 		delete(s.searchTopicMap, t)
 	}
 }
@@ -247,20 +235,13 @@ func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Durati
 	return lookupInfo{}, 40 * time.Second
 }
 
-func (s *ticketStore) nextSearchLookup() lookupInfo {
-	if len(s.searchTopicList) == 0 {
-		return lookupInfo{}
-	}
-	if s.searchTopicPtr >= len(s.searchTopicList) {
-		s.searchTopicPtr = 0
-	}
-	topic := s.searchTopicList[s.searchTopicPtr]
-	s.searchTopicPtr++
-	target := s.radius[topic].nextTarget(s.radiusLookupCnt >= searchForceQuery)
+func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
+	tr := s.radius[topic]
+	target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
 	if target.radiusLookup {
-		s.radiusLookupCnt++
+		tr.radiusLookupCnt++
 	} else {
-		s.radiusLookupCnt = 0
+		tr.radiusLookupCnt = 0
 	}
 	return target
 }
@@ -662,9 +643,9 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod
 		if ip.IsUnspecified() || ip.IsLoopback() {
 			ip = from.IP
 		}
-		enode := NewNode(node.ID, ip, node.UDP-1, node.TCP-1).String() // subtract one from port while discv5 is running in test mode on UDPport+1
+		n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1
 		select {
-		case chn <- enode:
+		case chn <- n:
 		default:
 			return false
 		}
@@ -677,6 +658,8 @@ type topicRadius struct {
 	topicHashPrefix   uint64
 	radius, minRadius uint64
 	buckets           []topicRadiusBucket
+	converged         bool
+	radiusLookupCnt   int
 }
 
 type topicRadiusEvent int
@@ -706,7 +689,7 @@ func (b *topicRadiusBucket) update(now mclock.AbsTime) {
 	b.lastTime = now
 
 	for target, tm := range b.lookupSent {
-		if now-tm > mclock.AbsTime(pingTimeout) {
+		if now-tm > mclock.AbsTime(respTimeout) {
 			b.weights[trNoAdjust] += 1
 			delete(b.lookupSent, target)
 		}
@@ -906,6 +889,7 @@ func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
 
 	if radiusLookup == -1 {
 		// no more radius lookups needed at the moment, return a radius
+		r.converged = true
 		rad := maxBucket
 		if minRadBucket < rad {
 			rad = minRadBucket