| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- // Copyright 2018 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package les
- import (
- "context"
- "errors"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/hexutil"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/rpc"
- )
// Sentinel errors returned by the light server API.
var (
	// ErrMinCap is returned when a non-zero capacity below the minimum
	// assignable capacity is requested for a client.
	ErrMinCap = errors.New("capacity too small")

	// ErrTotalCap is returned when a capacity change would push the sum of
	// connected priority capacities above the total capacity.
	ErrTotalCap = errors.New("total capacity exceeded")

	// ErrUnknownBenchmarkType is returned when a benchmark setup has a missing
	// or unrecognized "type" field.
	ErrUnknownBenchmarkType = errors.New("unknown benchmark type")
)

// dropCapacityDelay is the delay applied to decreasing capacity changes.
var dropCapacityDelay = time.Second
- // PrivateLightServerAPI provides an API to access the LES light server.
- // It offers only methods that operate on public data that is freely available to anyone.
- type PrivateLightServerAPI struct {
- server *LesServer
- }
- // NewPrivateLightServerAPI creates a new LES light server API.
- func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
- return &PrivateLightServerAPI{
- server: server,
- }
- }
- // TotalCapacity queries total available capacity for all clients
- func (api *PrivateLightServerAPI) TotalCapacity() hexutil.Uint64 {
- return hexutil.Uint64(api.server.priorityClientPool.totalCapacity())
- }
- // SubscribeTotalCapacity subscribes to changed total capacity events.
- // If onlyUnderrun is true then notification is sent only if the total capacity
- // drops under the total capacity of connected priority clients.
- //
- // Note: actually applying decreasing total capacity values is delayed while the
- // notification is sent instantly. This allows lowering the capacity of a priority client
- // or choosing which one to drop before the system drops some of them automatically.
- func (api *PrivateLightServerAPI) SubscribeTotalCapacity(ctx context.Context, onlyUnderrun bool) (*rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
- }
- rpcSub := notifier.CreateSubscription()
- api.server.priorityClientPool.subscribeTotalCapacity(&tcSubscription{notifier, rpcSub, onlyUnderrun})
- return rpcSub, nil
- }
- type (
- // tcSubscription represents a total capacity subscription
- tcSubscription struct {
- notifier *rpc.Notifier
- rpcSub *rpc.Subscription
- onlyUnderrun bool
- }
- tcSubs map[*tcSubscription]struct{}
- )
- // send sends a changed total capacity event to the subscribers
- func (s tcSubs) send(tc uint64, underrun bool) {
- for sub := range s {
- select {
- case <-sub.rpcSub.Err():
- delete(s, sub)
- case <-sub.notifier.Closed():
- delete(s, sub)
- default:
- if underrun || !sub.onlyUnderrun {
- sub.notifier.Notify(sub.rpcSub.ID, tc)
- }
- }
- }
- }
- // MinimumCapacity queries minimum assignable capacity for a single client
- func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 {
- return hexutil.Uint64(minCapacity)
- }
- // FreeClientCapacity queries the capacity provided for free clients
- func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 {
- return hexutil.Uint64(api.server.freeClientCap)
- }
- // SetClientCapacity sets the priority capacity assigned to a given client.
- // If the assigned capacity is bigger than zero then connection is always
- // guaranteed. The sum of capacity assigned to priority clients can not exceed
- // the total available capacity.
- //
- // Note: assigned capacity can be changed while the client is connected with
- // immediate effect.
- func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error {
- if cap != 0 && cap < minCapacity {
- return ErrMinCap
- }
- return api.server.priorityClientPool.setClientCapacity(id, cap)
- }
- // GetClientCapacity returns the capacity assigned to a given client
- func (api *PrivateLightServerAPI) GetClientCapacity(id enode.ID) hexutil.Uint64 {
- api.server.priorityClientPool.lock.Lock()
- defer api.server.priorityClientPool.lock.Unlock()
- return hexutil.Uint64(api.server.priorityClientPool.clients[id].cap)
- }
- // clientPool is implemented by both the free and priority client pools
- type clientPool interface {
- peerSetNotify
- setLimits(count int, totalCap uint64)
- }
- // priorityClientPool stores information about prioritized clients
- type priorityClientPool struct {
- lock sync.Mutex
- child clientPool
- ps *peerSet
- clients map[enode.ID]priorityClientInfo
- totalCap, totalCapAnnounced uint64
- totalConnectedCap, freeClientCap uint64
- maxPeers, priorityCount int
- subs tcSubs
- updateSchedule []scheduledUpdate
- scheduleCounter uint64
- }
- // scheduledUpdate represents a delayed total capacity update
- type scheduledUpdate struct {
- time mclock.AbsTime
- totalCap, id uint64
- }
- // priorityClientInfo entries exist for all prioritized clients and currently connected non-priority clients
- type priorityClientInfo struct {
- cap uint64 // zero for non-priority clients
- connected bool
- peer *peer
- }
- // newPriorityClientPool creates a new priority client pool
- func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool) *priorityClientPool {
- return &priorityClientPool{
- clients: make(map[enode.ID]priorityClientInfo),
- freeClientCap: freeClientCap,
- ps: ps,
- child: child,
- }
- }
- // registerPeer is called when a new client is connected. If the client has no
- // priority assigned then it is passed to the child pool which may either keep it
- // or disconnect it.
- //
- // Note: priorityClientPool also stores a record about free clients while they are
- // connected in order to be able to assign priority to them later.
- func (v *priorityClientPool) registerPeer(p *peer) {
- v.lock.Lock()
- defer v.lock.Unlock()
- id := p.ID()
- c := v.clients[id]
- if c.connected {
- return
- }
- if c.cap == 0 && v.child != nil {
- v.child.registerPeer(p)
- }
- if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap {
- go v.ps.Unregister(p.id)
- return
- }
- c.connected = true
- c.peer = p
- v.clients[id] = c
- if c.cap != 0 {
- v.priorityCount++
- v.totalConnectedCap += c.cap
- if v.child != nil {
- v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
- }
- p.updateCapacity(c.cap)
- }
- }
- // unregisterPeer is called when a client is disconnected. If the client has no
- // priority assigned then it is also removed from the child pool.
- func (v *priorityClientPool) unregisterPeer(p *peer) {
- v.lock.Lock()
- defer v.lock.Unlock()
- id := p.ID()
- c := v.clients[id]
- if !c.connected {
- return
- }
- if c.cap != 0 {
- c.connected = false
- v.clients[id] = c
- v.priorityCount--
- v.totalConnectedCap -= c.cap
- if v.child != nil {
- v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
- }
- } else {
- if v.child != nil {
- v.child.unregisterPeer(p)
- }
- delete(v.clients, id)
- }
- }
- // setLimits updates the allowed peer count and total capacity of the priority
- // client pool. Since the free client pool is a child of the priority pool the
- // remaining peer count and capacity is assigned to the free pool by calling its
- // own setLimits function.
- //
- // Note: a decreasing change of the total capacity is applied with a delay.
- func (v *priorityClientPool) setLimits(count int, totalCap uint64) {
- v.lock.Lock()
- defer v.lock.Unlock()
- v.totalCapAnnounced = totalCap
- if totalCap > v.totalCap {
- v.setLimitsNow(count, totalCap)
- v.subs.send(totalCap, false)
- return
- }
- v.setLimitsNow(count, v.totalCap)
- if totalCap < v.totalCap {
- v.subs.send(totalCap, totalCap < v.totalConnectedCap)
- for i, s := range v.updateSchedule {
- if totalCap >= s.totalCap {
- s.totalCap = totalCap
- v.updateSchedule = v.updateSchedule[:i+1]
- return
- }
- }
- v.updateSchedule = append(v.updateSchedule, scheduledUpdate{time: mclock.Now() + mclock.AbsTime(dropCapacityDelay), totalCap: totalCap})
- if len(v.updateSchedule) == 1 {
- v.scheduleCounter++
- id := v.scheduleCounter
- v.updateSchedule[0].id = id
- time.AfterFunc(dropCapacityDelay, func() { v.checkUpdate(id) })
- }
- } else {
- v.updateSchedule = nil
- }
- }
- // checkUpdate performs the next scheduled update if possible and schedules
- // the one after that
- func (v *priorityClientPool) checkUpdate(id uint64) {
- v.lock.Lock()
- defer v.lock.Unlock()
- if len(v.updateSchedule) == 0 || v.updateSchedule[0].id != id {
- return
- }
- v.setLimitsNow(v.maxPeers, v.updateSchedule[0].totalCap)
- v.updateSchedule = v.updateSchedule[1:]
- if len(v.updateSchedule) != 0 {
- v.scheduleCounter++
- id := v.scheduleCounter
- v.updateSchedule[0].id = id
- dt := time.Duration(v.updateSchedule[0].time - mclock.Now())
- time.AfterFunc(dt, func() { v.checkUpdate(id) })
- }
- }
- // setLimits updates the allowed peer count and total capacity immediately
- func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) {
- if v.priorityCount > count || v.totalConnectedCap > totalCap {
- for id, c := range v.clients {
- if c.connected {
- c.connected = false
- v.totalConnectedCap -= c.cap
- v.priorityCount--
- v.clients[id] = c
- go v.ps.Unregister(c.peer.id)
- if v.priorityCount <= count && v.totalConnectedCap <= totalCap {
- break
- }
- }
- }
- }
- v.maxPeers = count
- v.totalCap = totalCap
- if v.child != nil {
- v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
- }
- }
- // totalCapacity queries total available capacity for all clients
- func (v *priorityClientPool) totalCapacity() uint64 {
- v.lock.Lock()
- defer v.lock.Unlock()
- return v.totalCapAnnounced
- }
- // subscribeTotalCapacity subscribes to changed total capacity events
- func (v *priorityClientPool) subscribeTotalCapacity(sub *tcSubscription) {
- v.lock.Lock()
- defer v.lock.Unlock()
- v.subs[sub] = struct{}{}
- }
- // setClientCapacity sets the priority capacity assigned to a given client
- func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
- v.lock.Lock()
- defer v.lock.Unlock()
- c := v.clients[id]
- if c.cap == cap {
- return nil
- }
- if c.connected {
- if v.totalConnectedCap+cap > v.totalCap+c.cap {
- return ErrTotalCap
- }
- if c.cap == 0 {
- if v.child != nil {
- v.child.unregisterPeer(c.peer)
- }
- v.priorityCount++
- }
- if cap == 0 {
- v.priorityCount--
- }
- v.totalConnectedCap += cap - c.cap
- if v.child != nil {
- v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
- }
- if cap == 0 {
- if v.child != nil {
- v.child.registerPeer(c.peer)
- }
- c.peer.updateCapacity(v.freeClientCap)
- } else {
- c.peer.updateCapacity(cap)
- }
- }
- if cap != 0 || c.connected {
- c.cap = cap
- v.clients[id] = c
- } else {
- delete(v.clients, id)
- }
- return nil
- }
- // Benchmark runs a request performance benchmark with a given set of measurement setups
- // in multiple passes specified by passCount. The measurement time for each setup in each
- // pass is specified in milliseconds by length.
- //
- // Note: measurement time is adjusted for each pass depending on the previous ones.
- // Therefore a controlled total measurement time is achievable in multiple passes.
- func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) {
- benchmarks := make([]requestBenchmark, len(setups))
- for i, setup := range setups {
- if t, ok := setup["type"].(string); ok {
- getInt := func(field string, def int) int {
- if value, ok := setup[field].(float64); ok {
- return int(value)
- }
- return def
- }
- getBool := func(field string, def bool) bool {
- if value, ok := setup[field].(bool); ok {
- return value
- }
- return def
- }
- switch t {
- case "header":
- benchmarks[i] = &benchmarkBlockHeaders{
- amount: getInt("amount", 1),
- skip: getInt("skip", 1),
- byHash: getBool("byHash", false),
- reverse: getBool("reverse", false),
- }
- case "body":
- benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false}
- case "receipts":
- benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true}
- case "proof":
- benchmarks[i] = &benchmarkProofsOrCode{code: false}
- case "code":
- benchmarks[i] = &benchmarkProofsOrCode{code: true}
- case "cht":
- benchmarks[i] = &benchmarkHelperTrie{
- bloom: false,
- reqCount: getInt("amount", 1),
- }
- case "bloom":
- benchmarks[i] = &benchmarkHelperTrie{
- bloom: true,
- reqCount: getInt("amount", 1),
- }
- case "txSend":
- benchmarks[i] = &benchmarkTxSend{}
- case "txStatus":
- benchmarks[i] = &benchmarkTxStatus{}
- default:
- return nil, ErrUnknownBenchmarkType
- }
- } else {
- return nil, ErrUnknownBenchmarkType
- }
- }
- rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
- result := make([]map[string]interface{}, len(setups))
- for i, r := range rs {
- res := make(map[string]interface{})
- if r.err == nil {
- res["totalCount"] = r.totalCount
- res["avgTime"] = r.avgTime
- res["maxInSize"] = r.maxInSize
- res["maxOutSize"] = r.maxOutSize
- } else {
- res["error"] = r.err.Error()
- }
- result[i] = res
- }
- return result, nil
- }
|