freeclient.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "io"
  19. "math"
  20. "net"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/common/prque"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/les/csvlogger"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. )
  30. // freeClientPool implements a client database that limits the connection time
  31. // of each client and manages accepting/rejecting incoming connections and even
  32. // kicking out some connected clients. The pool calculates recent usage time
  33. // for each known client (a value that increases linearly when the client is
  34. // connected and decreases exponentially when not connected). Clients with lower
  35. // recent usage are preferred, unknown nodes have the highest priority. Already
  36. // connected nodes receive a small bias in their favor in order to avoid accepting
  37. // and instantly kicking out clients.
  38. //
  39. // Note: the pool can use any string for client identification. Using signature
  40. // keys for that purpose would not make sense when being known has a negative
  41. // value for the client. Currently the LES protocol manager uses IP addresses
  42. // (without port address) to identify clients.
  43. type freeClientPool struct {
  44. db ethdb.Database
  45. lock sync.Mutex
  46. clock mclock.Clock
  47. closed bool
  48. removePeer func(string)
  49. connectedLimit, totalLimit int
  50. freeClientCap uint64
  51. logger *csvlogger.Logger
  52. logTotalFreeConn *csvlogger.Channel
  53. addressMap map[string]*freeClientPoolEntry
  54. connPool, disconnPool *prque.Prque
  55. startupTime mclock.AbsTime
  56. logOffsetAtStartup int64
  57. }
  58. const (
  59. recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage
  60. fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
  61. connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
  62. )
  63. // newFreeClientPool creates a new free client pool
  64. func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string), metricsLogger, eventLogger *csvlogger.Logger) *freeClientPool {
  65. pool := &freeClientPool{
  66. db: db,
  67. clock: clock,
  68. addressMap: make(map[string]*freeClientPoolEntry),
  69. connPool: prque.New(poolSetIndex),
  70. disconnPool: prque.New(poolSetIndex),
  71. freeClientCap: freeClientCap,
  72. totalLimit: totalLimit,
  73. logger: eventLogger,
  74. logTotalFreeConn: metricsLogger.NewChannel("totalFreeConn", 0),
  75. removePeer: removePeer,
  76. }
  77. pool.loadFromDb()
  78. return pool
  79. }
  80. func (f *freeClientPool) stop() {
  81. f.lock.Lock()
  82. f.closed = true
  83. f.saveToDb()
  84. f.lock.Unlock()
  85. }
  86. // freeClientId returns a string identifier for the peer. Multiple peers with the
  87. // same identifier can not be in the free client pool simultaneously.
  88. func freeClientId(p *peer) string {
  89. if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
  90. if addr.IP.IsLoopback() {
  91. // using peer id instead of loopback ip address allows multiple free
  92. // connections from local machine to own server
  93. return p.id
  94. } else {
  95. return addr.IP.String()
  96. }
  97. }
  98. return ""
  99. }
  100. // registerPeer implements clientPool
  101. func (f *freeClientPool) registerPeer(p *peer) {
  102. if freeId := freeClientId(p); freeId != "" {
  103. if !f.connect(freeId, p.id) {
  104. f.removePeer(p.id)
  105. }
  106. }
  107. }
  108. // connect should be called after a successful handshake. If the connection was
  109. // rejected, there is no need to call disconnect.
  110. func (f *freeClientPool) connect(address, id string) bool {
  111. f.lock.Lock()
  112. defer f.lock.Unlock()
  113. if f.closed {
  114. return false
  115. }
  116. f.logger.Event("freeClientPool: connecting from " + address + ", " + id)
  117. if f.connectedLimit == 0 {
  118. f.logger.Event("freeClientPool: rejected, " + id)
  119. log.Debug("Client rejected", "address", address)
  120. return false
  121. }
  122. e := f.addressMap[address]
  123. now := f.clock.Now()
  124. var recentUsage int64
  125. if e == nil {
  126. e = &freeClientPoolEntry{address: address, index: -1, id: id}
  127. f.addressMap[address] = e
  128. } else {
  129. if e.connected {
  130. f.logger.Event("freeClientPool: already connected, " + id)
  131. log.Debug("Client already connected", "address", address)
  132. return false
  133. }
  134. recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier))
  135. }
  136. e.linUsage = recentUsage - int64(now)
  137. // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool
  138. if f.connPool.Size() == f.connectedLimit {
  139. i := f.connPool.PopItem().(*freeClientPoolEntry)
  140. if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
  141. // kick it out and accept the new client
  142. f.dropClient(i, now)
  143. f.logger.Event("freeClientPool: kicked out, " + i.id)
  144. } else {
  145. // keep the old client and reject the new one
  146. f.connPool.Push(i, i.linUsage)
  147. f.logger.Event("freeClientPool: rejected, " + id)
  148. log.Debug("Client rejected", "address", address)
  149. return false
  150. }
  151. }
  152. f.disconnPool.Remove(e.index)
  153. e.connected = true
  154. e.id = id
  155. f.connPool.Push(e, e.linUsage)
  156. f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
  157. if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
  158. f.disconnPool.Pop()
  159. }
  160. f.logger.Event("freeClientPool: accepted, " + id)
  161. log.Debug("Client accepted", "address", address)
  162. return true
  163. }
  164. // unregisterPeer implements clientPool
  165. func (f *freeClientPool) unregisterPeer(p *peer) {
  166. if freeId := freeClientId(p); freeId != "" {
  167. f.disconnect(freeId)
  168. }
  169. }
  170. // disconnect should be called when a connection is terminated. If the disconnection
  171. // was initiated by the pool itself using disconnectFn then calling disconnect is
  172. // not necessary but permitted.
  173. func (f *freeClientPool) disconnect(address string) {
  174. f.lock.Lock()
  175. defer f.lock.Unlock()
  176. if f.closed {
  177. return
  178. }
  179. e := f.addressMap[address]
  180. now := f.clock.Now()
  181. if !e.connected {
  182. log.Debug("Client already disconnected", "address", address)
  183. return
  184. }
  185. f.connPool.Remove(e.index)
  186. f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
  187. f.calcLogUsage(e, now)
  188. e.connected = false
  189. f.disconnPool.Push(e, -e.logUsage)
  190. f.logger.Event("freeClientPool: disconnected, " + e.id)
  191. log.Debug("Client disconnected", "address", address)
  192. }
  193. // setConnLimit sets the maximum number of free client slots and also drops
  194. // some peers if necessary
  195. func (f *freeClientPool) setLimits(count int, totalCap uint64) {
  196. f.lock.Lock()
  197. defer f.lock.Unlock()
  198. f.connectedLimit = int(totalCap / f.freeClientCap)
  199. if count < f.connectedLimit {
  200. f.connectedLimit = count
  201. }
  202. now := mclock.Now()
  203. for f.connPool.Size() > f.connectedLimit {
  204. i := f.connPool.PopItem().(*freeClientPoolEntry)
  205. f.dropClient(i, now)
  206. f.logger.Event("freeClientPool: setLimits kicked out, " + i.id)
  207. }
  208. }
  209. // dropClient disconnects a client and also moves it from the connected to the
  210. // disconnected pool
  211. func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) {
  212. f.connPool.Remove(i.index)
  213. f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
  214. f.calcLogUsage(i, now)
  215. i.connected = false
  216. f.disconnPool.Push(i, -i.logUsage)
  217. log.Debug("Client kicked out", "address", i.address)
  218. f.removePeer(i.id)
  219. }
  220. // logOffset calculates the time-dependent offset for the logarithmic
  221. // representation of recent usage
  222. func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
  223. // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor
  224. // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier.
  225. logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier))
  226. return f.logOffsetAtStartup + logDecay
  227. }
  228. // calcLogUsage converts recent usage from linear to logarithmic representation
  229. // when disconnecting a peer or closing the client pool
  230. func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) {
  231. dt := e.linUsage + int64(now)
  232. if dt < 1 {
  233. dt = 1
  234. }
  235. e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now)
  236. }
  237. // freeClientPoolStorage is the RLP representation of the pool's database storage
  238. type freeClientPoolStorage struct {
  239. LogOffset uint64
  240. List []*freeClientPoolEntry
  241. }
  242. // loadFromDb restores pool status from the database storage
  243. // (automatically called at initialization)
  244. func (f *freeClientPool) loadFromDb() {
  245. enc, err := f.db.Get([]byte("freeClientPool"))
  246. if err != nil {
  247. return
  248. }
  249. var storage freeClientPoolStorage
  250. err = rlp.DecodeBytes(enc, &storage)
  251. if err != nil {
  252. log.Error("Failed to decode client list", "err", err)
  253. return
  254. }
  255. f.logOffsetAtStartup = int64(storage.LogOffset)
  256. f.startupTime = f.clock.Now()
  257. for _, e := range storage.List {
  258. log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage)
  259. f.addressMap[e.address] = e
  260. f.disconnPool.Push(e, -e.logUsage)
  261. }
  262. }
  263. // saveToDb saves pool status to the database storage
  264. // (automatically called during shutdown)
  265. func (f *freeClientPool) saveToDb() {
  266. now := f.clock.Now()
  267. storage := freeClientPoolStorage{
  268. LogOffset: uint64(f.logOffset(now)),
  269. List: make([]*freeClientPoolEntry, len(f.addressMap)),
  270. }
  271. i := 0
  272. for _, e := range f.addressMap {
  273. if e.connected {
  274. f.calcLogUsage(e, now)
  275. }
  276. storage.List[i] = e
  277. i++
  278. }
  279. enc, err := rlp.EncodeToBytes(storage)
  280. if err != nil {
  281. log.Error("Failed to encode client list", "err", err)
  282. } else {
  283. f.db.Put([]byte("freeClientPool"), enc)
  284. }
  285. }
  286. // freeClientPoolEntry represents a client address known by the pool.
  287. // When connected, recent usage is calculated as linUsage + int64(clock.Now())
  288. // When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset
  289. // also grows linearly with time while the server is running.
  290. // Conversion between linear and logarithmic representation happens when connecting
  291. // or disconnecting the node.
  292. //
  293. // Note: linUsage and logUsage are values used with constantly growing offsets so
  294. // even though they are close to each other at any time they may wrap around int64
  295. // limits over time. Comparison should be performed accordingly.
  296. type freeClientPoolEntry struct {
  297. address, id string
  298. connected bool
  299. disconnectFn func()
  300. linUsage, logUsage int64
  301. index int
  302. }
  303. func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error {
  304. return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)})
  305. }
  306. func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error {
  307. var entry struct {
  308. Address string
  309. LogUsage uint64
  310. }
  311. if err := s.Decode(&entry); err != nil {
  312. return err
  313. }
  314. e.address = entry.Address
  315. e.logUsage = int64(entry.LogUsage)
  316. e.connected = false
  317. e.index = -1
  318. return nil
  319. }
  320. // poolSetIndex callback is used by both priority queues to set/update the index of
  321. // the element in the queue. Index is needed to remove elements other than the top one.
  322. func poolSetIndex(a interface{}, i int) {
  323. a.(*freeClientPoolEntry).index = i
  324. }