freeclient.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "io"
  19. "math"
  20. "net"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/common/prque"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/rlp"
  28. )
  29. // freeClientPool implements a client database that limits the connection time
  30. // of each client and manages accepting/rejecting incoming connections and even
  31. // kicking out some connected clients. The pool calculates recent usage time
  32. // for each known client (a value that increases linearly when the client is
  33. // connected and decreases exponentially when not connected). Clients with lower
  34. // recent usage are preferred, unknown nodes have the highest priority. Already
  35. // connected nodes receive a small bias in their favor in order to avoid accepting
  36. // and instantly kicking out clients.
  37. //
  38. // Note: the pool can use any string for client identification. Using signature
  39. // keys for that purpose would not make sense when being known has a negative
  40. // value for the client. Currently the LES protocol manager uses IP addresses
  41. // (without port address) to identify clients.
  42. type freeClientPool struct {
  43. db ethdb.Database
  44. lock sync.Mutex
  45. clock mclock.Clock
  46. closed bool
  47. removePeer func(string)
  48. connectedLimit, totalLimit int
  49. freeClientCap uint64
  50. connectedCap uint64
  51. addressMap map[string]*freeClientPoolEntry
  52. connPool, disconnPool *prque.Prque
  53. startupTime mclock.AbsTime
  54. logOffsetAtStartup int64
  55. }
  56. const (
  57. recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage
  58. fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
  59. connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
  60. )
  61. // newFreeClientPool creates a new free client pool
  62. func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool {
  63. pool := &freeClientPool{
  64. db: db,
  65. clock: clock,
  66. addressMap: make(map[string]*freeClientPoolEntry),
  67. connPool: prque.New(poolSetIndex),
  68. disconnPool: prque.New(poolSetIndex),
  69. freeClientCap: freeClientCap,
  70. totalLimit: totalLimit,
  71. removePeer: removePeer,
  72. }
  73. pool.loadFromDb()
  74. return pool
  75. }
  76. func (f *freeClientPool) stop() {
  77. f.lock.Lock()
  78. f.closed = true
  79. f.saveToDb()
  80. f.lock.Unlock()
  81. }
  82. // freeClientId returns a string identifier for the peer. Multiple peers with the
  83. // same identifier can not be in the free client pool simultaneously.
  84. func freeClientId(p *peer) string {
  85. if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
  86. if addr.IP.IsLoopback() {
  87. // using peer id instead of loopback ip address allows multiple free
  88. // connections from local machine to own server
  89. return p.id
  90. } else {
  91. return addr.IP.String()
  92. }
  93. }
  94. return ""
  95. }
  96. // registerPeer implements clientPool
  97. func (f *freeClientPool) registerPeer(p *peer) {
  98. if freeId := freeClientId(p); freeId != "" {
  99. if !f.connect(freeId, p.id) {
  100. f.removePeer(p.id)
  101. }
  102. }
  103. }
  104. // connect should be called after a successful handshake. If the connection was
  105. // rejected, there is no need to call disconnect.
  106. func (f *freeClientPool) connect(address, id string) bool {
  107. f.lock.Lock()
  108. defer f.lock.Unlock()
  109. if f.closed {
  110. return false
  111. }
  112. if f.connectedLimit == 0 {
  113. log.Debug("Client rejected", "address", address)
  114. return false
  115. }
  116. e := f.addressMap[address]
  117. now := f.clock.Now()
  118. var recentUsage int64
  119. if e == nil {
  120. e = &freeClientPoolEntry{address: address, index: -1, id: id}
  121. f.addressMap[address] = e
  122. } else {
  123. if e.connected {
  124. log.Debug("Client already connected", "address", address)
  125. return false
  126. }
  127. recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier))
  128. }
  129. e.linUsage = recentUsage - int64(now)
  130. // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool
  131. if f.connPool.Size() == f.connectedLimit {
  132. i := f.connPool.PopItem().(*freeClientPoolEntry)
  133. if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
  134. // kick it out and accept the new client
  135. f.dropClient(i, now)
  136. clientKickedMeter.Mark(1)
  137. f.connectedCap -= f.freeClientCap
  138. } else {
  139. // keep the old client and reject the new one
  140. f.connPool.Push(i, i.linUsage)
  141. log.Debug("Client rejected", "address", address)
  142. clientRejectedMeter.Mark(1)
  143. return false
  144. }
  145. }
  146. f.disconnPool.Remove(e.index)
  147. e.connected = true
  148. e.id = id
  149. f.connPool.Push(e, e.linUsage)
  150. if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
  151. f.disconnPool.Pop()
  152. }
  153. f.connectedCap += f.freeClientCap
  154. totalConnectedGauge.Update(int64(f.connectedCap))
  155. clientConnectedMeter.Mark(1)
  156. log.Debug("Client accepted", "address", address)
  157. return true
  158. }
  159. // unregisterPeer implements clientPool
  160. func (f *freeClientPool) unregisterPeer(p *peer) {
  161. if freeId := freeClientId(p); freeId != "" {
  162. f.disconnect(freeId)
  163. }
  164. }
  165. // disconnect should be called when a connection is terminated. If the disconnection
  166. // was initiated by the pool itself using disconnectFn then calling disconnect is
  167. // not necessary but permitted.
  168. func (f *freeClientPool) disconnect(address string) {
  169. f.lock.Lock()
  170. defer f.lock.Unlock()
  171. if f.closed {
  172. return
  173. }
  174. // Short circuit if the peer hasn't been registered.
  175. e := f.addressMap[address]
  176. if e == nil {
  177. return
  178. }
  179. now := f.clock.Now()
  180. if !e.connected {
  181. log.Debug("Client already disconnected", "address", address)
  182. return
  183. }
  184. f.connPool.Remove(e.index)
  185. f.calcLogUsage(e, now)
  186. e.connected = false
  187. f.disconnPool.Push(e, -e.logUsage)
  188. f.connectedCap -= f.freeClientCap
  189. totalConnectedGauge.Update(int64(f.connectedCap))
  190. log.Debug("Client disconnected", "address", address)
  191. }
  192. // setConnLimit sets the maximum number of free client slots and also drops
  193. // some peers if necessary
  194. func (f *freeClientPool) setLimits(count int, totalCap uint64) {
  195. f.lock.Lock()
  196. defer f.lock.Unlock()
  197. f.connectedLimit = int(totalCap / f.freeClientCap)
  198. if count < f.connectedLimit {
  199. f.connectedLimit = count
  200. }
  201. now := mclock.Now()
  202. for f.connPool.Size() > f.connectedLimit {
  203. i := f.connPool.PopItem().(*freeClientPoolEntry)
  204. f.dropClient(i, now)
  205. f.connectedCap -= f.freeClientCap
  206. }
  207. totalConnectedGauge.Update(int64(f.connectedCap))
  208. }
  209. // dropClient disconnects a client and also moves it from the connected to the
  210. // disconnected pool
  211. func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) {
  212. f.connPool.Remove(i.index)
  213. f.calcLogUsage(i, now)
  214. i.connected = false
  215. f.disconnPool.Push(i, -i.logUsage)
  216. log.Debug("Client kicked out", "address", i.address)
  217. f.removePeer(i.id)
  218. }
  219. // logOffset calculates the time-dependent offset for the logarithmic
  220. // representation of recent usage
  221. func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
  222. // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor
  223. // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier.
  224. logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier))
  225. return f.logOffsetAtStartup + logDecay
  226. }
  227. // calcLogUsage converts recent usage from linear to logarithmic representation
  228. // when disconnecting a peer or closing the client pool
  229. func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) {
  230. dt := e.linUsage + int64(now)
  231. if dt < 1 {
  232. dt = 1
  233. }
  234. e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now)
  235. }
  236. // freeClientPoolStorage is the RLP representation of the pool's database storage
  237. type freeClientPoolStorage struct {
  238. LogOffset uint64
  239. List []*freeClientPoolEntry
  240. }
  241. // loadFromDb restores pool status from the database storage
  242. // (automatically called at initialization)
  243. func (f *freeClientPool) loadFromDb() {
  244. enc, err := f.db.Get([]byte("freeClientPool"))
  245. if err != nil {
  246. return
  247. }
  248. var storage freeClientPoolStorage
  249. err = rlp.DecodeBytes(enc, &storage)
  250. if err != nil {
  251. log.Error("Failed to decode client list", "err", err)
  252. return
  253. }
  254. f.logOffsetAtStartup = int64(storage.LogOffset)
  255. f.startupTime = f.clock.Now()
  256. for _, e := range storage.List {
  257. log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage)
  258. f.addressMap[e.address] = e
  259. f.disconnPool.Push(e, -e.logUsage)
  260. }
  261. }
  262. // saveToDb saves pool status to the database storage
  263. // (automatically called during shutdown)
  264. func (f *freeClientPool) saveToDb() {
  265. now := f.clock.Now()
  266. storage := freeClientPoolStorage{
  267. LogOffset: uint64(f.logOffset(now)),
  268. List: make([]*freeClientPoolEntry, len(f.addressMap)),
  269. }
  270. i := 0
  271. for _, e := range f.addressMap {
  272. if e.connected {
  273. f.calcLogUsage(e, now)
  274. }
  275. storage.List[i] = e
  276. i++
  277. }
  278. enc, err := rlp.EncodeToBytes(storage)
  279. if err != nil {
  280. log.Error("Failed to encode client list", "err", err)
  281. } else {
  282. f.db.Put([]byte("freeClientPool"), enc)
  283. }
  284. }
  285. // freeClientPoolEntry represents a client address known by the pool.
  286. // When connected, recent usage is calculated as linUsage + int64(clock.Now())
  287. // When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset
  288. // also grows linearly with time while the server is running.
  289. // Conversion between linear and logarithmic representation happens when connecting
  290. // or disconnecting the node.
  291. //
  292. // Note: linUsage and logUsage are values used with constantly growing offsets so
  293. // even though they are close to each other at any time they may wrap around int64
  294. // limits over time. Comparison should be performed accordingly.
  295. type freeClientPoolEntry struct {
  296. address, id string
  297. connected bool
  298. disconnectFn func()
  299. linUsage, logUsage int64
  300. index int
  301. }
  302. func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error {
  303. return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)})
  304. }
  305. func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error {
  306. var entry struct {
  307. Address string
  308. LogUsage uint64
  309. }
  310. if err := s.Decode(&entry); err != nil {
  311. return err
  312. }
  313. e.address = entry.Address
  314. e.logUsage = int64(entry.LogUsage)
  315. e.connected = false
  316. e.index = -1
  317. return nil
  318. }
  319. // poolSetIndex callback is used by both priority queues to set/update the index of
  320. // the element in the queue. Index is needed to remove elements other than the top one.
  321. func poolSetIndex(a interface{}, i int) {
  322. a.(*freeClientPoolEntry).index = i
  323. }