api.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "context"
  19. "errors"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common/hexutil"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/p2p/enode"
  25. "github.com/ethereum/go-ethereum/rpc"
  26. )
  27. var (
  28. ErrMinCap = errors.New("capacity too small")
  29. ErrTotalCap = errors.New("total capacity exceeded")
  30. ErrUnknownBenchmarkType = errors.New("unknown benchmark type")
  31. dropCapacityDelay = time.Second // delay applied to decreasing capacity changes
  32. )
// PrivateLightServerAPI provides an API to access the LES light server.
// Its methods query and change the capacity assigned to clients, expose total
// capacity notifications and run request benchmarks on the wrapped server.
type PrivateLightServerAPI struct {
	server *LesServer // light server the API methods operate on
}
  38. // NewPrivateLightServerAPI creates a new LES light server API.
  39. func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
  40. return &PrivateLightServerAPI{
  41. server: server,
  42. }
  43. }
  44. // TotalCapacity queries total available capacity for all clients
  45. func (api *PrivateLightServerAPI) TotalCapacity() hexutil.Uint64 {
  46. return hexutil.Uint64(api.server.priorityClientPool.totalCapacity())
  47. }
  48. // SubscribeTotalCapacity subscribes to changed total capacity events.
  49. // If onlyUnderrun is true then notification is sent only if the total capacity
  50. // drops under the total capacity of connected priority clients.
  51. //
  52. // Note: actually applying decreasing total capacity values is delayed while the
  53. // notification is sent instantly. This allows lowering the capacity of a priority client
  54. // or choosing which one to drop before the system drops some of them automatically.
  55. func (api *PrivateLightServerAPI) SubscribeTotalCapacity(ctx context.Context, onlyUnderrun bool) (*rpc.Subscription, error) {
  56. notifier, supported := rpc.NotifierFromContext(ctx)
  57. if !supported {
  58. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  59. }
  60. rpcSub := notifier.CreateSubscription()
  61. api.server.priorityClientPool.subscribeTotalCapacity(&tcSubscription{notifier, rpcSub, onlyUnderrun})
  62. return rpcSub, nil
  63. }
  64. type (
  65. // tcSubscription represents a total capacity subscription
  66. tcSubscription struct {
  67. notifier *rpc.Notifier
  68. rpcSub *rpc.Subscription
  69. onlyUnderrun bool
  70. }
  71. tcSubs map[*tcSubscription]struct{}
  72. )
  73. // send sends a changed total capacity event to the subscribers
  74. func (s tcSubs) send(tc uint64, underrun bool) {
  75. for sub := range s {
  76. select {
  77. case <-sub.rpcSub.Err():
  78. delete(s, sub)
  79. case <-sub.notifier.Closed():
  80. delete(s, sub)
  81. default:
  82. if underrun || !sub.onlyUnderrun {
  83. sub.notifier.Notify(sub.rpcSub.ID, tc)
  84. }
  85. }
  86. }
  87. }
  88. // MinimumCapacity queries minimum assignable capacity for a single client
  89. func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 {
  90. return hexutil.Uint64(minCapacity)
  91. }
  92. // FreeClientCapacity queries the capacity provided for free clients
  93. func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 {
  94. return hexutil.Uint64(api.server.freeClientCap)
  95. }
  96. // SetClientCapacity sets the priority capacity assigned to a given client.
  97. // If the assigned capacity is bigger than zero then connection is always
  98. // guaranteed. The sum of capacity assigned to priority clients can not exceed
  99. // the total available capacity.
  100. //
  101. // Note: assigned capacity can be changed while the client is connected with
  102. // immediate effect.
  103. func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error {
  104. if cap != 0 && cap < minCapacity {
  105. return ErrMinCap
  106. }
  107. return api.server.priorityClientPool.setClientCapacity(id, cap)
  108. }
  109. // GetClientCapacity returns the capacity assigned to a given client
  110. func (api *PrivateLightServerAPI) GetClientCapacity(id enode.ID) hexutil.Uint64 {
  111. api.server.priorityClientPool.lock.Lock()
  112. defer api.server.priorityClientPool.lock.Unlock()
  113. return hexutil.Uint64(api.server.priorityClientPool.clients[id].cap)
  114. }
// clientPool is implemented by both the free and priority client pools.
// In addition to the embedded peer set notifications it lets a parent pool
// hand down limits through setLimits.
type clientPool interface {
	peerSetNotify
	// setLimits sets the allowed peer count and total capacity of the pool.
	setLimits(count int, totalCap uint64)
}
  120. // priorityClientPool stores information about prioritized clients
  121. type priorityClientPool struct {
  122. lock sync.Mutex
  123. child clientPool
  124. ps *peerSet
  125. clients map[enode.ID]priorityClientInfo
  126. totalCap, totalCapAnnounced uint64
  127. totalConnectedCap, freeClientCap uint64
  128. maxPeers, priorityCount int
  129. subs tcSubs
  130. updateSchedule []scheduledUpdate
  131. scheduleCounter uint64
  132. }
  133. // scheduledUpdate represents a delayed total capacity update
  134. type scheduledUpdate struct {
  135. time mclock.AbsTime
  136. totalCap, id uint64
  137. }
// priorityClientInfo entries exist for all prioritized clients and currently
// connected non-priority clients.
type priorityClientInfo struct {
	cap uint64 // zero for non-priority clients
	connected bool // set while the client is registered with the pool
	peer *peer // peer recorded when the client was registered
}
  144. // newPriorityClientPool creates a new priority client pool
  145. func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool) *priorityClientPool {
  146. return &priorityClientPool{
  147. clients: make(map[enode.ID]priorityClientInfo),
  148. freeClientCap: freeClientCap,
  149. ps: ps,
  150. child: child,
  151. }
  152. }
  153. // registerPeer is called when a new client is connected. If the client has no
  154. // priority assigned then it is passed to the child pool which may either keep it
  155. // or disconnect it.
  156. //
  157. // Note: priorityClientPool also stores a record about free clients while they are
  158. // connected in order to be able to assign priority to them later.
  159. func (v *priorityClientPool) registerPeer(p *peer) {
  160. v.lock.Lock()
  161. defer v.lock.Unlock()
  162. id := p.ID()
  163. c := v.clients[id]
  164. if c.connected {
  165. return
  166. }
  167. if c.cap == 0 && v.child != nil {
  168. v.child.registerPeer(p)
  169. }
  170. if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap {
  171. go v.ps.Unregister(p.id)
  172. return
  173. }
  174. c.connected = true
  175. c.peer = p
  176. v.clients[id] = c
  177. if c.cap != 0 {
  178. v.priorityCount++
  179. v.totalConnectedCap += c.cap
  180. if v.child != nil {
  181. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  182. }
  183. p.updateCapacity(c.cap)
  184. }
  185. }
  186. // unregisterPeer is called when a client is disconnected. If the client has no
  187. // priority assigned then it is also removed from the child pool.
  188. func (v *priorityClientPool) unregisterPeer(p *peer) {
  189. v.lock.Lock()
  190. defer v.lock.Unlock()
  191. id := p.ID()
  192. c := v.clients[id]
  193. if !c.connected {
  194. return
  195. }
  196. if c.cap != 0 {
  197. c.connected = false
  198. v.clients[id] = c
  199. v.priorityCount--
  200. v.totalConnectedCap -= c.cap
  201. if v.child != nil {
  202. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  203. }
  204. } else {
  205. if v.child != nil {
  206. v.child.unregisterPeer(p)
  207. }
  208. delete(v.clients, id)
  209. }
  210. }
  211. // setLimits updates the allowed peer count and total capacity of the priority
  212. // client pool. Since the free client pool is a child of the priority pool the
  213. // remaining peer count and capacity is assigned to the free pool by calling its
  214. // own setLimits function.
  215. //
  216. // Note: a decreasing change of the total capacity is applied with a delay.
  217. func (v *priorityClientPool) setLimits(count int, totalCap uint64) {
  218. v.lock.Lock()
  219. defer v.lock.Unlock()
  220. v.totalCapAnnounced = totalCap
  221. if totalCap > v.totalCap {
  222. v.setLimitsNow(count, totalCap)
  223. v.subs.send(totalCap, false)
  224. return
  225. }
  226. v.setLimitsNow(count, v.totalCap)
  227. if totalCap < v.totalCap {
  228. v.subs.send(totalCap, totalCap < v.totalConnectedCap)
  229. for i, s := range v.updateSchedule {
  230. if totalCap >= s.totalCap {
  231. s.totalCap = totalCap
  232. v.updateSchedule = v.updateSchedule[:i+1]
  233. return
  234. }
  235. }
  236. v.updateSchedule = append(v.updateSchedule, scheduledUpdate{time: mclock.Now() + mclock.AbsTime(dropCapacityDelay), totalCap: totalCap})
  237. if len(v.updateSchedule) == 1 {
  238. v.scheduleCounter++
  239. id := v.scheduleCounter
  240. v.updateSchedule[0].id = id
  241. time.AfterFunc(dropCapacityDelay, func() { v.checkUpdate(id) })
  242. }
  243. } else {
  244. v.updateSchedule = nil
  245. }
  246. }
  247. // checkUpdate performs the next scheduled update if possible and schedules
  248. // the one after that
  249. func (v *priorityClientPool) checkUpdate(id uint64) {
  250. v.lock.Lock()
  251. defer v.lock.Unlock()
  252. if len(v.updateSchedule) == 0 || v.updateSchedule[0].id != id {
  253. return
  254. }
  255. v.setLimitsNow(v.maxPeers, v.updateSchedule[0].totalCap)
  256. v.updateSchedule = v.updateSchedule[1:]
  257. if len(v.updateSchedule) != 0 {
  258. v.scheduleCounter++
  259. id := v.scheduleCounter
  260. v.updateSchedule[0].id = id
  261. dt := time.Duration(v.updateSchedule[0].time - mclock.Now())
  262. time.AfterFunc(dt, func() { v.checkUpdate(id) })
  263. }
  264. }
  265. // setLimits updates the allowed peer count and total capacity immediately
  266. func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) {
  267. if v.priorityCount > count || v.totalConnectedCap > totalCap {
  268. for id, c := range v.clients {
  269. if c.connected {
  270. c.connected = false
  271. v.totalConnectedCap -= c.cap
  272. v.priorityCount--
  273. v.clients[id] = c
  274. go v.ps.Unregister(c.peer.id)
  275. if v.priorityCount <= count && v.totalConnectedCap <= totalCap {
  276. break
  277. }
  278. }
  279. }
  280. }
  281. v.maxPeers = count
  282. v.totalCap = totalCap
  283. if v.child != nil {
  284. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  285. }
  286. }
  287. // totalCapacity queries total available capacity for all clients
  288. func (v *priorityClientPool) totalCapacity() uint64 {
  289. v.lock.Lock()
  290. defer v.lock.Unlock()
  291. return v.totalCapAnnounced
  292. }
  293. // subscribeTotalCapacity subscribes to changed total capacity events
  294. func (v *priorityClientPool) subscribeTotalCapacity(sub *tcSubscription) {
  295. v.lock.Lock()
  296. defer v.lock.Unlock()
  297. v.subs[sub] = struct{}{}
  298. }
  299. // setClientCapacity sets the priority capacity assigned to a given client
  300. func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
  301. v.lock.Lock()
  302. defer v.lock.Unlock()
  303. c := v.clients[id]
  304. if c.cap == cap {
  305. return nil
  306. }
  307. if c.connected {
  308. if v.totalConnectedCap+cap > v.totalCap+c.cap {
  309. return ErrTotalCap
  310. }
  311. if c.cap == 0 {
  312. if v.child != nil {
  313. v.child.unregisterPeer(c.peer)
  314. }
  315. v.priorityCount++
  316. }
  317. if cap == 0 {
  318. v.priorityCount--
  319. }
  320. v.totalConnectedCap += cap - c.cap
  321. if v.child != nil {
  322. v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
  323. }
  324. if cap == 0 {
  325. if v.child != nil {
  326. v.child.registerPeer(c.peer)
  327. }
  328. c.peer.updateCapacity(v.freeClientCap)
  329. } else {
  330. c.peer.updateCapacity(cap)
  331. }
  332. }
  333. if cap != 0 || c.connected {
  334. c.cap = cap
  335. v.clients[id] = c
  336. } else {
  337. delete(v.clients, id)
  338. }
  339. return nil
  340. }
  341. // Benchmark runs a request performance benchmark with a given set of measurement setups
  342. // in multiple passes specified by passCount. The measurement time for each setup in each
  343. // pass is specified in milliseconds by length.
  344. //
  345. // Note: measurement time is adjusted for each pass depending on the previous ones.
  346. // Therefore a controlled total measurement time is achievable in multiple passes.
  347. func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) {
  348. benchmarks := make([]requestBenchmark, len(setups))
  349. for i, setup := range setups {
  350. if t, ok := setup["type"].(string); ok {
  351. getInt := func(field string, def int) int {
  352. if value, ok := setup[field].(float64); ok {
  353. return int(value)
  354. }
  355. return def
  356. }
  357. getBool := func(field string, def bool) bool {
  358. if value, ok := setup[field].(bool); ok {
  359. return value
  360. }
  361. return def
  362. }
  363. switch t {
  364. case "header":
  365. benchmarks[i] = &benchmarkBlockHeaders{
  366. amount: getInt("amount", 1),
  367. skip: getInt("skip", 1),
  368. byHash: getBool("byHash", false),
  369. reverse: getBool("reverse", false),
  370. }
  371. case "body":
  372. benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false}
  373. case "receipts":
  374. benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true}
  375. case "proof":
  376. benchmarks[i] = &benchmarkProofsOrCode{code: false}
  377. case "code":
  378. benchmarks[i] = &benchmarkProofsOrCode{code: true}
  379. case "cht":
  380. benchmarks[i] = &benchmarkHelperTrie{
  381. bloom: false,
  382. reqCount: getInt("amount", 1),
  383. }
  384. case "bloom":
  385. benchmarks[i] = &benchmarkHelperTrie{
  386. bloom: true,
  387. reqCount: getInt("amount", 1),
  388. }
  389. case "txSend":
  390. benchmarks[i] = &benchmarkTxSend{}
  391. case "txStatus":
  392. benchmarks[i] = &benchmarkTxStatus{}
  393. default:
  394. return nil, ErrUnknownBenchmarkType
  395. }
  396. } else {
  397. return nil, ErrUnknownBenchmarkType
  398. }
  399. }
  400. rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
  401. result := make([]map[string]interface{}, len(setups))
  402. for i, r := range rs {
  403. res := make(map[string]interface{})
  404. if r.err == nil {
  405. res["totalCount"] = r.totalCount
  406. res["avgTime"] = r.avgTime
  407. res["maxInSize"] = r.maxInSize
  408. res["maxOutSize"] = r.maxOutSize
  409. } else {
  410. res["error"] = r.err.Error()
  411. }
  412. result[i] = res
  413. }
  414. return result, nil
  415. }