api.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/hexutil"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/les/csvlogger"
  26. "github.com/ethereum/go-ethereum/p2p/enode"
  27. "github.com/ethereum/go-ethereum/rpc"
  28. )
  29. var (
  30. ErrMinCap = errors.New("capacity too small")
  31. ErrTotalCap = errors.New("total capacity exceeded")
  32. ErrUnknownBenchmarkType = errors.New("unknown benchmark type")
  33. dropCapacityDelay = time.Second // delay applied to decreasing capacity changes
  34. )
  35. // PrivateLightServerAPI provides an API to access the LES light server.
  36. // It offers only methods that operate on public data that is freely available to anyone.
  37. type PrivateLightServerAPI struct {
  38. server *LesServer
  39. }
  40. // NewPrivateLightServerAPI creates a new LES light server API.
  41. func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
  42. return &PrivateLightServerAPI{
  43. server: server,
  44. }
  45. }
  46. // TotalCapacity queries total available capacity for all clients
  47. func (api *PrivateLightServerAPI) TotalCapacity() hexutil.Uint64 {
  48. return hexutil.Uint64(api.server.priorityClientPool.totalCapacity())
  49. }
  50. // SubscribeTotalCapacity subscribes to changed total capacity events.
  51. // If onlyUnderrun is true then notification is sent only if the total capacity
  52. // drops under the total capacity of connected priority clients.
  53. //
  54. // Note: actually applying decreasing total capacity values is delayed while the
  55. // notification is sent instantly. This allows lowering the capacity of a priority client
  56. // or choosing which one to drop before the system drops some of them automatically.
  57. func (api *PrivateLightServerAPI) SubscribeTotalCapacity(ctx context.Context, onlyUnderrun bool) (*rpc.Subscription, error) {
  58. notifier, supported := rpc.NotifierFromContext(ctx)
  59. if !supported {
  60. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  61. }
  62. rpcSub := notifier.CreateSubscription()
  63. api.server.priorityClientPool.subscribeTotalCapacity(&tcSubscription{notifier, rpcSub, onlyUnderrun})
  64. return rpcSub, nil
  65. }
  66. type (
  67. // tcSubscription represents a total capacity subscription
  68. tcSubscription struct {
  69. notifier *rpc.Notifier
  70. rpcSub *rpc.Subscription
  71. onlyUnderrun bool
  72. }
  73. tcSubs map[*tcSubscription]struct{}
  74. )
  75. // send sends a changed total capacity event to the subscribers
  76. func (s tcSubs) send(tc uint64, underrun bool) {
  77. for sub := range s {
  78. select {
  79. case <-sub.rpcSub.Err():
  80. delete(s, sub)
  81. case <-sub.notifier.Closed():
  82. delete(s, sub)
  83. default:
  84. if underrun || !sub.onlyUnderrun {
  85. sub.notifier.Notify(sub.rpcSub.ID, tc)
  86. }
  87. }
  88. }
  89. }
  90. // MinimumCapacity queries minimum assignable capacity for a single client
  91. func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 {
  92. return hexutil.Uint64(api.server.minCapacity)
  93. }
  94. // FreeClientCapacity queries the capacity provided for free clients
  95. func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 {
  96. return hexutil.Uint64(api.server.freeClientCap)
  97. }
  98. // SetClientCapacity sets the priority capacity assigned to a given client.
  99. // If the assigned capacity is bigger than zero then connection is always
  100. // guaranteed. The sum of capacity assigned to priority clients can not exceed
  101. // the total available capacity.
  102. //
  103. // Note: assigned capacity can be changed while the client is connected with
  104. // immediate effect.
  105. func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error {
  106. if cap != 0 && cap < api.server.minCapacity {
  107. return ErrMinCap
  108. }
  109. return api.server.priorityClientPool.setClientCapacity(id, cap)
  110. }
  111. // GetClientCapacity returns the capacity assigned to a given client
  112. func (api *PrivateLightServerAPI) GetClientCapacity(id enode.ID) hexutil.Uint64 {
  113. api.server.priorityClientPool.lock.Lock()
  114. defer api.server.priorityClientPool.lock.Unlock()
  115. return hexutil.Uint64(api.server.priorityClientPool.clients[id].cap)
  116. }
  117. // clientPool is implemented by both the free and priority client pools
  118. type clientPool interface {
  119. peerSetNotify
  120. setLimits(count int, totalCap uint64)
  121. }
  122. // priorityClientPool stores information about prioritized clients
  123. type priorityClientPool struct {
  124. lock sync.Mutex
  125. child clientPool
  126. ps *peerSet
  127. clients map[enode.ID]priorityClientInfo
  128. totalCap, totalCapAnnounced uint64
  129. totalConnectedCap, freeClientCap uint64
  130. maxPeers, priorityCount int
  131. logger *csvlogger.Logger
  132. logTotalPriConn *csvlogger.Channel
  133. subs tcSubs
  134. updateSchedule []scheduledUpdate
  135. scheduleCounter uint64
  136. }
  137. // scheduledUpdate represents a delayed total capacity update
  138. type scheduledUpdate struct {
  139. time mclock.AbsTime
  140. totalCap, id uint64
  141. }
  142. // priorityClientInfo entries exist for all prioritized clients and currently connected non-priority clients
  143. type priorityClientInfo struct {
  144. cap uint64 // zero for non-priority clients
  145. connected bool
  146. peer *peer
  147. }
  148. // newPriorityClientPool creates a new priority client pool
  149. func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool {
  150. return &priorityClientPool{
  151. clients: make(map[enode.ID]priorityClientInfo),
  152. freeClientCap: freeClientCap,
  153. ps: ps,
  154. child: child,
  155. logger: eventLogger,
  156. logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0),
  157. }
  158. }
  159. // registerPeer is called when a new client is connected. If the client has no
  160. // priority assigned then it is passed to the child pool which may either keep it
  161. // or disconnect it.
  162. //
  163. // Note: priorityClientPool also stores a record about free clients while they are
  164. // connected in order to be able to assign priority to them later.
  165. func (v *priorityClientPool) registerPeer(p *peer) {
  166. v.lock.Lock()
  167. defer v.lock.Unlock()
  168. id := p.ID()
  169. c := v.clients[id]
  170. v.logger.Event(fmt.Sprintf("priorityClientPool: registerPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes()))
  171. if c.connected {
  172. return
  173. }
  174. if c.cap == 0 && v.child != nil {
  175. v.child.registerPeer(p)
  176. }
  177. if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap {
  178. v.logger.Event(fmt.Sprintf("priorityClientPool: rejected, %x", id.Bytes()))
  179. go v.ps.Unregister(p.id)
  180. return
  181. }
  182. c.connected = true
  183. c.peer = p
  184. v.clients[id] = c
  185. if c.cap != 0 {
  186. v.priorityCount++
  187. v.totalConnectedCap += c.cap
  188. v.logger.Event(fmt.Sprintf("priorityClientPool: accepted with %d capacity, %x", c.cap, id.Bytes()))
  189. v.logTotalPriConn.Update(float64(v.totalConnectedCap))
  190. if v.child != nil {
  191. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  192. }
  193. p.updateCapacity(c.cap)
  194. }
  195. }
  196. // unregisterPeer is called when a client is disconnected. If the client has no
  197. // priority assigned then it is also removed from the child pool.
  198. func (v *priorityClientPool) unregisterPeer(p *peer) {
  199. v.lock.Lock()
  200. defer v.lock.Unlock()
  201. id := p.ID()
  202. c := v.clients[id]
  203. v.logger.Event(fmt.Sprintf("priorityClientPool: unregisterPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes()))
  204. if !c.connected {
  205. return
  206. }
  207. if c.cap != 0 {
  208. c.connected = false
  209. v.clients[id] = c
  210. v.priorityCount--
  211. v.totalConnectedCap -= c.cap
  212. v.logTotalPriConn.Update(float64(v.totalConnectedCap))
  213. if v.child != nil {
  214. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  215. }
  216. } else {
  217. if v.child != nil {
  218. v.child.unregisterPeer(p)
  219. }
  220. delete(v.clients, id)
  221. }
  222. }
  223. // setLimits updates the allowed peer count and total capacity of the priority
  224. // client pool. Since the free client pool is a child of the priority pool the
  225. // remaining peer count and capacity is assigned to the free pool by calling its
  226. // own setLimits function.
  227. //
  228. // Note: a decreasing change of the total capacity is applied with a delay.
  229. func (v *priorityClientPool) setLimits(count int, totalCap uint64) {
  230. v.lock.Lock()
  231. defer v.lock.Unlock()
  232. v.totalCapAnnounced = totalCap
  233. if totalCap > v.totalCap {
  234. v.setLimitsNow(count, totalCap)
  235. v.subs.send(totalCap, false)
  236. return
  237. }
  238. v.setLimitsNow(count, v.totalCap)
  239. if totalCap < v.totalCap {
  240. v.subs.send(totalCap, totalCap < v.totalConnectedCap)
  241. for i, s := range v.updateSchedule {
  242. if totalCap >= s.totalCap {
  243. s.totalCap = totalCap
  244. v.updateSchedule = v.updateSchedule[:i+1]
  245. return
  246. }
  247. }
  248. v.updateSchedule = append(v.updateSchedule, scheduledUpdate{time: mclock.Now() + mclock.AbsTime(dropCapacityDelay), totalCap: totalCap})
  249. if len(v.updateSchedule) == 1 {
  250. v.scheduleCounter++
  251. id := v.scheduleCounter
  252. v.updateSchedule[0].id = id
  253. time.AfterFunc(dropCapacityDelay, func() { v.checkUpdate(id) })
  254. }
  255. } else {
  256. v.updateSchedule = nil
  257. }
  258. }
  259. // checkUpdate performs the next scheduled update if possible and schedules
  260. // the one after that
  261. func (v *priorityClientPool) checkUpdate(id uint64) {
  262. v.lock.Lock()
  263. defer v.lock.Unlock()
  264. if len(v.updateSchedule) == 0 || v.updateSchedule[0].id != id {
  265. return
  266. }
  267. v.setLimitsNow(v.maxPeers, v.updateSchedule[0].totalCap)
  268. v.updateSchedule = v.updateSchedule[1:]
  269. if len(v.updateSchedule) != 0 {
  270. v.scheduleCounter++
  271. id := v.scheduleCounter
  272. v.updateSchedule[0].id = id
  273. dt := time.Duration(v.updateSchedule[0].time - mclock.Now())
  274. time.AfterFunc(dt, func() { v.checkUpdate(id) })
  275. }
  276. }
  277. // setLimits updates the allowed peer count and total capacity immediately
  278. func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) {
  279. if v.priorityCount > count || v.totalConnectedCap > totalCap {
  280. for id, c := range v.clients {
  281. if c.connected {
  282. v.logger.Event(fmt.Sprintf("priorityClientPool: setLimitsNow kicked out, %x", id.Bytes()))
  283. c.connected = false
  284. v.totalConnectedCap -= c.cap
  285. v.logTotalPriConn.Update(float64(v.totalConnectedCap))
  286. v.priorityCount--
  287. v.clients[id] = c
  288. go v.ps.Unregister(c.peer.id)
  289. if v.priorityCount <= count && v.totalConnectedCap <= totalCap {
  290. break
  291. }
  292. }
  293. }
  294. }
  295. v.maxPeers = count
  296. v.totalCap = totalCap
  297. if v.child != nil {
  298. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  299. }
  300. }
  301. // totalCapacity queries total available capacity for all clients
  302. func (v *priorityClientPool) totalCapacity() uint64 {
  303. v.lock.Lock()
  304. defer v.lock.Unlock()
  305. return v.totalCapAnnounced
  306. }
  307. // subscribeTotalCapacity subscribes to changed total capacity events
  308. func (v *priorityClientPool) subscribeTotalCapacity(sub *tcSubscription) {
  309. v.lock.Lock()
  310. defer v.lock.Unlock()
  311. v.subs[sub] = struct{}{}
  312. }
  313. // setClientCapacity sets the priority capacity assigned to a given client
  314. func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
  315. v.lock.Lock()
  316. defer v.lock.Unlock()
  317. c := v.clients[id]
  318. if c.cap == cap {
  319. return nil
  320. }
  321. if c.connected {
  322. if v.totalConnectedCap+cap > v.totalCap+c.cap {
  323. return ErrTotalCap
  324. }
  325. if c.cap == 0 {
  326. if v.child != nil {
  327. v.child.unregisterPeer(c.peer)
  328. }
  329. v.priorityCount++
  330. }
  331. if cap == 0 {
  332. v.priorityCount--
  333. }
  334. v.totalConnectedCap += cap - c.cap
  335. v.logTotalPriConn.Update(float64(v.totalConnectedCap))
  336. if v.child != nil {
  337. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  338. }
  339. if cap == 0 {
  340. if v.child != nil {
  341. v.child.registerPeer(c.peer)
  342. }
  343. c.peer.updateCapacity(v.freeClientCap)
  344. } else {
  345. c.peer.updateCapacity(cap)
  346. }
  347. }
  348. if cap != 0 || c.connected {
  349. c.cap = cap
  350. v.clients[id] = c
  351. } else {
  352. delete(v.clients, id)
  353. }
  354. if c.connected {
  355. v.logger.Event(fmt.Sprintf("priorityClientPool: changed capacity to %d, %x", cap, id.Bytes()))
  356. }
  357. return nil
  358. }
  359. // Benchmark runs a request performance benchmark with a given set of measurement setups
  360. // in multiple passes specified by passCount. The measurement time for each setup in each
  361. // pass is specified in milliseconds by length.
  362. //
  363. // Note: measurement time is adjusted for each pass depending on the previous ones.
  364. // Therefore a controlled total measurement time is achievable in multiple passes.
  365. func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) {
  366. benchmarks := make([]requestBenchmark, len(setups))
  367. for i, setup := range setups {
  368. if t, ok := setup["type"].(string); ok {
  369. getInt := func(field string, def int) int {
  370. if value, ok := setup[field].(float64); ok {
  371. return int(value)
  372. }
  373. return def
  374. }
  375. getBool := func(field string, def bool) bool {
  376. if value, ok := setup[field].(bool); ok {
  377. return value
  378. }
  379. return def
  380. }
  381. switch t {
  382. case "header":
  383. benchmarks[i] = &benchmarkBlockHeaders{
  384. amount: getInt("amount", 1),
  385. skip: getInt("skip", 1),
  386. byHash: getBool("byHash", false),
  387. reverse: getBool("reverse", false),
  388. }
  389. case "body":
  390. benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false}
  391. case "receipts":
  392. benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true}
  393. case "proof":
  394. benchmarks[i] = &benchmarkProofsOrCode{code: false}
  395. case "code":
  396. benchmarks[i] = &benchmarkProofsOrCode{code: true}
  397. case "cht":
  398. benchmarks[i] = &benchmarkHelperTrie{
  399. bloom: false,
  400. reqCount: getInt("amount", 1),
  401. }
  402. case "bloom":
  403. benchmarks[i] = &benchmarkHelperTrie{
  404. bloom: true,
  405. reqCount: getInt("amount", 1),
  406. }
  407. case "txSend":
  408. benchmarks[i] = &benchmarkTxSend{}
  409. case "txStatus":
  410. benchmarks[i] = &benchmarkTxStatus{}
  411. default:
  412. return nil, ErrUnknownBenchmarkType
  413. }
  414. } else {
  415. return nil, ErrUnknownBenchmarkType
  416. }
  417. }
  418. rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
  419. result := make([]map[string]interface{}, len(setups))
  420. for i, r := range rs {
  421. res := make(map[string]interface{})
  422. if r.err == nil {
  423. res["totalCount"] = r.totalCount
  424. res["avgTime"] = r.avgTime
  425. res["maxInSize"] = r.maxInSize
  426. res["maxOutSize"] = r.maxOutSize
  427. } else {
  428. res["error"] = r.err.Error()
  429. }
  430. result[i] = res
  431. }
  432. return result, nil
  433. }