ticket.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. "fmt"
  21. "math"
  22. "math/rand"
  23. "sort"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. )
  29. const (
  30. ticketTimeBucketLen = time.Minute
  31. timeWindow = 10 // * ticketTimeBucketLen
  32. wantTicketsInWindow = 10
  33. collectFrequency = time.Second * 30
  34. registerFrequency = time.Second * 60
  35. maxCollectDebt = 10
  36. maxRegisterDebt = 5
  37. keepTicketConst = time.Minute * 10
  38. keepTicketExp = time.Minute * 5
  39. targetWaitTime = time.Minute * 10
  40. topicQueryTimeout = time.Second * 5
  41. topicQueryResend = time.Minute
  42. // topic radius detection
  43. maxRadius = 0xffffffffffffffff
  44. radiusTC = time.Minute * 20
  45. radiusBucketsPerBit = 8
  46. minSlope = 1
  47. minPeakSize = 40
  48. maxNoAdjust = 20
  49. lookupWidth = 8
  50. minRightSum = 20
  51. searchForceQuery = 4
  52. )
  53. // timeBucket represents absolute monotonic time in minutes.
  54. // It is used as the index into the per-topic ticket buckets.
  55. type timeBucket int
  56. type ticket struct {
  57. topics []Topic
  58. regTime []mclock.AbsTime // Per-topic local absolute time when the ticket can be used.
  59. // The serial number that was issued by the server.
  60. serial uint32
  61. // Used by registrar, tracks absolute time when the ticket was created.
  62. issueTime mclock.AbsTime
  63. // Fields used only by registrants
  64. node *Node // the registrar node that signed this ticket
  65. refCnt int // tracks number of topics that will be registered using this ticket
  66. pong []byte // encoded pong packet signed by the registrar
  67. }
  68. // ticketRef refers to a single topic in a ticket.
  69. type ticketRef struct {
  70. t *ticket
  71. idx int // index of the topic in t.topics and t.regTime
  72. }
  73. func (ref ticketRef) topic() Topic {
  74. return ref.t.topics[ref.idx]
  75. }
  76. func (ref ticketRef) topicRegTime() mclock.AbsTime {
  77. return ref.t.regTime[ref.idx]
  78. }
  79. func pongToTicket(localTime mclock.AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
  80. wps := p.data.(*pong).WaitPeriods
  81. if len(topics) != len(wps) {
  82. return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
  83. }
  84. if rlpHash(topics) != p.data.(*pong).TopicHash {
  85. return nil, fmt.Errorf("bad topic hash")
  86. }
  87. t := &ticket{
  88. issueTime: localTime,
  89. node: node,
  90. topics: topics,
  91. pong: p.rawData,
  92. regTime: make([]mclock.AbsTime, len(wps)),
  93. }
  94. // Convert wait periods to local absolute time.
  95. for i, wp := range wps {
  96. t.regTime[i] = localTime + mclock.AbsTime(time.Second*time.Duration(wp))
  97. }
  98. return t, nil
  99. }
  100. func ticketToPong(t *ticket, pong *pong) {
  101. pong.Expiration = uint64(t.issueTime / mclock.AbsTime(time.Second))
  102. pong.TopicHash = rlpHash(t.topics)
  103. pong.TicketSerial = t.serial
  104. pong.WaitPeriods = make([]uint32, len(t.regTime))
  105. for i, regTime := range t.regTime {
  106. pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
  107. }
  108. }
  109. type ticketStore struct {
  110. // radius detector and target address generator
  111. // exists for both searched and registered topics
  112. radius map[Topic]*topicRadius
  113. // Contains buckets (for each absolute minute) of tickets
  114. // that can be used in that minute.
  115. // This is only set if the topic is being registered.
  116. tickets map[Topic]topicTickets
  117. regtopics []Topic
  118. nodes map[*Node]*ticket
  119. nodeLastReq map[*Node]reqInfo
  120. lastBucketFetched timeBucket
  121. nextTicketCached *ticketRef
  122. nextTicketReg mclock.AbsTime
  123. searchTopicMap map[Topic]searchTopic
  124. searchTopicList []Topic
  125. searchTopicPtr int
  126. nextTopicQueryCleanup mclock.AbsTime
  127. queriesSent map[*Node]map[common.Hash]sentQuery
  128. radiusLookupCnt int
  129. }
  130. type searchTopic struct {
  131. foundChn chan<- string
  132. listIdx int
  133. }
  134. type sentQuery struct {
  135. sent mclock.AbsTime
  136. lookup lookupInfo
  137. }
  138. type topicTickets struct {
  139. buckets map[timeBucket][]ticketRef
  140. nextLookup, nextReg mclock.AbsTime
  141. }
  142. func newTicketStore() *ticketStore {
  143. return &ticketStore{
  144. radius: make(map[Topic]*topicRadius),
  145. tickets: make(map[Topic]topicTickets),
  146. nodes: make(map[*Node]*ticket),
  147. nodeLastReq: make(map[*Node]reqInfo),
  148. searchTopicMap: make(map[Topic]searchTopic),
  149. queriesSent: make(map[*Node]map[common.Hash]sentQuery),
  150. }
  151. }
  152. // addTopic starts tracking a topic. If register is true,
  153. // the local node will register the topic and tickets will be collected.
  154. func (s *ticketStore) addTopic(t Topic, register bool) {
  155. debugLog(fmt.Sprintf(" addTopic(%v, %v)", t, register))
  156. if s.radius[t] == nil {
  157. s.radius[t] = newTopicRadius(t)
  158. }
  159. if register && s.tickets[t].buckets == nil {
  160. s.tickets[t] = topicTickets{buckets: make(map[timeBucket][]ticketRef)}
  161. }
  162. }
  163. func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- string) {
  164. s.addTopic(t, false)
  165. if s.searchTopicMap[t].foundChn == nil {
  166. s.searchTopicList = append(s.searchTopicList, t)
  167. s.searchTopicMap[t] = searchTopic{foundChn: foundChn, listIdx: len(s.searchTopicList) - 1}
  168. }
  169. }
  170. func (s *ticketStore) removeSearchTopic(t Topic) {
  171. if st := s.searchTopicMap[t]; st.foundChn != nil {
  172. lastIdx := len(s.searchTopicList) - 1
  173. lastTopic := s.searchTopicList[lastIdx]
  174. s.searchTopicList[st.listIdx] = lastTopic
  175. sl := s.searchTopicMap[lastTopic]
  176. sl.listIdx = st.listIdx
  177. s.searchTopicMap[lastTopic] = sl
  178. s.searchTopicList = s.searchTopicList[:lastIdx]
  179. delete(s.searchTopicMap, t)
  180. }
  181. }
  182. // removeRegisterTopic deletes all tickets for the given topic.
  183. func (s *ticketStore) removeRegisterTopic(topic Topic) {
  184. debugLog(fmt.Sprintf(" removeRegisterTopic(%v)", topic))
  185. for _, list := range s.tickets[topic].buckets {
  186. for _, ref := range list {
  187. ref.t.refCnt--
  188. if ref.t.refCnt == 0 {
  189. delete(s.nodes, ref.t.node)
  190. delete(s.nodeLastReq, ref.t.node)
  191. }
  192. }
  193. }
  194. delete(s.tickets, topic)
  195. }
  196. func (s *ticketStore) regTopicSet() []Topic {
  197. topics := make([]Topic, 0, len(s.tickets))
  198. for topic := range s.tickets {
  199. topics = append(topics, topic)
  200. }
  201. return topics
  202. }
  203. // nextRegisterLookup returns the target of the next lookup for ticket collection.
  204. func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Duration) {
  205. debugLog("nextRegisterLookup()")
  206. firstTopic, ok := s.iterRegTopics()
  207. for topic := firstTopic; ok; {
  208. debugLog(fmt.Sprintf(" checking topic %v, len(s.tickets[topic]) = %d", topic, len(s.tickets[topic].buckets)))
  209. if s.tickets[topic].buckets != nil && s.needMoreTickets(topic) {
  210. next := s.radius[topic].nextTarget(false)
  211. debugLog(fmt.Sprintf(" %x 1s", next.target[:8]))
  212. return next, 100 * time.Millisecond
  213. }
  214. topic, ok = s.iterRegTopics()
  215. if topic == firstTopic {
  216. break // We have checked all topics.
  217. }
  218. }
  219. debugLog(" null, 40s")
  220. return lookupInfo{}, 40 * time.Second
  221. }
  222. func (s *ticketStore) nextSearchLookup() lookupInfo {
  223. if len(s.searchTopicList) == 0 {
  224. return lookupInfo{}
  225. }
  226. if s.searchTopicPtr >= len(s.searchTopicList) {
  227. s.searchTopicPtr = 0
  228. }
  229. topic := s.searchTopicList[s.searchTopicPtr]
  230. s.searchTopicPtr++
  231. target := s.radius[topic].nextTarget(s.radiusLookupCnt >= searchForceQuery)
  232. if target.radiusLookup {
  233. s.radiusLookupCnt++
  234. } else {
  235. s.radiusLookupCnt = 0
  236. }
  237. return target
  238. }
  239. // iterRegTopics returns topics to register in arbitrary order.
  240. // The second return value is false if there are no topics.
  241. func (s *ticketStore) iterRegTopics() (Topic, bool) {
  242. debugLog("iterRegTopics()")
  243. if len(s.regtopics) == 0 {
  244. if len(s.tickets) == 0 {
  245. debugLog(" false")
  246. return "", false
  247. }
  248. // Refill register list.
  249. for t := range s.tickets {
  250. s.regtopics = append(s.regtopics, t)
  251. }
  252. }
  253. topic := s.regtopics[len(s.regtopics)-1]
  254. s.regtopics = s.regtopics[:len(s.regtopics)-1]
  255. debugLog(" " + string(topic) + " true")
  256. return topic, true
  257. }
  258. func (s *ticketStore) needMoreTickets(t Topic) bool {
  259. return s.tickets[t].nextLookup < mclock.Now()
  260. }
  261. // ticketsInWindow returns the tickets of a given topic in the registration window.
  262. func (s *ticketStore) ticketsInWindow(t Topic) []ticketRef {
  263. ltBucket := s.lastBucketFetched
  264. var res []ticketRef
  265. tickets := s.tickets[t].buckets
  266. for g := ltBucket; g < ltBucket+timeWindow; g++ {
  267. res = append(res, tickets[g]...)
  268. }
  269. debugLog(fmt.Sprintf("ticketsInWindow(%v) = %v", t, len(res)))
  270. return res
  271. }
  272. func (s *ticketStore) removeExcessTickets(t Topic) {
  273. tickets := s.ticketsInWindow(t)
  274. if len(tickets) <= wantTicketsInWindow {
  275. return
  276. }
  277. sort.Sort(ticketRefByWaitTime(tickets))
  278. for _, r := range tickets[wantTicketsInWindow:] {
  279. s.removeTicketRef(r)
  280. }
  281. }
  282. type ticketRefByWaitTime []ticketRef
  283. // Len is the number of elements in the collection.
  284. func (s ticketRefByWaitTime) Len() int {
  285. return len(s)
  286. }
  287. func (r ticketRef) waitTime() mclock.AbsTime {
  288. return r.t.regTime[r.idx] - r.t.issueTime
  289. }
  290. // Less reports whether the element with
  291. // index i should sort before the element with index j.
  292. func (s ticketRefByWaitTime) Less(i, j int) bool {
  293. return s[i].waitTime() < s[j].waitTime()
  294. }
  295. // Swap swaps the elements with indexes i and j.
  296. func (s ticketRefByWaitTime) Swap(i, j int) {
  297. s[i], s[j] = s[j], s[i]
  298. }
  299. func (s *ticketStore) addTicketRef(r ticketRef) {
  300. topic := r.t.topics[r.idx]
  301. t := s.tickets[topic]
  302. if t.buckets == nil {
  303. return
  304. }
  305. bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen))
  306. t.buckets[bucket] = append(t.buckets[bucket], r)
  307. r.t.refCnt++
  308. min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt
  309. if t.nextLookup < min {
  310. t.nextLookup = min
  311. }
  312. t.nextLookup += mclock.AbsTime(collectFrequency)
  313. s.tickets[topic] = t
  314. //s.removeExcessTickets(topic)
  315. }
  316. func (s *ticketStore) nextFilteredTicket() (t *ticketRef, wait time.Duration) {
  317. now := mclock.Now()
  318. for {
  319. t, wait = s.nextRegisterableTicket()
  320. if t == nil {
  321. return
  322. }
  323. regTime := now + mclock.AbsTime(wait)
  324. topic := t.t.topics[t.idx]
  325. if regTime >= s.tickets[topic].nextReg {
  326. return
  327. }
  328. s.removeTicketRef(*t)
  329. }
  330. }
  331. func (s *ticketStore) ticketRegistered(t ticketRef) {
  332. now := mclock.Now()
  333. topic := t.t.topics[t.idx]
  334. tt := s.tickets[topic]
  335. min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt
  336. if min > tt.nextReg {
  337. tt.nextReg = min
  338. }
  339. tt.nextReg += mclock.AbsTime(registerFrequency)
  340. s.tickets[topic] = tt
  341. s.removeTicketRef(t)
  342. }
  343. // nextRegisterableTicket returns the next ticket that can be used
  344. // to register.
  345. //
  346. // If the returned wait time <= zero the ticket can be used. For a positive
  347. // wait time, the caller should requery the next ticket later.
  348. //
  349. // A ticket can be returned more than once with <= zero wait time in case
  350. // the ticket contains multiple topics.
  351. func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration) {
  352. defer func() {
  353. if t == nil {
  354. debugLog(" nil")
  355. } else {
  356. debugLog(fmt.Sprintf(" node = %x sn = %v wait = %v", t.t.node.ID[:8], t.t.serial, wait))
  357. }
  358. }()
  359. debugLog("nextRegisterableTicket()")
  360. now := mclock.Now()
  361. if s.nextTicketCached != nil {
  362. return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
  363. }
  364. for bucket := s.lastBucketFetched; ; bucket++ {
  365. var (
  366. empty = true // true if there are no tickets
  367. nextTicket ticketRef // uninitialized if this bucket is empty
  368. )
  369. for _, tickets := range s.tickets {
  370. //s.removeExcessTickets(topic)
  371. if len(tickets.buckets) != 0 {
  372. empty = false
  373. if list := tickets.buckets[bucket]; list != nil {
  374. for _, ref := range list {
  375. //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
  376. if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
  377. nextTicket = ref
  378. }
  379. }
  380. }
  381. }
  382. }
  383. if empty {
  384. return nil, 0
  385. }
  386. if nextTicket.t != nil {
  387. wait = time.Duration(nextTicket.topicRegTime() - now)
  388. s.nextTicketCached = &nextTicket
  389. return &nextTicket, wait
  390. }
  391. s.lastBucketFetched = bucket
  392. }
  393. }
  394. // removeTicket removes a ticket from the ticket store
  395. func (s *ticketStore) removeTicketRef(ref ticketRef) {
  396. debugLog(fmt.Sprintf("removeTicketRef(node = %x sn = %v)", ref.t.node.ID[:8], ref.t.serial))
  397. topic := ref.topic()
  398. tickets := s.tickets[topic].buckets
  399. if tickets == nil {
  400. return
  401. }
  402. bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen))
  403. list := tickets[bucket]
  404. idx := -1
  405. for i, bt := range list {
  406. if bt.t == ref.t {
  407. idx = i
  408. break
  409. }
  410. }
  411. if idx == -1 {
  412. panic(nil)
  413. }
  414. list = append(list[:idx], list[idx+1:]...)
  415. if len(list) != 0 {
  416. tickets[bucket] = list
  417. } else {
  418. delete(tickets, bucket)
  419. }
  420. ref.t.refCnt--
  421. if ref.t.refCnt == 0 {
  422. delete(s.nodes, ref.t.node)
  423. delete(s.nodeLastReq, ref.t.node)
  424. }
  425. // Make nextRegisterableTicket return the next available ticket.
  426. s.nextTicketCached = nil
  427. }
  428. type lookupInfo struct {
  429. target common.Hash
  430. topic Topic
  431. radiusLookup bool
  432. }
  433. type reqInfo struct {
  434. pingHash []byte
  435. lookup lookupInfo
  436. time mclock.AbsTime
  437. }
  438. // returns -1 if not found
  439. func (t *ticket) findIdx(topic Topic) int {
  440. for i, tt := range t.topics {
  441. if tt == topic {
  442. return i
  443. }
  444. }
  445. return -1
  446. }
  447. func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
  448. now := mclock.Now()
  449. for i, n := range nodes {
  450. if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
  451. if lookup.radiusLookup {
  452. if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
  453. s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
  454. }
  455. } else {
  456. if s.nodes[n] == nil {
  457. s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
  458. }
  459. }
  460. }
  461. }
  462. }
  463. func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte, query func(n *Node, topic Topic) []byte) {
  464. now := mclock.Now()
  465. for i, n := range nodes {
  466. if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
  467. if lookup.radiusLookup {
  468. if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
  469. s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
  470. }
  471. } // else {
  472. if s.canQueryTopic(n, lookup.topic) {
  473. hash := query(n, lookup.topic)
  474. if hash != nil {
  475. s.addTopicQuery(common.BytesToHash(hash), n, lookup)
  476. }
  477. }
  478. //}
  479. }
  480. }
  481. }
  482. func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t *ticket) {
  483. for i, topic := range t.topics {
  484. if tt, ok := s.radius[topic]; ok {
  485. tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
  486. }
  487. }
  488. }
  489. func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, t *ticket) {
  490. debugLog(fmt.Sprintf("add(node = %x sn = %v)", t.node.ID[:8], t.serial))
  491. lastReq, ok := s.nodeLastReq[t.node]
  492. if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
  493. return
  494. }
  495. s.adjustWithTicket(localTime, lastReq.lookup.target, t)
  496. if lastReq.lookup.radiusLookup || s.nodes[t.node] != nil {
  497. return
  498. }
  499. topic := lastReq.lookup.topic
  500. topicIdx := t.findIdx(topic)
  501. if topicIdx == -1 {
  502. return
  503. }
  504. bucket := timeBucket(localTime / mclock.AbsTime(ticketTimeBucketLen))
  505. if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
  506. s.lastBucketFetched = bucket
  507. }
  508. if _, ok := s.tickets[topic]; ok {
  509. wait := t.regTime[topicIdx] - localTime
  510. rnd := rand.ExpFloat64()
  511. if rnd > 10 {
  512. rnd = 10
  513. }
  514. if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
  515. // use the ticket to register this topic
  516. //fmt.Println("addTicket", t.node.ID[:8], t.node.addr().String(), t.serial, t.pong)
  517. s.addTicketRef(ticketRef{t, topicIdx})
  518. }
  519. }
  520. if t.refCnt > 0 {
  521. s.nextTicketCached = nil
  522. s.nodes[t.node] = t
  523. }
  524. }
  525. func (s *ticketStore) getNodeTicket(node *Node) *ticket {
  526. if s.nodes[node] == nil {
  527. debugLog(fmt.Sprintf("getNodeTicket(%x) sn = nil", node.ID[:8]))
  528. } else {
  529. debugLog(fmt.Sprintf("getNodeTicket(%x) sn = %v", node.ID[:8], s.nodes[node].serial))
  530. }
  531. return s.nodes[node]
  532. }
  533. func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
  534. qq := s.queriesSent[node]
  535. if qq != nil {
  536. now := mclock.Now()
  537. for _, sq := range qq {
  538. if sq.lookup.topic == topic && sq.sent > now-mclock.AbsTime(topicQueryResend) {
  539. return false
  540. }
  541. }
  542. }
  543. return true
  544. }
  545. func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
  546. now := mclock.Now()
  547. qq := s.queriesSent[node]
  548. if qq == nil {
  549. qq = make(map[common.Hash]sentQuery)
  550. s.queriesSent[node] = qq
  551. }
  552. qq[hash] = sentQuery{sent: now, lookup: lookup}
  553. s.cleanupTopicQueries(now)
  554. }
  555. func (s *ticketStore) cleanupTopicQueries(now mclock.AbsTime) {
  556. if s.nextTopicQueryCleanup > now {
  557. return
  558. }
  559. exp := now - mclock.AbsTime(topicQueryResend)
  560. for n, qq := range s.queriesSent {
  561. for h, q := range qq {
  562. if q.sent < exp {
  563. delete(qq, h)
  564. }
  565. }
  566. if len(qq) == 0 {
  567. delete(s.queriesSent, n)
  568. }
  569. }
  570. s.nextTopicQueryCleanup = now + mclock.AbsTime(topicQueryTimeout)
  571. }
  572. func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
  573. now := mclock.Now()
  574. //fmt.Println("got", from.addr().String(), hash, len(nodes))
  575. qq := s.queriesSent[from]
  576. if qq == nil {
  577. return true
  578. }
  579. q, ok := qq[hash]
  580. if !ok || now > q.sent+mclock.AbsTime(topicQueryTimeout) {
  581. return true
  582. }
  583. inside := float64(0)
  584. if len(nodes) > 0 {
  585. inside = 1
  586. }
  587. s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
  588. chn := s.searchTopicMap[q.lookup.topic].foundChn
  589. if chn == nil {
  590. //fmt.Println("no channel")
  591. return false
  592. }
  593. for _, node := range nodes {
  594. ip := node.IP
  595. if ip.IsUnspecified() || ip.IsLoopback() {
  596. ip = from.IP
  597. }
  598. enode := NewNode(node.ID, ip, node.UDP-1, node.TCP-1).String() // subtract one from port while discv5 is running in test mode on UDPport+1
  599. select {
  600. case chn <- enode:
  601. default:
  602. return false
  603. }
  604. }
  605. return false
  606. }
  607. type topicRadius struct {
  608. topic Topic
  609. topicHashPrefix uint64
  610. radius, minRadius uint64
  611. buckets []topicRadiusBucket
  612. }
  613. type topicRadiusEvent int
  614. const (
  615. trOutside topicRadiusEvent = iota
  616. trInside
  617. trNoAdjust
  618. trCount
  619. )
  620. type topicRadiusBucket struct {
  621. weights [trCount]float64
  622. lastTime mclock.AbsTime
  623. value float64
  624. lookupSent map[common.Hash]mclock.AbsTime
  625. }
  626. func (b *topicRadiusBucket) update(now mclock.AbsTime) {
  627. if now == b.lastTime {
  628. return
  629. }
  630. exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
  631. for i, w := range b.weights {
  632. b.weights[i] = w * exp
  633. }
  634. b.lastTime = now
  635. for target, tm := range b.lookupSent {
  636. if now-tm > mclock.AbsTime(pingTimeout) {
  637. b.weights[trNoAdjust] += 1
  638. delete(b.lookupSent, target)
  639. }
  640. }
  641. }
  642. func (b *topicRadiusBucket) adjust(now mclock.AbsTime, inside float64) {
  643. b.update(now)
  644. if inside <= 0 {
  645. b.weights[trOutside] += 1
  646. } else {
  647. if inside >= 1 {
  648. b.weights[trInside] += 1
  649. } else {
  650. b.weights[trInside] += inside
  651. b.weights[trOutside] += 1 - inside
  652. }
  653. }
  654. }
  655. func newTopicRadius(t Topic) *topicRadius {
  656. topicHash := crypto.Keccak256Hash([]byte(t))
  657. topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
  658. return &topicRadius{
  659. topic: t,
  660. topicHashPrefix: topicHashPrefix,
  661. radius: maxRadius,
  662. minRadius: maxRadius,
  663. }
  664. }
  665. func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
  666. prefix := binary.BigEndian.Uint64(addrHash[0:8])
  667. var log2 float64
  668. if prefix != r.topicHashPrefix {
  669. log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
  670. }
  671. bucket := int((64 - log2) * radiusBucketsPerBit)
  672. max := 64*radiusBucketsPerBit - 1
  673. if bucket > max {
  674. return max
  675. }
  676. if bucket < 0 {
  677. return 0
  678. }
  679. return bucket
  680. }
  681. func (r *topicRadius) targetForBucket(bucket int) common.Hash {
  682. min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
  683. max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
  684. a := uint64(min)
  685. b := randUint64n(uint64(max - min))
  686. xor := a + b
  687. if xor < a {
  688. xor = ^uint64(0)
  689. }
  690. prefix := r.topicHashPrefix ^ xor
  691. var target common.Hash
  692. binary.BigEndian.PutUint64(target[0:8], prefix)
  693. globalRandRead(target[8:])
  694. return target
  695. }
  696. // package rand provides a Read function in Go 1.6 and later, but
  697. // we can't use it yet because we still support Go 1.5.
  698. func globalRandRead(b []byte) {
  699. pos := 0
  700. val := 0
  701. for n := 0; n < len(b); n++ {
  702. if pos == 0 {
  703. val = rand.Int()
  704. pos = 7
  705. }
  706. b[n] = byte(val)
  707. val >>= 8
  708. pos--
  709. }
  710. }
  711. func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
  712. nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
  713. dist := nodePrefix ^ r.topicHashPrefix
  714. return dist < r.radius
  715. }
  716. func (r *topicRadius) chooseLookupBucket(a, b int) int {
  717. if a < 0 {
  718. a = 0
  719. }
  720. if a > b {
  721. return -1
  722. }
  723. c := 0
  724. for i := a; i <= b; i++ {
  725. if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
  726. c++
  727. }
  728. }
  729. if c == 0 {
  730. return -1
  731. }
  732. rnd := randUint(uint32(c))
  733. for i := a; i <= b; i++ {
  734. if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
  735. if rnd == 0 {
  736. return i
  737. }
  738. rnd--
  739. }
  740. }
  741. panic(nil) // should never happen
  742. }
  743. func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
  744. var max float64
  745. if a < 0 {
  746. a = 0
  747. }
  748. if b >= len(r.buckets) {
  749. b = len(r.buckets) - 1
  750. if r.buckets[b].value > max {
  751. max = r.buckets[b].value
  752. }
  753. }
  754. if b >= a {
  755. for i := a; i <= b; i++ {
  756. if r.buckets[i].value > max {
  757. max = r.buckets[i].value
  758. }
  759. }
  760. }
  761. return maxValue-max < minPeakSize
  762. }
  763. func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
  764. maxBucket := 0
  765. maxValue := float64(0)
  766. now := mclock.Now()
  767. v := float64(0)
  768. for i, _ := range r.buckets {
  769. r.buckets[i].update(now)
  770. v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
  771. r.buckets[i].value = v
  772. //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
  773. }
  774. //fmt.Println()
  775. slopeCross := -1
  776. for i, b := range r.buckets {
  777. v := b.value
  778. if v < float64(i)*minSlope {
  779. slopeCross = i
  780. break
  781. }
  782. if v > maxValue {
  783. maxValue = v
  784. maxBucket = i + 1
  785. }
  786. }
  787. minRadBucket := len(r.buckets)
  788. sum := float64(0)
  789. for minRadBucket > 0 && sum < minRightSum {
  790. minRadBucket--
  791. b := r.buckets[minRadBucket]
  792. sum += b.weights[trInside] + b.weights[trOutside]
  793. }
  794. r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
  795. lookupLeft := -1
  796. if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
  797. lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
  798. }
  799. lookupRight := -1
  800. if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
  801. for len(r.buckets) <= maxBucket+lookupWidth {
  802. r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]mclock.AbsTime)})
  803. }
  804. lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
  805. }
  806. if lookupLeft == -1 {
  807. radiusLookup = lookupRight
  808. } else {
  809. if lookupRight == -1 {
  810. radiusLookup = lookupLeft
  811. } else {
  812. if randUint(2) == 0 {
  813. radiusLookup = lookupLeft
  814. } else {
  815. radiusLookup = lookupRight
  816. }
  817. }
  818. }
  819. //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
  820. if radiusLookup == -1 {
  821. // no more radius lookups needed at the moment, return a radius
  822. rad := maxBucket
  823. if minRadBucket < rad {
  824. rad = minRadBucket
  825. }
  826. radius = ^uint64(0)
  827. if rad > 0 {
  828. radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
  829. }
  830. r.radius = radius
  831. }
  832. return
  833. }
  834. func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
  835. if !forceRegular {
  836. _, radiusLookup := r.recalcRadius()
  837. if radiusLookup != -1 {
  838. target := r.targetForBucket(radiusLookup)
  839. r.buckets[radiusLookup].lookupSent[target] = mclock.Now()
  840. return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
  841. }
  842. }
  843. radExt := r.radius / 2
  844. if radExt > maxRadius-r.radius {
  845. radExt = maxRadius - r.radius
  846. }
  847. rnd := randUint64n(r.radius) + randUint64n(2*radExt)
  848. if rnd > radExt {
  849. rnd -= radExt
  850. } else {
  851. rnd = radExt - rnd
  852. }
  853. prefix := r.topicHashPrefix ^ rnd
  854. var target common.Hash
  855. binary.BigEndian.PutUint64(target[0:8], prefix)
  856. globalRandRead(target[8:])
  857. return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
  858. }
  859. func (r *topicRadius) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t ticketRef) {
  860. wait := t.t.regTime[t.idx] - t.t.issueTime
  861. inside := float64(wait)/float64(targetWaitTime) - 0.5
  862. if inside > 1 {
  863. inside = 1
  864. }
  865. if inside < 0 {
  866. inside = 0
  867. }
  868. r.adjust(now, targetHash, t.t.node.sha, inside)
  869. }
  870. func (r *topicRadius) adjust(now mclock.AbsTime, targetHash, addrHash common.Hash, inside float64) {
  871. bucket := r.getBucketIdx(addrHash)
  872. //fmt.Println("adjust", bucket, len(r.buckets), inside)
  873. if bucket >= len(r.buckets) {
  874. return
  875. }
  876. r.buckets[bucket].adjust(now, inside)
  877. delete(r.buckets[bucket].lookupSent, targetHash)
  878. }