tracker.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Copyright 2021 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package diff
  17. import (
  18. "container/list"
  19. "fmt"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/log"
  23. )
  24. const (
  25. // maxTrackedPackets is a huge number to act as a failsafe on the number of
  26. // pending requests the node will track. It should never be hit unless an
  27. // attacker figures out a way to spin requests.
  28. maxTrackedPackets = 10000
  29. )
  30. // request tracks sent network requests which have not yet received a response.
  31. type request struct {
  32. peer string
  33. version uint // Protocol version
  34. reqCode uint64 // Protocol message code of the request
  35. resCode uint64 // Protocol message code of the expected response
  36. time time.Time // Timestamp when the request was made
  37. expire *list.Element // Expiration marker to untrack it
  38. }
  39. type Tracker struct {
  40. timeout time.Duration // Global timeout after which to drop a tracked packet
  41. pending map[uint64]*request // Currently pending requests
  42. expire *list.List // Linked list tracking the expiration order
  43. wake *time.Timer // Timer tracking the expiration of the next item
  44. lock sync.Mutex // Lock protecting from concurrent updates
  45. }
  46. func NewTracker(timeout time.Duration) *Tracker {
  47. return &Tracker{
  48. timeout: timeout,
  49. pending: make(map[uint64]*request),
  50. expire: list.New(),
  51. }
  52. }
  53. // Track adds a network request to the tracker to wait for a response to arrive
  54. // or until the request it cancelled or times out.
  55. func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) {
  56. t.lock.Lock()
  57. defer t.lock.Unlock()
  58. // If there's a duplicate request, we've just random-collided (or more probably,
  59. // we have a bug), report it. We could also add a metric, but we're not really
  60. // expecting ourselves to be buggy, so a noisy warning should be enough.
  61. if _, ok := t.pending[id]; ok {
  62. log.Error("Network request id collision", "version", version, "code", reqCode, "id", id)
  63. return
  64. }
  65. // If we have too many pending requests, bail out instead of leaking memory
  66. if pending := len(t.pending); pending >= maxTrackedPackets {
  67. log.Error("Request tracker exceeded allowance", "pending", pending, "peer", peer, "version", version, "code", reqCode)
  68. return
  69. }
  70. // Id doesn't exist yet, start tracking it
  71. t.pending[id] = &request{
  72. peer: peer,
  73. version: version,
  74. reqCode: reqCode,
  75. resCode: resCode,
  76. time: time.Now(),
  77. expire: t.expire.PushBack(id),
  78. }
  79. // If we've just inserted the first item, start the expiration timer
  80. if t.wake == nil {
  81. t.wake = time.AfterFunc(t.timeout, t.clean)
  82. }
  83. }
  84. // clean is called automatically when a preset time passes without a response
  85. // being dleivered for the first network request.
  86. func (t *Tracker) clean() {
  87. t.lock.Lock()
  88. defer t.lock.Unlock()
  89. // Expire anything within a certain threshold (might be no items at all if
  90. // we raced with the delivery)
  91. for t.expire.Len() > 0 {
  92. // Stop iterating if the next pending request is still alive
  93. var (
  94. head = t.expire.Front()
  95. id = head.Value.(uint64)
  96. req = t.pending[id]
  97. )
  98. if time.Since(req.time) < t.timeout+5*time.Millisecond {
  99. break
  100. }
  101. // Nope, dead, drop it
  102. t.expire.Remove(head)
  103. delete(t.pending, id)
  104. }
  105. t.schedule()
  106. }
  107. // schedule starts a timer to trigger on the expiration of the first network
  108. // packet.
  109. func (t *Tracker) schedule() {
  110. if t.expire.Len() == 0 {
  111. t.wake = nil
  112. return
  113. }
  114. t.wake = time.AfterFunc(time.Until(t.pending[t.expire.Front().Value.(uint64)].time.Add(t.timeout)), t.clean)
  115. }
  116. // Fulfil fills a pending request, if any is available.
  117. func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) bool {
  118. t.lock.Lock()
  119. defer t.lock.Unlock()
  120. // If it's a non existing request, track as stale response
  121. req, ok := t.pending[id]
  122. if !ok {
  123. return false
  124. }
  125. // If the response is funky, it might be some active attack
  126. if req.peer != peer || req.version != version || req.resCode != code {
  127. log.Warn("Network response id collision",
  128. "have", fmt.Sprintf("%s:/%d:%d", peer, version, code),
  129. "want", fmt.Sprintf("%s:/%d:%d", peer, req.version, req.resCode),
  130. )
  131. return false
  132. }
  133. // Everything matches, mark the request serviced
  134. t.expire.Remove(req.expire)
  135. delete(t.pending, id)
  136. if req.expire.Prev() == nil {
  137. if t.wake.Stop() {
  138. t.schedule()
  139. }
  140. }
  141. return true
  142. }