| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077 |
- // Copyright 2018 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package pss
- import (
- "bytes"
- "context"
- "crypto/ecdsa"
- "crypto/rand"
- "errors"
- "fmt"
- "hash"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/protocols"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
- "github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/storage"
- whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
- "golang.org/x/crypto/sha3"
- )
- const (
- defaultPaddingByteSize = 16
- DefaultMsgTTL = time.Second * 120
- defaultDigestCacheTTL = time.Second * 10
- defaultSymKeyCacheCapacity = 512
- digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
- defaultWhisperWorkTime = 3
- defaultWhisperPoW = 0.0000000001
- defaultMaxMsgSize = 1024 * 1024
- defaultCleanInterval = time.Second * 60 * 10
- defaultOutboxCapacity = 100000
- pssProtocolName = "pss"
- pssVersion = 2
- hasherCount = 8
- )
- var (
- addressLength = len(pot.Address{})
- )
- // cache is used for preventing backwards routing
- // will also be instrumental in flood guard mechanism
- // and mailbox implementation
- type pssCacheEntry struct {
- expiresAt time.Time
- }
- // abstraction to enable access to p2p.protocols.Peer.Send
- type senderPeer interface {
- Info() *p2p.PeerInfo
- ID() enode.ID
- Address() []byte
- Send(context.Context, interface{}) error
- }
- // per-key peer related information
- // member `protected` prevents garbage collection of the instance
- type pssPeer struct {
- lastSeen time.Time
- address PssAddress
- protected bool
- }
- // Pss configuration parameters
- type PssParams struct {
- MsgTTL time.Duration
- CacheTTL time.Duration
- privateKey *ecdsa.PrivateKey
- SymKeyCacheCapacity int
- AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption
- }
- // Sane defaults for Pss
- func NewPssParams() *PssParams {
- return &PssParams{
- MsgTTL: DefaultMsgTTL,
- CacheTTL: defaultDigestCacheTTL,
- SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
- }
- }
- func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
- params.privateKey = privatekey
- return params
- }
- // Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
- //
- // Implements node.Service
- type Pss struct {
- *network.Kademlia // we can get the Kademlia address from this
- privateKey *ecdsa.PrivateKey // pss can have it's own independent key
- w *whisper.Whisper // key and encryption backend
- auxAPIs []rpc.API // builtins (handshake, test) can add APIs
- // sending and forwarding
- fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
- fwdPoolMu sync.RWMutex
- fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
- fwdCacheMu sync.RWMutex
- cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
- msgTTL time.Duration
- paddingByteSize int
- capstring string
- outbox chan *PssMsg
- // keys and peers
- pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
- pubKeyPoolMu sync.RWMutex
- symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
- symKeyPoolMu sync.RWMutex
- symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
- symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
- symKeyDecryptCacheCapacity int // max amount of symkeys to keep.
- // message handling
- handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
- handlersMu sync.RWMutex
- hashPool sync.Pool
- topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)
- // process
- quitC chan struct{}
- }
- func (p *Pss) String() string {
- return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
- }
- // Creates a new Pss instance.
- //
- // In addition to params, it takes a swarm network Kademlia
- // and a FileStore storage for message cache storage.
- func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
- if params.privateKey == nil {
- return nil, errors.New("missing private key for pss")
- }
- cap := p2p.Cap{
- Name: pssProtocolName,
- Version: pssVersion,
- }
- ps := &Pss{
- Kademlia: k,
- privateKey: params.privateKey,
- w: whisper.New(&whisper.DefaultConfig),
- quitC: make(chan struct{}),
- fwdPool: make(map[string]*protocols.Peer),
- fwdCache: make(map[pssDigest]pssCacheEntry),
- cacheTTL: params.CacheTTL,
- msgTTL: params.MsgTTL,
- paddingByteSize: defaultPaddingByteSize,
- capstring: cap.String(),
- outbox: make(chan *PssMsg, defaultOutboxCapacity),
- pubKeyPool: make(map[string]map[Topic]*pssPeer),
- symKeyPool: make(map[string]map[Topic]*pssPeer),
- symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
- symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
- handlers: make(map[Topic]map[*handler]bool),
- topicHandlerCaps: make(map[Topic]*handlerCaps),
- hashPool: sync.Pool{
- New: func() interface{} {
- return sha3.NewLegacyKeccak256()
- },
- },
- }
- for i := 0; i < hasherCount; i++ {
- hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
- ps.hashPool.Put(hashfunc)
- }
- return ps, nil
- }
- /////////////////////////////////////////////////////////////////////
- // SECTION: node.Service interface
- /////////////////////////////////////////////////////////////////////
- func (p *Pss) Start(srv *p2p.Server) error {
- go func() {
- ticker := time.NewTicker(defaultCleanInterval)
- cacheTicker := time.NewTicker(p.cacheTTL)
- defer ticker.Stop()
- defer cacheTicker.Stop()
- for {
- select {
- case <-cacheTicker.C:
- p.cleanFwdCache()
- case <-ticker.C:
- p.cleanKeys()
- case <-p.quitC:
- return
- }
- }
- }()
- go func() {
- for {
- select {
- case msg := <-p.outbox:
- err := p.forward(msg)
- if err != nil {
- log.Error(err.Error())
- metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
- }
- case <-p.quitC:
- return
- }
- }
- }()
- log.Info("Started Pss")
- log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
- return nil
- }
- func (p *Pss) Stop() error {
- log.Info("Pss shutting down")
- close(p.quitC)
- return nil
- }
- var pssSpec = &protocols.Spec{
- Name: pssProtocolName,
- Version: pssVersion,
- MaxMsgSize: defaultMaxMsgSize,
- Messages: []interface{}{
- PssMsg{},
- },
- }
- func (p *Pss) Protocols() []p2p.Protocol {
- return []p2p.Protocol{
- {
- Name: pssSpec.Name,
- Version: pssSpec.Version,
- Length: pssSpec.Length(),
- Run: p.Run,
- },
- }
- }
- func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
- pp := protocols.NewPeer(peer, rw, pssSpec)
- p.fwdPoolMu.Lock()
- p.fwdPool[peer.Info().ID] = pp
- p.fwdPoolMu.Unlock()
- return pp.Run(p.handlePssMsg)
- }
- func (p *Pss) APIs() []rpc.API {
- apis := []rpc.API{
- {
- Namespace: "pss",
- Version: "1.0",
- Service: NewAPI(p),
- Public: true,
- },
- }
- apis = append(apis, p.auxAPIs...)
- return apis
- }
- // add API methods to the pss API
- // must be run before node is started
- func (p *Pss) addAPI(api rpc.API) {
- p.auxAPIs = append(p.auxAPIs, api)
- }
- // Returns the swarm Kademlia address of the pss node
- func (p *Pss) BaseAddr() []byte {
- return p.Kademlia.BaseAddr()
- }
- // Returns the pss node's public key
- func (p *Pss) PublicKey() *ecdsa.PublicKey {
- return &p.privateKey.PublicKey
- }
- /////////////////////////////////////////////////////////////////////
- // SECTION: Message handling
- /////////////////////////////////////////////////////////////////////
- // Links a handler function to a Topic
- //
- // All incoming messages with an envelope Topic matching the
- // topic specified will be passed to the given Handler function.
- //
- // There may be an arbitrary number of handler functions per topic.
- //
- // Returns a deregister function which needs to be called to
- // deregister the handler,
- func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
- p.handlersMu.Lock()
- defer p.handlersMu.Unlock()
- handlers := p.handlers[*topic]
- if handlers == nil {
- handlers = make(map[*handler]bool)
- p.handlers[*topic] = handlers
- log.Debug("registered handler", "caps", hndlr.caps)
- }
- if hndlr.caps == nil {
- hndlr.caps = &handlerCaps{}
- }
- handlers[hndlr] = true
- if _, ok := p.topicHandlerCaps[*topic]; !ok {
- p.topicHandlerCaps[*topic] = &handlerCaps{}
- }
- if hndlr.caps.raw {
- p.topicHandlerCaps[*topic].raw = true
- }
- if hndlr.caps.prox {
- p.topicHandlerCaps[*topic].prox = true
- }
- return func() { p.deregister(topic, hndlr) }
- }
- func (p *Pss) deregister(topic *Topic, hndlr *handler) {
- p.handlersMu.Lock()
- defer p.handlersMu.Unlock()
- handlers := p.handlers[*topic]
- if len(handlers) > 1 {
- delete(p.handlers, *topic)
- // topic caps might have changed now that a handler is gone
- caps := &handlerCaps{}
- for h := range handlers {
- if h.caps.raw {
- caps.raw = true
- }
- if h.caps.prox {
- caps.prox = true
- }
- }
- p.topicHandlerCaps[*topic] = caps
- return
- }
- delete(handlers, hndlr)
- }
- // get all registered handlers for respective topics
- func (p *Pss) getHandlers(topic Topic) map[*handler]bool {
- p.handlersMu.RLock()
- defer p.handlersMu.RUnlock()
- return p.handlers[topic]
- }
- // Filters incoming messages for processing or forwarding.
- // Check if address partially matches
- // If yes, it CAN be for us, and we process it
- // Only passes error to pss protocol handler if payload is not valid pssmsg
- func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
- metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
- pssmsg, ok := msg.(*PssMsg)
- if !ok {
- return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
- }
- log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
- if int64(pssmsg.Expire) < time.Now().Unix() {
- metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
- log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
- return nil
- }
- if p.checkFwdCache(pssmsg) {
- log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
- return nil
- }
- p.addFwdCache(pssmsg)
- psstopic := Topic(pssmsg.Payload.Topic)
- // raw is simplest handler contingency to check, so check that first
- var isRaw bool
- if pssmsg.isRaw() {
- if _, ok := p.topicHandlerCaps[psstopic]; ok {
- if !p.topicHandlerCaps[psstopic].raw {
- log.Debug("No handler for raw message", "topic", psstopic)
- return nil
- }
- }
- isRaw = true
- }
- // check if we can be recipient:
- // - no prox handler on message and partial address matches
- // - prox handler on message and we are in prox regardless of partial address match
- // store this result so we don't calculate again on every handler
- var isProx bool
- if _, ok := p.topicHandlerCaps[psstopic]; ok {
- isProx = p.topicHandlerCaps[psstopic].prox
- }
- isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
- if !isRecipient {
- log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
- return p.enqueue(pssmsg)
- }
- log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
- if err := p.process(pssmsg, isRaw, isProx); err != nil {
- qerr := p.enqueue(pssmsg)
- if qerr != nil {
- return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
- }
- }
- return nil
- }
- // Entry point to processing a message for which the current node can be the intended recipient.
- // Attempts symmetric and asymmetric decryption with stored keys.
- // Dispatches message to all handlers matching the message topic
- func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
- metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
- var err error
- var recvmsg *whisper.ReceivedMessage
- var payload []byte
- var from PssAddress
- var asymmetric bool
- var keyid string
- var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)
- envelope := pssmsg.Payload
- psstopic := Topic(envelope.Topic)
- if raw {
- payload = pssmsg.Payload.Data
- } else {
- if pssmsg.isSym() {
- keyFunc = p.processSym
- } else {
- asymmetric = true
- keyFunc = p.processAsym
- }
- recvmsg, keyid, from, err = keyFunc(envelope)
- if err != nil {
- return errors.New("Decryption failed")
- }
- payload = recvmsg.Payload
- }
- if len(pssmsg.To) < addressLength {
- if err := p.enqueue(pssmsg); err != nil {
- return err
- }
- }
- p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)
- return nil
- }
- func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
- handlers := p.getHandlers(topic)
- peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
- for h := range handlers {
- if !h.caps.raw && raw {
- log.Warn("norawhandler")
- continue
- }
- if !h.caps.prox && prox {
- log.Warn("noproxhandler")
- continue
- }
- err := (h.f)(payload, peer, asymmetric, keyid)
- if err != nil {
- log.Warn("Pss handler failed", "err", err)
- }
- }
- }
- // will return false if using partial address
- func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
- return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
- }
- // test match of leftmost bytes in given message to node's Kademlia address
- func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
- local := p.Kademlia.BaseAddr()
- // if a partial address matches we are possible recipient regardless of prox
- // if not and prox is not set, we are surely not
- if bytes.Equal(msg.To, local[:len(msg.To)]) {
- return true
- } else if !prox {
- return false
- }
- depth := p.Kademlia.NeighbourhoodDepth()
- po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
- log.Trace("selfpossible", "po", po, "depth", depth)
- return depth <= po
- }
- /////////////////////////////////////////////////////////////////////
- // SECTION: Encryption
- /////////////////////////////////////////////////////////////////////
- // Links a peer ECDSA public key to a topic
- //
- // This is required for asymmetric message exchange
- // on the given topic
- //
- // The value in `address` will be used as a routing hint for the
- // public key / topic association
- func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error {
- if err := validateAddress(address); err != nil {
- return err
- }
- pubkeybytes := crypto.FromECDSAPub(pubkey)
- if len(pubkeybytes) == 0 {
- return fmt.Errorf("invalid public key: %v", pubkey)
- }
- pubkeyid := common.ToHex(pubkeybytes)
- psp := &pssPeer{
- address: address,
- }
- p.pubKeyPoolMu.Lock()
- if _, ok := p.pubKeyPool[pubkeyid]; !ok {
- p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
- }
- p.pubKeyPool[pubkeyid][topic] = psp
- p.pubKeyPoolMu.Unlock()
- log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address)
- return nil
- }
- // Automatically generate a new symkey for a topic and address hint
- func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) {
- keyid, err := p.w.GenerateSymKey()
- if err != nil {
- return "", err
- }
- p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
- return keyid, nil
- }
- // Links a peer symmetric key (arbitrary byte sequence) to a topic
- //
- // This is required for symmetrically encrypted message exchange
- // on the given topic
- //
- // The key is stored in the whisper backend.
- //
- // If addtocache is set to true, the key will be added to the cache of keys
- // used to attempt symmetric decryption of incoming messages.
- //
- // Returns a string id that can be used to retrieve the key bytes
- // from the whisper backend (see pss.GetSymmetricKey())
- func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) {
- if err := validateAddress(address); err != nil {
- return "", err
- }
- return p.setSymmetricKey(key, topic, address, addtocache, true)
- }
- func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) {
- keyid, err := p.w.AddSymKeyDirect(key)
- if err != nil {
- return "", err
- }
- p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
- return keyid, nil
- }
- // adds a symmetric key to the pss key pool, and optionally adds the key
- // to the collection of keys used to attempt symmetric decryption of
- // incoming messages
- func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) {
- psp := &pssPeer{
- address: address,
- protected: protected,
- }
- p.symKeyPoolMu.Lock()
- if _, ok := p.symKeyPool[keyid]; !ok {
- p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
- }
- p.symKeyPool[keyid][topic] = psp
- p.symKeyPoolMu.Unlock()
- if addtocache {
- p.symKeyDecryptCacheCursor++
- p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
- }
- key, _ := p.GetSymmetricKey(keyid)
- log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache)
- }
- // Returns a symmetric key byte seqyence stored in the whisper backend
- // by its unique id
- //
- // Passes on the error value from the whisper backend
- func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
- symkey, err := p.w.GetSymKey(symkeyid)
- if err != nil {
- return nil, err
- }
- return symkey, nil
- }
- // Returns all recorded topic and address combination for a specific public key
- func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
- p.pubKeyPoolMu.RLock()
- defer p.pubKeyPoolMu.RUnlock()
- for t, peer := range p.pubKeyPool[keyid] {
- topic = append(topic, t)
- address = append(address, peer.address)
- }
- return topic, address, nil
- }
- func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
- p.pubKeyPoolMu.RLock()
- defer p.pubKeyPoolMu.RUnlock()
- if peers, ok := p.pubKeyPool[keyid]; ok {
- if t, ok := peers[topic]; ok {
- return t.address, nil
- }
- }
- return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
- }
- // Attempt to decrypt, validate and unpack a
- // symmetrically encrypted message
- // If successful, returns the unpacked whisper ReceivedMessage struct
- // encapsulating the decrypted message, and the whisper backend id
- // of the symmetric key used to decrypt the message.
- // It fails if decryption of the message fails or if the message is corrupted
- func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
- metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
- for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
- symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
- symkey, err := p.w.GetSymKey(*symkeyid)
- if err != nil {
- continue
- }
- recvmsg, err := envelope.OpenSymmetric(symkey)
- if err != nil {
- continue
- }
- if !recvmsg.Validate() {
- return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
- }
- p.symKeyPoolMu.Lock()
- from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
- p.symKeyPoolMu.Unlock()
- p.symKeyDecryptCacheCursor++
- p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
- return recvmsg, *symkeyid, from, nil
- }
- return nil, "", nil, fmt.Errorf("could not decrypt message")
- }
- // Attempt to decrypt, validate and unpack an
- // asymmetrically encrypted message
- // If successful, returns the unpacked whisper ReceivedMessage struct
- // encapsulating the decrypted message, and the byte representation of
- // the public key used to decrypt the message.
- // It fails if decryption of message fails, or if the message is corrupted
- func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
- metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
- recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
- if err != nil {
- return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
- }
- // check signature (if signed), strip padding
- if !recvmsg.Validate() {
- return nil, "", nil, fmt.Errorf("invalid message")
- }
- pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
- var from PssAddress
- p.pubKeyPoolMu.Lock()
- if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
- from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
- }
- p.pubKeyPoolMu.Unlock()
- return recvmsg, pubkeyid, from, nil
- }
- // Symkey garbage collection
- // a key is removed if:
- // - it is not marked as protected
- // - it is not in the incoming decryption cache
- func (p *Pss) cleanKeys() (count int) {
- for keyid, peertopics := range p.symKeyPool {
- var expiredtopics []Topic
- for topic, psp := range peertopics {
- if psp.protected {
- continue
- }
- var match bool
- for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
- cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
- if *cacheid == keyid {
- match = true
- }
- }
- if !match {
- expiredtopics = append(expiredtopics, topic)
- }
- }
- for _, topic := range expiredtopics {
- p.symKeyPoolMu.Lock()
- delete(p.symKeyPool[keyid], topic)
- log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
- p.symKeyPoolMu.Unlock()
- count++
- }
- }
- return
- }
- /////////////////////////////////////////////////////////////////////
- // SECTION: Message sending
- /////////////////////////////////////////////////////////////////////
- func (p *Pss) enqueue(msg *PssMsg) error {
- select {
- case p.outbox <- msg:
- return nil
- default:
- }
- metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
- return errors.New("outbox full")
- }
- // Send a raw message (any encryption is responsibility of calling client)
- //
- // Will fail if raw messages are disallowed
- func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
- if err := validateAddress(address); err != nil {
- return err
- }
- pssMsgParams := &msgParams{
- raw: true,
- }
- payload := &whisper.Envelope{
- Data: msg,
- Topic: whisper.TopicType(topic),
- }
- pssMsg := newPssMsg(pssMsgParams)
- pssMsg.To = address
- pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
- pssMsg.Payload = payload
- p.addFwdCache(pssMsg)
- err := p.enqueue(pssMsg)
- if err != nil {
- return err
- }
- // if we have a proxhandler on this topic
- // also deliver message to ourselves
- if _, ok := p.topicHandlerCaps[topic]; ok {
- if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
- return p.process(pssMsg, true, true)
- }
- }
- return nil
- }
- // Send a message using symmetric encryption
- //
- // Fails if the key id does not match any of the stored symmetric keys
- func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
- symkey, err := p.GetSymmetricKey(symkeyid)
- if err != nil {
- return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
- }
- p.symKeyPoolMu.Lock()
- psp, ok := p.symKeyPool[symkeyid][topic]
- p.symKeyPoolMu.Unlock()
- if !ok {
- return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
- }
- return p.send(psp.address, topic, msg, false, symkey)
- }
- // Send a message using asymmetric encryption
- //
- // Fails if the key id does not match any in of the stored public keys
- func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
- if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
- return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
- }
- p.pubKeyPoolMu.Lock()
- psp, ok := p.pubKeyPool[pubkeyid][topic]
- p.pubKeyPoolMu.Unlock()
- if !ok {
- return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
- }
- return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
- }
- // Send is payload agnostic, and will accept any byte slice as payload
- // It generates an whisper envelope for the specified recipient and topic,
- // and wraps the message payload in it.
- // TODO: Implement proper message padding
- func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
- metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
- if key == nil || bytes.Equal(key, []byte{}) {
- return fmt.Errorf("Zero length key passed to pss send")
- }
- padding := make([]byte, p.paddingByteSize)
- c, err := rand.Read(padding)
- if err != nil {
- return err
- } else if c < p.paddingByteSize {
- return fmt.Errorf("invalid padding length: %d", c)
- }
- wparams := &whisper.MessageParams{
- TTL: defaultWhisperTTL,
- Src: p.privateKey,
- Topic: whisper.TopicType(topic),
- WorkTime: defaultWhisperWorkTime,
- PoW: defaultWhisperPoW,
- Payload: msg,
- Padding: padding,
- }
- if asymmetric {
- pk, err := crypto.UnmarshalPubkey(key)
- if err != nil {
- return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
- }
- wparams.Dst = pk
- } else {
- wparams.KeySym = key
- }
- // set up outgoing message container, which does encryption and envelope wrapping
- woutmsg, err := whisper.NewSentMessage(wparams)
- if err != nil {
- return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
- }
- // performs encryption.
- // Does NOT perform / performs negligible PoW due to very low difficulty setting
- // after this the message is ready for sending
- envelope, err := woutmsg.Wrap(wparams)
- if err != nil {
- return fmt.Errorf("failed to perform whisper encryption: %v", err)
- }
- log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
- // prepare for devp2p transport
- pssMsgParams := &msgParams{
- sym: !asymmetric,
- }
- pssMsg := newPssMsg(pssMsgParams)
- pssMsg.To = to
- pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
- pssMsg.Payload = envelope
- err = p.enqueue(pssMsg)
- if err != nil {
- return err
- }
- if _, ok := p.topicHandlerCaps[topic]; ok {
- if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
- return p.process(pssMsg, true, true)
- }
- }
- return nil
- }
- // sendFunc is a helper function that tries to send a message and returns true on success.
- // It is set here for usage in production, and optionally overridden in tests.
- var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg
- // tries to send a message, returns true if successful
- func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
- var isPssEnabled bool
- info := sp.Info()
- for _, capability := range info.Caps {
- if capability == p.capstring {
- isPssEnabled = true
- break
- }
- }
- if !isPssEnabled {
- log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
- return false
- }
- // get the protocol peer from the forwarding peer cache
- p.fwdPoolMu.RLock()
- pp := p.fwdPool[sp.Info().ID]
- p.fwdPoolMu.RUnlock()
- err := pp.Send(context.TODO(), msg)
- if err != nil {
- metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
- log.Error(err.Error())
- }
- return err == nil
- }
- // Forwards a pss message to the peer(s) based on recipient address according to the algorithm
- // described below. The recipient address can be of any length, and the byte slice will be matched
- // to the MSB slice of the peer address of the equivalent length.
- //
- // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
- // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
- // partial address, it should be forwarded to all the peers matching the partial address, if there
- // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
- // forwarding fails, the node should try to forward it to the next best peer, until the message is
- // successfully forwarded to at least one peer.
- func (p *Pss) forward(msg *PssMsg) error {
- metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
- sent := 0 // number of successful sends
- to := make([]byte, addressLength)
- copy(to[:len(msg.To)], msg.To)
- neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
- // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
- // but the luminosity is less. here luminosity equals the number of bits given in the destination address.
- luminosityRadius := len(msg.To) * 8
- // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
- pof := pot.DefaultPof(neighbourhoodDepth)
- // soft threshold for msg broadcast
- broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
- if broadcastThreshold > luminosityRadius {
- broadcastThreshold = luminosityRadius
- }
- var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
- // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
- // call below), then peers that fall in the same proximity bin as recipient address will appear
- // [at least] one bit closer, but only if these additional bits are given in the recipient address.
- if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
- broadcastThreshold++
- onlySendOnce = true
- }
- p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
- if po < broadcastThreshold && sent > 0 {
- return false // stop iterating
- }
- if sendFunc(p, sp, msg) {
- sent++
- if onlySendOnce {
- return false
- }
- if po == addressLength*8 {
- // stop iterating if successfully sent to the exact recipient (perfect match of full address)
- return false
- }
- }
- return true
- })
- // if we failed to send to anyone, re-insert message in the send-queue
- if sent == 0 {
- log.Debug("unable to forward to any peers")
- if err := p.enqueue(msg); err != nil {
- metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
- log.Error(err.Error())
- return err
- }
- }
- // cache the message
- p.addFwdCache(msg)
- return nil
- }
- /////////////////////////////////////////////////////////////////////
- // SECTION: Caching
- /////////////////////////////////////////////////////////////////////
- // cleanFwdCache is used to periodically remove expired entries from the forward cache
- func (p *Pss) cleanFwdCache() {
- metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
- p.fwdCacheMu.Lock()
- defer p.fwdCacheMu.Unlock()
- for k, v := range p.fwdCache {
- if v.expiresAt.Before(time.Now()) {
- delete(p.fwdCache, k)
- }
- }
- }
- func label(b []byte) string {
- return fmt.Sprintf("%04x", b[:2])
- }
- // add a message to the cache
- func (p *Pss) addFwdCache(msg *PssMsg) error {
- metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
- var entry pssCacheEntry
- var ok bool
- p.fwdCacheMu.Lock()
- defer p.fwdCacheMu.Unlock()
- digest := p.digest(msg)
- if entry, ok = p.fwdCache[digest]; !ok {
- entry = pssCacheEntry{}
- }
- entry.expiresAt = time.Now().Add(p.cacheTTL)
- p.fwdCache[digest] = entry
- return nil
- }
- // check if message is in the cache
- func (p *Pss) checkFwdCache(msg *PssMsg) bool {
- p.fwdCacheMu.Lock()
- defer p.fwdCacheMu.Unlock()
- digest := p.digest(msg)
- entry, ok := p.fwdCache[digest]
- if ok {
- if entry.expiresAt.After(time.Now()) {
- log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
- metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
- return true
- }
- metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
- }
- return false
- }
- // Digest of message
- func (p *Pss) digest(msg *PssMsg) pssDigest {
- return p.digestBytes(msg.serialize())
- }
- func (p *Pss) digestBytes(msg []byte) pssDigest {
- hasher := p.hashPool.Get().(hash.Hash)
- defer p.hashPool.Put(hasher)
- hasher.Reset()
- hasher.Write(msg)
- digest := pssDigest{}
- key := hasher.Sum(nil)
- copy(digest[:], key[:digestLength])
- return digest
- }
- func validateAddress(addr PssAddress) error {
- if len(addr) > addressLength {
- return errors.New("address too long")
- }
- return nil
- }
|