topic.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "container/heap"
  19. "fmt"
  20. "math"
  21. "math/rand"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. )
  25. const (
  26. maxEntries = 10000
  27. maxEntriesPerTopic = 50
  28. fallbackRegistrationExpiry = 1 * time.Hour
  29. )
  30. type Topic string
  31. type topicEntry struct {
  32. topic Topic
  33. fifoIdx uint64
  34. node *Node
  35. expire mclock.AbsTime
  36. }
  37. type topicInfo struct {
  38. entries map[uint64]*topicEntry
  39. fifoHead, fifoTail uint64
  40. rqItem *topicRequestQueueItem
  41. wcl waitControlLoop
  42. }
  43. // removes tail element from the fifo
  44. func (t *topicInfo) getFifoTail() *topicEntry {
  45. for t.entries[t.fifoTail] == nil {
  46. t.fifoTail++
  47. }
  48. tail := t.entries[t.fifoTail]
  49. t.fifoTail++
  50. return tail
  51. }
  52. type nodeInfo struct {
  53. entries map[Topic]*topicEntry
  54. lastIssuedTicket, lastUsedTicket uint32
  55. // you can't register a ticket newer than lastUsedTicket before noRegUntil (absolute time)
  56. noRegUntil mclock.AbsTime
  57. }
  58. type topicTable struct {
  59. db *nodeDB
  60. self *Node
  61. nodes map[*Node]*nodeInfo
  62. topics map[Topic]*topicInfo
  63. globalEntries uint64
  64. requested topicRequestQueue
  65. requestCnt uint64
  66. lastGarbageCollection mclock.AbsTime
  67. }
  68. func newTopicTable(db *nodeDB, self *Node) *topicTable {
  69. if printTestImgLogs {
  70. fmt.Printf("*N %016x\n", self.sha[:8])
  71. }
  72. return &topicTable{
  73. db: db,
  74. nodes: make(map[*Node]*nodeInfo),
  75. topics: make(map[Topic]*topicInfo),
  76. self: self,
  77. }
  78. }
  79. func (t *topicTable) getOrNewTopic(topic Topic) *topicInfo {
  80. ti := t.topics[topic]
  81. if ti == nil {
  82. rqItem := &topicRequestQueueItem{
  83. topic: topic,
  84. priority: t.requestCnt,
  85. }
  86. ti = &topicInfo{
  87. entries: make(map[uint64]*topicEntry),
  88. rqItem: rqItem,
  89. }
  90. t.topics[topic] = ti
  91. heap.Push(&t.requested, rqItem)
  92. }
  93. return ti
  94. }
  95. func (t *topicTable) checkDeleteTopic(topic Topic) {
  96. ti := t.topics[topic]
  97. if ti == nil {
  98. return
  99. }
  100. if len(ti.entries) == 0 && ti.wcl.hasMinimumWaitPeriod() {
  101. delete(t.topics, topic)
  102. heap.Remove(&t.requested, ti.rqItem.index)
  103. }
  104. }
  105. func (t *topicTable) getOrNewNode(node *Node) *nodeInfo {
  106. n := t.nodes[node]
  107. if n == nil {
  108. //fmt.Printf("newNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
  109. var issued, used uint32
  110. if t.db != nil {
  111. issued, used = t.db.fetchTopicRegTickets(node.ID)
  112. }
  113. n = &nodeInfo{
  114. entries: make(map[Topic]*topicEntry),
  115. lastIssuedTicket: issued,
  116. lastUsedTicket: used,
  117. }
  118. t.nodes[node] = n
  119. }
  120. return n
  121. }
  122. func (t *topicTable) checkDeleteNode(node *Node) {
  123. if n, ok := t.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < mclock.Now() {
  124. //fmt.Printf("deleteNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
  125. delete(t.nodes, node)
  126. }
  127. }
  128. func (t *topicTable) storeTicketCounters(node *Node) {
  129. n := t.getOrNewNode(node)
  130. if t.db != nil {
  131. t.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
  132. }
  133. }
  134. func (t *topicTable) getEntries(topic Topic) []*Node {
  135. t.collectGarbage()
  136. te := t.topics[topic]
  137. if te == nil {
  138. return nil
  139. }
  140. nodes := make([]*Node, len(te.entries))
  141. i := 0
  142. for _, e := range te.entries {
  143. nodes[i] = e.node
  144. i++
  145. }
  146. t.requestCnt++
  147. t.requested.update(te.rqItem, t.requestCnt)
  148. return nodes
  149. }
  150. func (t *topicTable) addEntry(node *Node, topic Topic) {
  151. n := t.getOrNewNode(node)
  152. // clear previous entries by the same node
  153. for _, e := range n.entries {
  154. t.deleteEntry(e)
  155. }
  156. // ***
  157. n = t.getOrNewNode(node)
  158. tm := mclock.Now()
  159. te := t.getOrNewTopic(topic)
  160. if len(te.entries) == maxEntriesPerTopic {
  161. t.deleteEntry(te.getFifoTail())
  162. }
  163. if t.globalEntries == maxEntries {
  164. t.deleteEntry(t.leastRequested()) // not empty, no need to check for nil
  165. }
  166. fifoIdx := te.fifoHead
  167. te.fifoHead++
  168. entry := &topicEntry{
  169. topic: topic,
  170. fifoIdx: fifoIdx,
  171. node: node,
  172. expire: tm + mclock.AbsTime(fallbackRegistrationExpiry),
  173. }
  174. if printTestImgLogs {
  175. fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, t.self.sha[:8], node.sha[:8])
  176. }
  177. te.entries[fifoIdx] = entry
  178. n.entries[topic] = entry
  179. t.globalEntries++
  180. te.wcl.registered(tm)
  181. }
  182. // removes least requested element from the fifo
  183. func (t *topicTable) leastRequested() *topicEntry {
  184. for t.requested.Len() > 0 && t.topics[t.requested[0].topic] == nil {
  185. heap.Pop(&t.requested)
  186. }
  187. if t.requested.Len() == 0 {
  188. return nil
  189. }
  190. return t.topics[t.requested[0].topic].getFifoTail()
  191. }
  192. // entry should exist
  193. func (t *topicTable) deleteEntry(e *topicEntry) {
  194. if printTestImgLogs {
  195. fmt.Printf("*- %d %v %016x %016x\n", mclock.Now()/1000000, e.topic, t.self.sha[:8], e.node.sha[:8])
  196. }
  197. ne := t.nodes[e.node].entries
  198. delete(ne, e.topic)
  199. if len(ne) == 0 {
  200. t.checkDeleteNode(e.node)
  201. }
  202. te := t.topics[e.topic]
  203. delete(te.entries, e.fifoIdx)
  204. if len(te.entries) == 0 {
  205. t.checkDeleteTopic(e.topic)
  206. }
  207. t.globalEntries--
  208. }
  209. // It is assumed that topics and waitPeriods have the same length.
  210. func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
  211. debugLog(fmt.Sprintf("useTicket %v %v %v", serialNo, topics, waitPeriods))
  212. //fmt.Println("useTicket", serialNo, topics, waitPeriods)
  213. t.collectGarbage()
  214. n := t.getOrNewNode(node)
  215. if serialNo < n.lastUsedTicket {
  216. return false
  217. }
  218. tm := mclock.Now()
  219. if serialNo > n.lastUsedTicket && tm < n.noRegUntil {
  220. return false
  221. }
  222. if serialNo != n.lastUsedTicket {
  223. n.lastUsedTicket = serialNo
  224. n.noRegUntil = tm + mclock.AbsTime(noRegTimeout())
  225. t.storeTicketCounters(node)
  226. }
  227. currTime := uint64(tm / mclock.AbsTime(time.Second))
  228. regTime := issueTime + uint64(waitPeriods[idx])
  229. relTime := int64(currTime - regTime)
  230. if relTime >= -1 && relTime <= regTimeWindow+1 { // give clients a little security margin on both ends
  231. if e := n.entries[topics[idx]]; e == nil {
  232. t.addEntry(node, topics[idx])
  233. } else {
  234. // if there is an active entry, don't move to the front of the FIFO but prolong expire time
  235. e.expire = tm + mclock.AbsTime(fallbackRegistrationExpiry)
  236. }
  237. return true
  238. }
  239. return false
  240. }
  241. func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket {
  242. topictab.collectGarbage()
  243. now := mclock.Now()
  244. n := topictab.getOrNewNode(node)
  245. n.lastIssuedTicket++
  246. topictab.storeTicketCounters(node)
  247. t := &ticket{
  248. issueTime: now,
  249. topics: topics,
  250. serial: n.lastIssuedTicket,
  251. regTime: make([]mclock.AbsTime, len(topics)),
  252. }
  253. for i, topic := range topics {
  254. var waitPeriod time.Duration
  255. if topic := topictab.topics[topic]; topic != nil {
  256. waitPeriod = topic.wcl.waitPeriod
  257. } else {
  258. waitPeriod = minWaitPeriod
  259. }
  260. t.regTime[i] = now + mclock.AbsTime(waitPeriod)
  261. }
  262. return t
  263. }
  264. const gcInterval = time.Minute
  265. func (t *topicTable) collectGarbage() {
  266. tm := mclock.Now()
  267. if time.Duration(tm-t.lastGarbageCollection) < gcInterval {
  268. return
  269. }
  270. t.lastGarbageCollection = tm
  271. for node, n := range t.nodes {
  272. for _, e := range n.entries {
  273. if e.expire <= tm {
  274. t.deleteEntry(e)
  275. }
  276. }
  277. t.checkDeleteNode(node)
  278. }
  279. for topic, _ := range t.topics {
  280. t.checkDeleteTopic(topic)
  281. }
  282. }
  283. const (
  284. minWaitPeriod = time.Minute
  285. regTimeWindow = 10 // seconds
  286. avgnoRegTimeout = time.Minute * 10
  287. // target average interval between two incoming ad requests
  288. wcTargetRegInterval = time.Minute * 10 / maxEntriesPerTopic
  289. //
  290. wcTimeConst = time.Minute * 10
  291. )
  292. // initialization is not required, will set to minWaitPeriod at first registration
  293. type waitControlLoop struct {
  294. lastIncoming mclock.AbsTime
  295. waitPeriod time.Duration
  296. }
  297. func (w *waitControlLoop) registered(tm mclock.AbsTime) {
  298. w.waitPeriod = w.nextWaitPeriod(tm)
  299. w.lastIncoming = tm
  300. }
  301. func (w *waitControlLoop) nextWaitPeriod(tm mclock.AbsTime) time.Duration {
  302. period := tm - w.lastIncoming
  303. wp := time.Duration(float64(w.waitPeriod) * math.Exp((float64(wcTargetRegInterval)-float64(period))/float64(wcTimeConst)))
  304. if wp < minWaitPeriod {
  305. wp = minWaitPeriod
  306. }
  307. return wp
  308. }
  309. func (w *waitControlLoop) hasMinimumWaitPeriod() bool {
  310. return w.nextWaitPeriod(mclock.Now()) == minWaitPeriod
  311. }
  312. func noRegTimeout() time.Duration {
  313. e := rand.ExpFloat64()
  314. if e > 100 {
  315. e = 100
  316. }
  317. return time.Duration(float64(avgnoRegTimeout) * e)
  318. }
  319. type topicRequestQueueItem struct {
  320. topic Topic
  321. priority uint64
  322. index int
  323. }
  324. // A topicRequestQueue implements heap.Interface and holds topicRequestQueueItems.
  325. type topicRequestQueue []*topicRequestQueueItem
  326. func (tq topicRequestQueue) Len() int { return len(tq) }
  327. func (tq topicRequestQueue) Less(i, j int) bool {
  328. return tq[i].priority < tq[j].priority
  329. }
  330. func (tq topicRequestQueue) Swap(i, j int) {
  331. tq[i], tq[j] = tq[j], tq[i]
  332. tq[i].index = i
  333. tq[j].index = j
  334. }
  335. func (tq *topicRequestQueue) Push(x interface{}) {
  336. n := len(*tq)
  337. item := x.(*topicRequestQueueItem)
  338. item.index = n
  339. *tq = append(*tq, item)
  340. }
  341. func (tq *topicRequestQueue) Pop() interface{} {
  342. old := *tq
  343. n := len(old)
  344. item := old[n-1]
  345. item.index = -1
  346. *tq = old[0 : n-1]
  347. return item
  348. }
  349. func (tq *topicRequestQueue) update(item *topicRequestQueueItem, priority uint64) {
  350. item.priority = priority
  351. heap.Fix(tq, item.index)
  352. }