|
@@ -41,9 +41,10 @@ var (
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
const (
|
|
|
- autoRefreshInterval = 1 * time.Hour
|
|
|
|
|
- seedCount = 30
|
|
|
|
|
- seedMaxAge = 5 * 24 * time.Hour
|
|
|
|
|
|
|
+ autoRefreshInterval = 1 * time.Hour
|
|
|
|
|
+ bucketRefreshInterval = 1 * time.Minute
|
|
|
|
|
+ seedCount = 30
|
|
|
|
|
+ seedMaxAge = 5 * 24 * time.Hour
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const testTopic = "foo"
|
|
const testTopic = "foo"
|
|
@@ -62,8 +63,9 @@ func debugLog(s string) {
|
|
|
// BootNodes are the enode URLs of the P2P bootstrap nodes for the experimental RLPx v5 "Topic Discovery" network
|
|
// BootNodes are the enode URLs of the P2P bootstrap nodes for the experimental RLPx v5 "Topic Discovery" network
|
|
|
// warning: local bootnodes for testing!!!
|
|
// warning: local bootnodes for testing!!!
|
|
|
var BootNodes = []*Node{
|
|
var BootNodes = []*Node{
|
|
|
- //MustParseNode("enode://6f974ede10d07334e7e651c1501cb540d087dd3a6dea81432620895c913f281790b49459d72cb8011bfbbfbd24fad956356189c31b7181a96cd44ccfb68bfc71@127.0.0.1:30301"),
|
|
|
|
|
MustParseNode("enode://0cc5f5ffb5d9098c8b8c62325f3797f56509bff942704687b6530992ac706e2cb946b90a34f1f19548cd3c7baccbcaea354531e5983c7d1bc0dee16ce4b6440b@40.118.3.223:30305"),
|
|
MustParseNode("enode://0cc5f5ffb5d9098c8b8c62325f3797f56509bff942704687b6530992ac706e2cb946b90a34f1f19548cd3c7baccbcaea354531e5983c7d1bc0dee16ce4b6440b@40.118.3.223:30305"),
|
|
|
|
|
+ MustParseNode("enode://1c7a64d76c0334b0418c004af2f67c50e36a3be60b5e4790bdac0439d21603469a85fad36f2473c9a80eb043ae60936df905fa28f1ff614c3e5dc34f15dcd2dc@40.118.3.223:30308"),
|
|
|
|
|
+ MustParseNode("enode://85c85d7143ae8bb96924f2b54f1b3e70d8c4d367af305325d30a61385a432f247d2c75c45c6b4a60335060d072d7f5b35dd1d4c45f76941f62a4f83b6e75daaf@40.118.3.223:30309"),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Network manages the table and all protocol interaction.
|
|
// Network manages the table and all protocol interaction.
|
|
@@ -82,7 +84,6 @@ type Network struct {
|
|
|
tableOpResp chan struct{}
|
|
tableOpResp chan struct{}
|
|
|
topicRegisterReq chan topicRegisterReq
|
|
topicRegisterReq chan topicRegisterReq
|
|
|
topicSearchReq chan topicSearchReq
|
|
topicSearchReq chan topicSearchReq
|
|
|
- bucketFillChn chan chan struct{}
|
|
|
|
|
|
|
|
|
|
// State of the main loop.
|
|
// State of the main loop.
|
|
|
tab *Table
|
|
tab *Table
|
|
@@ -169,7 +170,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d
|
|
|
queryReq: make(chan *findnodeQuery),
|
|
queryReq: make(chan *findnodeQuery),
|
|
|
topicRegisterReq: make(chan topicRegisterReq),
|
|
topicRegisterReq: make(chan topicRegisterReq),
|
|
|
topicSearchReq: make(chan topicSearchReq),
|
|
topicSearchReq: make(chan topicSearchReq),
|
|
|
- bucketFillChn: make(chan chan struct{}, 1),
|
|
|
|
|
nodes: make(map[NodeID]*Node),
|
|
nodes: make(map[NodeID]*Node),
|
|
|
}
|
|
}
|
|
|
go net.loop()
|
|
go net.loop()
|
|
@@ -353,8 +353,9 @@ func (net *Network) reqTableOp(f func()) (called bool) {
|
|
|
|
|
|
|
|
func (net *Network) loop() {
|
|
func (net *Network) loop() {
|
|
|
var (
|
|
var (
|
|
|
- refreshTimer = time.NewTicker(autoRefreshInterval)
|
|
|
|
|
- refreshDone chan struct{} // closed when the 'refresh' lookup has ended
|
|
|
|
|
|
|
+ refreshTimer = time.NewTicker(autoRefreshInterval)
|
|
|
|
|
+ bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
|
|
|
|
|
+ refreshDone chan struct{} // closed when the 'refresh' lookup has ended
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
// Tracking the next ticket to register.
|
|
// Tracking the next ticket to register.
|
|
@@ -389,6 +390,7 @@ func (net *Network) loop() {
|
|
|
topicRegisterLookupDone chan []*Node
|
|
topicRegisterLookupDone chan []*Node
|
|
|
topicRegisterLookupTick = time.NewTimer(0)
|
|
topicRegisterLookupTick = time.NewTimer(0)
|
|
|
topicSearchLookupTarget lookupInfo
|
|
topicSearchLookupTarget lookupInfo
|
|
|
|
|
+ searchReqWhenRefreshDone []topicSearchReq
|
|
|
)
|
|
)
|
|
|
topicSearchLookupDone := make(chan []*Node, 1)
|
|
topicSearchLookupDone := make(chan []*Node, 1)
|
|
|
<-topicRegisterLookupTick.C
|
|
<-topicRegisterLookupTick.C
|
|
@@ -406,6 +408,7 @@ loop:
|
|
|
|
|
|
|
|
// Ingress packet handling.
|
|
// Ingress packet handling.
|
|
|
case pkt := <-net.read:
|
|
case pkt := <-net.read:
|
|
|
|
|
+ //fmt.Println("read", pkt.ev)
|
|
|
debugLog("<-net.read")
|
|
debugLog("<-net.read")
|
|
|
n := net.internNode(&pkt)
|
|
n := net.internNode(&pkt)
|
|
|
prestate := n.state
|
|
prestate := n.state
|
|
@@ -503,14 +506,18 @@ loop:
|
|
|
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
|
|
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
|
|
|
|
|
|
|
|
case req := <-net.topicSearchReq:
|
|
case req := <-net.topicSearchReq:
|
|
|
- debugLog("<-net.topicSearchReq")
|
|
|
|
|
- if req.found == nil {
|
|
|
|
|
- net.ticketStore.removeSearchTopic(req.topic)
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- net.ticketStore.addSearchTopic(req.topic, req.found)
|
|
|
|
|
- if (topicSearchLookupTarget.target == common.Hash{}) {
|
|
|
|
|
- topicSearchLookupDone <- nil
|
|
|
|
|
|
|
+ if refreshDone == nil {
|
|
|
|
|
+ debugLog("<-net.topicSearchReq")
|
|
|
|
|
+ if req.found == nil {
|
|
|
|
|
+ net.ticketStore.removeSearchTopic(req.topic)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ net.ticketStore.addSearchTopic(req.topic, req.found)
|
|
|
|
|
+ if (topicSearchLookupTarget.target == common.Hash{}) {
|
|
|
|
|
+ topicSearchLookupDone <- nil
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
case nodes := <-topicSearchLookupDone:
|
|
case nodes := <-topicSearchLookupDone:
|
|
@@ -519,7 +526,14 @@ loop:
|
|
|
net.ping(n, n.addr())
|
|
net.ping(n, n.addr())
|
|
|
return n.pingEcho
|
|
return n.pingEcho
|
|
|
}, func(n *Node, topic Topic) []byte {
|
|
}, func(n *Node, topic Topic) []byte {
|
|
|
- return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
|
|
|
|
|
|
|
+ if n.state == known {
|
|
|
|
|
+ return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
|
|
|
|
|
+ } else {
|
|
|
|
|
+ if n.state == unknown {
|
|
|
|
|
+ net.ping(n, n.addr())
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
})
|
|
})
|
|
|
topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
|
|
topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
|
|
|
target := topicSearchLookupTarget.target
|
|
target := topicSearchLookupTarget.target
|
|
@@ -564,9 +578,12 @@ loop:
|
|
|
refreshDone = make(chan struct{})
|
|
refreshDone = make(chan struct{})
|
|
|
net.refresh(refreshDone)
|
|
net.refresh(refreshDone)
|
|
|
}
|
|
}
|
|
|
- case doneChn := <-net.bucketFillChn:
|
|
|
|
|
- debugLog("bucketFill")
|
|
|
|
|
- net.bucketFill(doneChn)
|
|
|
|
|
|
|
+ case <-bucketRefreshTimer.C:
|
|
|
|
|
+ target := net.tab.chooseBucketRefreshTarget()
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ net.lookup(target, false)
|
|
|
|
|
+ bucketRefreshTimer.Reset(bucketRefreshInterval)
|
|
|
|
|
+ }()
|
|
|
case newNursery := <-net.refreshReq:
|
|
case newNursery := <-net.refreshReq:
|
|
|
debugLog("<-net.refreshReq")
|
|
debugLog("<-net.refreshReq")
|
|
|
if newNursery != nil {
|
|
if newNursery != nil {
|
|
@@ -580,6 +597,13 @@ loop:
|
|
|
case <-refreshDone:
|
|
case <-refreshDone:
|
|
|
debugLog("<-net.refreshDone")
|
|
debugLog("<-net.refreshDone")
|
|
|
refreshDone = nil
|
|
refreshDone = nil
|
|
|
|
|
+ list := searchReqWhenRefreshDone
|
|
|
|
|
+ searchReqWhenRefreshDone = nil
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ for _, req := range list {
|
|
|
|
|
+ net.topicSearchReq <- req
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
debugLog("loop stopped")
|
|
debugLog("loop stopped")
|
|
@@ -643,28 +667,13 @@ func (net *Network) refresh(done chan<- struct{}) {
|
|
|
}()
|
|
}()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (net *Network) bucketFill(done chan<- struct{}) {
|
|
|
|
|
- target := net.tab.chooseBucketFillTarget()
|
|
|
|
|
- go func() {
|
|
|
|
|
- net.lookup(target, false)
|
|
|
|
|
- close(done)
|
|
|
|
|
- }()
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (net *Network) BucketFill() {
|
|
|
|
|
- done := make(chan struct{})
|
|
|
|
|
- select {
|
|
|
|
|
- case net.bucketFillChn <- done:
|
|
|
|
|
- <-done
|
|
|
|
|
- case <-net.closed:
|
|
|
|
|
- close(done)
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
// Node Interning.
|
|
// Node Interning.
|
|
|
|
|
|
|
|
func (net *Network) internNode(pkt *ingressPacket) *Node {
|
|
func (net *Network) internNode(pkt *ingressPacket) *Node {
|
|
|
if n := net.nodes[pkt.remoteID]; n != nil {
|
|
if n := net.nodes[pkt.remoteID]; n != nil {
|
|
|
|
|
+ n.IP = pkt.remoteAddr.IP
|
|
|
|
|
+ n.UDP = uint16(pkt.remoteAddr.Port)
|
|
|
|
|
+ n.TCP = uint16(pkt.remoteAddr.Port)
|
|
|
return n
|
|
return n
|
|
|
}
|
|
}
|
|
|
n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
|
|
n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
|
|
@@ -967,8 +976,10 @@ func init() {
|
|
|
|
|
|
|
|
// handle processes packets sent by n and events related to n.
|
|
// handle processes packets sent by n and events related to n.
|
|
|
func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
|
|
func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
|
|
|
|
|
+ //fmt.Println("handle", n.addr().String(), n.state, ev)
|
|
|
if pkt != nil {
|
|
if pkt != nil {
|
|
|
if err := net.checkPacket(n, ev, pkt); err != nil {
|
|
if err := net.checkPacket(n, ev, pkt); err != nil {
|
|
|
|
|
+ //fmt.Println("check err:", err)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
// Start the background expiration goroutine after the first
|
|
// Start the background expiration goroutine after the first
|
|
@@ -985,6 +996,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
|
|
|
}
|
|
}
|
|
|
next, err := n.state.handle(net, n, ev, pkt)
|
|
next, err := n.state.handle(net, n, ev, pkt)
|
|
|
net.transition(n, next)
|
|
net.transition(n, next)
|
|
|
|
|
+ //fmt.Println("new state:", n.state)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1040,6 +1052,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (net *Network) ping(n *Node, addr *net.UDPAddr) {
|
|
func (net *Network) ping(n *Node, addr *net.UDPAddr) {
|
|
|
|
|
+ //fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
|
|
|
|
|
+ if n.pingEcho != nil || n.ID == net.tab.self.ID {
|
|
|
|
|
+ //fmt.Println(" not sent")
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
|
|
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
|
|
|
n.pingTopics = net.ticketStore.regTopicSet()
|
|
n.pingTopics = net.ticketStore.regTopicSet()
|
|
|
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
|
|
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
|