pss.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package pss
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/ecdsa"
  21. "crypto/rand"
  22. "errors"
  23. "fmt"
  24. "hash"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/metrics"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/p2p/enode"
  32. "github.com/ethereum/go-ethereum/p2p/protocols"
  33. "github.com/ethereum/go-ethereum/rpc"
  34. "github.com/ethereum/go-ethereum/swarm/log"
  35. "github.com/ethereum/go-ethereum/swarm/network"
  36. "github.com/ethereum/go-ethereum/swarm/pot"
  37. "github.com/ethereum/go-ethereum/swarm/storage"
  38. whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
  39. "golang.org/x/crypto/sha3"
  40. )
  // Defaults and protocol identifiers used throughout the pss package.
  const (
  	// Message construction and lifetime.
  	defaultPaddingByteSize = 16                // number of random padding bytes generated per outgoing message
  	DefaultMsgTTL          = 120 * time.Second // default PssParams.MsgTTL
  	defaultDigestCacheTTL  = 10 * time.Second  // default PssParams.CacheTTL

  	// Key handling.
  	defaultSymKeyCacheCapacity = 512 // default PssParams.SymKeyCacheCapacity
  	digestLength               = 32  // byte length of digest used for pss cache (currently same as swarm chunk hash)

  	// Whisper envelope parameters.
  	defaultWhisperWorkTime = 3
  	defaultWhisperPoW      = 1e-10

  	// Transport and housekeeping.
  	defaultMaxMsgSize     = 1 << 20          // MaxMsgSize of the pss protocol spec, in bytes
  	defaultCleanInterval  = 10 * time.Minute // period of the symmetric key garbage collection
  	defaultOutboxCapacity = 100000           // buffer size of the outgoing message queue

  	// Protocol identification.
  	pssProtocolName = "pss"
  	pssVersion      = 2

  	hasherCount = 8 // number of hashers put into the hash pool at construction
  )
  56. var (
  57. addressLength = len(pot.Address{})
  58. )
  // pssCacheEntry is the value stored in the forward cache for each message
  // digest.
  //
  // The cache is used to prevent backwards routing. It is also intended to
  // serve the flood guard mechanism and the mailbox implementation.
  type pssCacheEntry struct {
  	expiresAt time.Time // point in time after which the entry is considered stale
  }
  65. // abstraction to enable access to p2p.protocols.Peer.Send
  66. type senderPeer interface {
  67. Info() *p2p.PeerInfo
  68. ID() enode.ID
  69. Address() []byte
  70. Send(context.Context, interface{}) error
  71. }
  72. // per-key peer related information
  73. // member `protected` prevents garbage collection of the instance
  74. type pssPeer struct {
  75. lastSeen time.Time
  76. address PssAddress
  77. protected bool
  78. }
  // PssParams carries the configuration used to construct a Pss instance.
  type PssParams struct {
  	MsgTTL              time.Duration     // lifetime given to outgoing messages
  	CacheTTL            time.Duration     // interval at which the forward cache is cleaned
  	privateKey          *ecdsa.PrivateKey // node key; set through WithPrivateKey
  	SymKeyCacheCapacity int               // number of slots in the symmetric key decryption cache
  	AllowRaw            bool              // If true, enables sending and receiving messages without builtin pss encryption
  }
  87. // Sane defaults for Pss
  88. func NewPssParams() *PssParams {
  89. return &PssParams{
  90. MsgTTL: DefaultMsgTTL,
  91. CacheTTL: defaultDigestCacheTTL,
  92. SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
  93. }
  94. }
  95. func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
  96. params.privateKey = privatekey
  97. return params
  98. }
  99. // Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
  100. //
  101. // Implements node.Service
  102. type Pss struct {
  103. *network.Kademlia // we can get the Kademlia address from this
  104. privateKey *ecdsa.PrivateKey // pss can have it's own independent key
  105. w *whisper.Whisper // key and encryption backend
  106. auxAPIs []rpc.API // builtins (handshake, test) can add APIs
  107. // sending and forwarding
  108. fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
  109. fwdPoolMu sync.RWMutex
  110. fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
  111. fwdCacheMu sync.RWMutex
  112. cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
  113. msgTTL time.Duration
  114. paddingByteSize int
  115. capstring string
  116. outbox chan *PssMsg
  117. // keys and peers
  118. pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
  119. pubKeyPoolMu sync.RWMutex
  120. symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
  121. symKeyPoolMu sync.RWMutex
  122. symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
  123. symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
  124. symKeyDecryptCacheCapacity int // max amount of symkeys to keep.
  125. // message handling
  126. handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
  127. handlersMu sync.RWMutex
  128. hashPool sync.Pool
  129. topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)
  130. // process
  131. quitC chan struct{}
  132. }
  133. func (p *Pss) String() string {
  134. return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
  135. }
  136. // Creates a new Pss instance.
  137. //
  138. // In addition to params, it takes a swarm network Kademlia
  139. // and a FileStore storage for message cache storage.
  140. func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
  141. if params.privateKey == nil {
  142. return nil, errors.New("missing private key for pss")
  143. }
  144. cap := p2p.Cap{
  145. Name: pssProtocolName,
  146. Version: pssVersion,
  147. }
  148. ps := &Pss{
  149. Kademlia: k,
  150. privateKey: params.privateKey,
  151. w: whisper.New(&whisper.DefaultConfig),
  152. quitC: make(chan struct{}),
  153. fwdPool: make(map[string]*protocols.Peer),
  154. fwdCache: make(map[pssDigest]pssCacheEntry),
  155. cacheTTL: params.CacheTTL,
  156. msgTTL: params.MsgTTL,
  157. paddingByteSize: defaultPaddingByteSize,
  158. capstring: cap.String(),
  159. outbox: make(chan *PssMsg, defaultOutboxCapacity),
  160. pubKeyPool: make(map[string]map[Topic]*pssPeer),
  161. symKeyPool: make(map[string]map[Topic]*pssPeer),
  162. symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
  163. symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
  164. handlers: make(map[Topic]map[*handler]bool),
  165. topicHandlerCaps: make(map[Topic]*handlerCaps),
  166. hashPool: sync.Pool{
  167. New: func() interface{} {
  168. return sha3.NewLegacyKeccak256()
  169. },
  170. },
  171. }
  172. for i := 0; i < hasherCount; i++ {
  173. hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
  174. ps.hashPool.Put(hashfunc)
  175. }
  176. return ps, nil
  177. }
  178. /////////////////////////////////////////////////////////////////////
  179. // SECTION: node.Service interface
  180. /////////////////////////////////////////////////////////////////////
  181. func (p *Pss) Start(srv *p2p.Server) error {
  182. go func() {
  183. ticker := time.NewTicker(defaultCleanInterval)
  184. cacheTicker := time.NewTicker(p.cacheTTL)
  185. defer ticker.Stop()
  186. defer cacheTicker.Stop()
  187. for {
  188. select {
  189. case <-cacheTicker.C:
  190. p.cleanFwdCache()
  191. case <-ticker.C:
  192. p.cleanKeys()
  193. case <-p.quitC:
  194. return
  195. }
  196. }
  197. }()
  198. go func() {
  199. for {
  200. select {
  201. case msg := <-p.outbox:
  202. err := p.forward(msg)
  203. if err != nil {
  204. log.Error(err.Error())
  205. metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
  206. }
  207. case <-p.quitC:
  208. return
  209. }
  210. }
  211. }()
  212. log.Info("Started Pss")
  213. log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
  214. return nil
  215. }
  216. func (p *Pss) Stop() error {
  217. log.Info("Pss shutting down")
  218. close(p.quitC)
  219. return nil
  220. }
  221. var pssSpec = &protocols.Spec{
  222. Name: pssProtocolName,
  223. Version: pssVersion,
  224. MaxMsgSize: defaultMaxMsgSize,
  225. Messages: []interface{}{
  226. PssMsg{},
  227. },
  228. }
  229. func (p *Pss) Protocols() []p2p.Protocol {
  230. return []p2p.Protocol{
  231. {
  232. Name: pssSpec.Name,
  233. Version: pssSpec.Version,
  234. Length: pssSpec.Length(),
  235. Run: p.Run,
  236. },
  237. }
  238. }
  239. func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
  240. pp := protocols.NewPeer(peer, rw, pssSpec)
  241. p.fwdPoolMu.Lock()
  242. p.fwdPool[peer.Info().ID] = pp
  243. p.fwdPoolMu.Unlock()
  244. return pp.Run(p.handlePssMsg)
  245. }
  246. func (p *Pss) APIs() []rpc.API {
  247. apis := []rpc.API{
  248. {
  249. Namespace: "pss",
  250. Version: "1.0",
  251. Service: NewAPI(p),
  252. Public: true,
  253. },
  254. }
  255. apis = append(apis, p.auxAPIs...)
  256. return apis
  257. }
  258. // add API methods to the pss API
  259. // must be run before node is started
  260. func (p *Pss) addAPI(api rpc.API) {
  261. p.auxAPIs = append(p.auxAPIs, api)
  262. }
  263. // Returns the swarm Kademlia address of the pss node
  264. func (p *Pss) BaseAddr() []byte {
  265. return p.Kademlia.BaseAddr()
  266. }
  267. // Returns the pss node's public key
  268. func (p *Pss) PublicKey() *ecdsa.PublicKey {
  269. return &p.privateKey.PublicKey
  270. }
  271. /////////////////////////////////////////////////////////////////////
  272. // SECTION: Message handling
  273. /////////////////////////////////////////////////////////////////////
  274. // Links a handler function to a Topic
  275. //
  276. // All incoming messages with an envelope Topic matching the
  277. // topic specified will be passed to the given Handler function.
  278. //
  279. // There may be an arbitrary number of handler functions per topic.
  280. //
  281. // Returns a deregister function which needs to be called to
  282. // deregister the handler,
  283. func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
  284. p.handlersMu.Lock()
  285. defer p.handlersMu.Unlock()
  286. handlers := p.handlers[*topic]
  287. if handlers == nil {
  288. handlers = make(map[*handler]bool)
  289. p.handlers[*topic] = handlers
  290. log.Debug("registered handler", "caps", hndlr.caps)
  291. }
  292. if hndlr.caps == nil {
  293. hndlr.caps = &handlerCaps{}
  294. }
  295. handlers[hndlr] = true
  296. if _, ok := p.topicHandlerCaps[*topic]; !ok {
  297. p.topicHandlerCaps[*topic] = &handlerCaps{}
  298. }
  299. if hndlr.caps.raw {
  300. p.topicHandlerCaps[*topic].raw = true
  301. }
  302. if hndlr.caps.prox {
  303. p.topicHandlerCaps[*topic].prox = true
  304. }
  305. return func() { p.deregister(topic, hndlr) }
  306. }
  307. func (p *Pss) deregister(topic *Topic, hndlr *handler) {
  308. p.handlersMu.Lock()
  309. defer p.handlersMu.Unlock()
  310. handlers := p.handlers[*topic]
  311. if len(handlers) > 1 {
  312. delete(p.handlers, *topic)
  313. // topic caps might have changed now that a handler is gone
  314. caps := &handlerCaps{}
  315. for h := range handlers {
  316. if h.caps.raw {
  317. caps.raw = true
  318. }
  319. if h.caps.prox {
  320. caps.prox = true
  321. }
  322. }
  323. p.topicHandlerCaps[*topic] = caps
  324. return
  325. }
  326. delete(handlers, hndlr)
  327. }
  328. // get all registered handlers for respective topics
  329. func (p *Pss) getHandlers(topic Topic) map[*handler]bool {
  330. p.handlersMu.RLock()
  331. defer p.handlersMu.RUnlock()
  332. return p.handlers[topic]
  333. }
  334. // Filters incoming messages for processing or forwarding.
  335. // Check if address partially matches
  336. // If yes, it CAN be for us, and we process it
  337. // Only passes error to pss protocol handler if payload is not valid pssmsg
  338. func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
  339. metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
  340. pssmsg, ok := msg.(*PssMsg)
  341. if !ok {
  342. return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
  343. }
  344. log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
  345. if int64(pssmsg.Expire) < time.Now().Unix() {
  346. metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
  347. log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
  348. return nil
  349. }
  350. if p.checkFwdCache(pssmsg) {
  351. log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
  352. return nil
  353. }
  354. p.addFwdCache(pssmsg)
  355. psstopic := Topic(pssmsg.Payload.Topic)
  356. // raw is simplest handler contingency to check, so check that first
  357. var isRaw bool
  358. if pssmsg.isRaw() {
  359. if _, ok := p.topicHandlerCaps[psstopic]; ok {
  360. if !p.topicHandlerCaps[psstopic].raw {
  361. log.Debug("No handler for raw message", "topic", psstopic)
  362. return nil
  363. }
  364. }
  365. isRaw = true
  366. }
  367. // check if we can be recipient:
  368. // - no prox handler on message and partial address matches
  369. // - prox handler on message and we are in prox regardless of partial address match
  370. // store this result so we don't calculate again on every handler
  371. var isProx bool
  372. if _, ok := p.topicHandlerCaps[psstopic]; ok {
  373. isProx = p.topicHandlerCaps[psstopic].prox
  374. }
  375. isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
  376. if !isRecipient {
  377. log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
  378. return p.enqueue(pssmsg)
  379. }
  380. log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
  381. if err := p.process(pssmsg, isRaw, isProx); err != nil {
  382. qerr := p.enqueue(pssmsg)
  383. if qerr != nil {
  384. return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
  385. }
  386. }
  387. return nil
  388. }
  389. // Entry point to processing a message for which the current node can be the intended recipient.
  390. // Attempts symmetric and asymmetric decryption with stored keys.
  391. // Dispatches message to all handlers matching the message topic
  392. func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
  393. metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
  394. var err error
  395. var recvmsg *whisper.ReceivedMessage
  396. var payload []byte
  397. var from PssAddress
  398. var asymmetric bool
  399. var keyid string
  400. var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)
  401. envelope := pssmsg.Payload
  402. psstopic := Topic(envelope.Topic)
  403. if raw {
  404. payload = pssmsg.Payload.Data
  405. } else {
  406. if pssmsg.isSym() {
  407. keyFunc = p.processSym
  408. } else {
  409. asymmetric = true
  410. keyFunc = p.processAsym
  411. }
  412. recvmsg, keyid, from, err = keyFunc(envelope)
  413. if err != nil {
  414. return errors.New("Decryption failed")
  415. }
  416. payload = recvmsg.Payload
  417. }
  418. if len(pssmsg.To) < addressLength {
  419. if err := p.enqueue(pssmsg); err != nil {
  420. return err
  421. }
  422. }
  423. p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)
  424. return nil
  425. }
  426. func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
  427. handlers := p.getHandlers(topic)
  428. peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
  429. for h := range handlers {
  430. if !h.caps.raw && raw {
  431. log.Warn("norawhandler")
  432. continue
  433. }
  434. if !h.caps.prox && prox {
  435. log.Warn("noproxhandler")
  436. continue
  437. }
  438. err := (h.f)(payload, peer, asymmetric, keyid)
  439. if err != nil {
  440. log.Warn("Pss handler failed", "err", err)
  441. }
  442. }
  443. }
  444. // will return false if using partial address
  445. func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
  446. return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
  447. }
  448. // test match of leftmost bytes in given message to node's Kademlia address
  449. func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
  450. local := p.Kademlia.BaseAddr()
  451. // if a partial address matches we are possible recipient regardless of prox
  452. // if not and prox is not set, we are surely not
  453. if bytes.Equal(msg.To, local[:len(msg.To)]) {
  454. return true
  455. } else if !prox {
  456. return false
  457. }
  458. depth := p.Kademlia.NeighbourhoodDepth()
  459. po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
  460. log.Trace("selfpossible", "po", po, "depth", depth)
  461. return depth <= po
  462. }
  463. /////////////////////////////////////////////////////////////////////
  464. // SECTION: Encryption
  465. /////////////////////////////////////////////////////////////////////
  466. // Links a peer ECDSA public key to a topic
  467. //
  468. // This is required for asymmetric message exchange
  469. // on the given topic
  470. //
  471. // The value in `address` will be used as a routing hint for the
  472. // public key / topic association
  473. func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error {
  474. if err := validateAddress(address); err != nil {
  475. return err
  476. }
  477. pubkeybytes := crypto.FromECDSAPub(pubkey)
  478. if len(pubkeybytes) == 0 {
  479. return fmt.Errorf("invalid public key: %v", pubkey)
  480. }
  481. pubkeyid := common.ToHex(pubkeybytes)
  482. psp := &pssPeer{
  483. address: address,
  484. }
  485. p.pubKeyPoolMu.Lock()
  486. if _, ok := p.pubKeyPool[pubkeyid]; !ok {
  487. p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
  488. }
  489. p.pubKeyPool[pubkeyid][topic] = psp
  490. p.pubKeyPoolMu.Unlock()
  491. log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address)
  492. return nil
  493. }
  494. // Automatically generate a new symkey for a topic and address hint
  495. func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) {
  496. keyid, err := p.w.GenerateSymKey()
  497. if err != nil {
  498. return "", err
  499. }
  500. p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
  501. return keyid, nil
  502. }
  503. // Links a peer symmetric key (arbitrary byte sequence) to a topic
  504. //
  505. // This is required for symmetrically encrypted message exchange
  506. // on the given topic
  507. //
  508. // The key is stored in the whisper backend.
  509. //
  510. // If addtocache is set to true, the key will be added to the cache of keys
  511. // used to attempt symmetric decryption of incoming messages.
  512. //
  513. // Returns a string id that can be used to retrieve the key bytes
  514. // from the whisper backend (see pss.GetSymmetricKey())
  515. func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) {
  516. if err := validateAddress(address); err != nil {
  517. return "", err
  518. }
  519. return p.setSymmetricKey(key, topic, address, addtocache, true)
  520. }
  521. func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) {
  522. keyid, err := p.w.AddSymKeyDirect(key)
  523. if err != nil {
  524. return "", err
  525. }
  526. p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
  527. return keyid, nil
  528. }
  529. // adds a symmetric key to the pss key pool, and optionally adds the key
  530. // to the collection of keys used to attempt symmetric decryption of
  531. // incoming messages
  532. func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) {
  533. psp := &pssPeer{
  534. address: address,
  535. protected: protected,
  536. }
  537. p.symKeyPoolMu.Lock()
  538. if _, ok := p.symKeyPool[keyid]; !ok {
  539. p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
  540. }
  541. p.symKeyPool[keyid][topic] = psp
  542. p.symKeyPoolMu.Unlock()
  543. if addtocache {
  544. p.symKeyDecryptCacheCursor++
  545. p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
  546. }
  547. key, _ := p.GetSymmetricKey(keyid)
  548. log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache)
  549. }
  550. // Returns a symmetric key byte seqyence stored in the whisper backend
  551. // by its unique id
  552. //
  553. // Passes on the error value from the whisper backend
  554. func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
  555. symkey, err := p.w.GetSymKey(symkeyid)
  556. if err != nil {
  557. return nil, err
  558. }
  559. return symkey, nil
  560. }
  561. // Returns all recorded topic and address combination for a specific public key
  562. func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
  563. p.pubKeyPoolMu.RLock()
  564. defer p.pubKeyPoolMu.RUnlock()
  565. for t, peer := range p.pubKeyPool[keyid] {
  566. topic = append(topic, t)
  567. address = append(address, peer.address)
  568. }
  569. return topic, address, nil
  570. }
  571. func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
  572. p.pubKeyPoolMu.RLock()
  573. defer p.pubKeyPoolMu.RUnlock()
  574. if peers, ok := p.pubKeyPool[keyid]; ok {
  575. if t, ok := peers[topic]; ok {
  576. return t.address, nil
  577. }
  578. }
  579. return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
  580. }
  581. // Attempt to decrypt, validate and unpack a
  582. // symmetrically encrypted message
  583. // If successful, returns the unpacked whisper ReceivedMessage struct
  584. // encapsulating the decrypted message, and the whisper backend id
  585. // of the symmetric key used to decrypt the message.
  586. // It fails if decryption of the message fails or if the message is corrupted
  587. func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
  588. metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
  589. for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
  590. symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
  591. symkey, err := p.w.GetSymKey(*symkeyid)
  592. if err != nil {
  593. continue
  594. }
  595. recvmsg, err := envelope.OpenSymmetric(symkey)
  596. if err != nil {
  597. continue
  598. }
  599. if !recvmsg.Validate() {
  600. return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
  601. }
  602. p.symKeyPoolMu.Lock()
  603. from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
  604. p.symKeyPoolMu.Unlock()
  605. p.symKeyDecryptCacheCursor++
  606. p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
  607. return recvmsg, *symkeyid, from, nil
  608. }
  609. return nil, "", nil, fmt.Errorf("could not decrypt message")
  610. }
  611. // Attempt to decrypt, validate and unpack an
  612. // asymmetrically encrypted message
  613. // If successful, returns the unpacked whisper ReceivedMessage struct
  614. // encapsulating the decrypted message, and the byte representation of
  615. // the public key used to decrypt the message.
  616. // It fails if decryption of message fails, or if the message is corrupted
  617. func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
  618. metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
  619. recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
  620. if err != nil {
  621. return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
  622. }
  623. // check signature (if signed), strip padding
  624. if !recvmsg.Validate() {
  625. return nil, "", nil, fmt.Errorf("invalid message")
  626. }
  627. pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
  628. var from PssAddress
  629. p.pubKeyPoolMu.Lock()
  630. if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
  631. from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
  632. }
  633. p.pubKeyPoolMu.Unlock()
  634. return recvmsg, pubkeyid, from, nil
  635. }
  636. // Symkey garbage collection
  637. // a key is removed if:
  638. // - it is not marked as protected
  639. // - it is not in the incoming decryption cache
  640. func (p *Pss) cleanKeys() (count int) {
  641. for keyid, peertopics := range p.symKeyPool {
  642. var expiredtopics []Topic
  643. for topic, psp := range peertopics {
  644. if psp.protected {
  645. continue
  646. }
  647. var match bool
  648. for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
  649. cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
  650. if *cacheid == keyid {
  651. match = true
  652. }
  653. }
  654. if !match {
  655. expiredtopics = append(expiredtopics, topic)
  656. }
  657. }
  658. for _, topic := range expiredtopics {
  659. p.symKeyPoolMu.Lock()
  660. delete(p.symKeyPool[keyid], topic)
  661. log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
  662. p.symKeyPoolMu.Unlock()
  663. count++
  664. }
  665. }
  666. return
  667. }
  668. /////////////////////////////////////////////////////////////////////
  669. // SECTION: Message sending
  670. /////////////////////////////////////////////////////////////////////
  671. func (p *Pss) enqueue(msg *PssMsg) error {
  672. select {
  673. case p.outbox <- msg:
  674. return nil
  675. default:
  676. }
  677. metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
  678. return errors.New("outbox full")
  679. }
  680. // Send a raw message (any encryption is responsibility of calling client)
  681. //
  682. // Will fail if raw messages are disallowed
  683. func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
  684. if err := validateAddress(address); err != nil {
  685. return err
  686. }
  687. pssMsgParams := &msgParams{
  688. raw: true,
  689. }
  690. payload := &whisper.Envelope{
  691. Data: msg,
  692. Topic: whisper.TopicType(topic),
  693. }
  694. pssMsg := newPssMsg(pssMsgParams)
  695. pssMsg.To = address
  696. pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
  697. pssMsg.Payload = payload
  698. p.addFwdCache(pssMsg)
  699. err := p.enqueue(pssMsg)
  700. if err != nil {
  701. return err
  702. }
  703. // if we have a proxhandler on this topic
  704. // also deliver message to ourselves
  705. if _, ok := p.topicHandlerCaps[topic]; ok {
  706. if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
  707. return p.process(pssMsg, true, true)
  708. }
  709. }
  710. return nil
  711. }
  712. // Send a message using symmetric encryption
  713. //
  714. // Fails if the key id does not match any of the stored symmetric keys
  715. func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
  716. symkey, err := p.GetSymmetricKey(symkeyid)
  717. if err != nil {
  718. return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
  719. }
  720. p.symKeyPoolMu.Lock()
  721. psp, ok := p.symKeyPool[symkeyid][topic]
  722. p.symKeyPoolMu.Unlock()
  723. if !ok {
  724. return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
  725. }
  726. return p.send(psp.address, topic, msg, false, symkey)
  727. }
  728. // Send a message using asymmetric encryption
  729. //
  730. // Fails if the key id does not match any in of the stored public keys
  731. func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
  732. if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
  733. return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
  734. }
  735. p.pubKeyPoolMu.Lock()
  736. psp, ok := p.pubKeyPool[pubkeyid][topic]
  737. p.pubKeyPoolMu.Unlock()
  738. if !ok {
  739. return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
  740. }
  741. return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
  742. }
  743. // Send is payload agnostic, and will accept any byte slice as payload
  744. // It generates an whisper envelope for the specified recipient and topic,
  745. // and wraps the message payload in it.
  746. // TODO: Implement proper message padding
  747. func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
  748. metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
  749. if key == nil || bytes.Equal(key, []byte{}) {
  750. return fmt.Errorf("Zero length key passed to pss send")
  751. }
  752. padding := make([]byte, p.paddingByteSize)
  753. c, err := rand.Read(padding)
  754. if err != nil {
  755. return err
  756. } else if c < p.paddingByteSize {
  757. return fmt.Errorf("invalid padding length: %d", c)
  758. }
  759. wparams := &whisper.MessageParams{
  760. TTL: defaultWhisperTTL,
  761. Src: p.privateKey,
  762. Topic: whisper.TopicType(topic),
  763. WorkTime: defaultWhisperWorkTime,
  764. PoW: defaultWhisperPoW,
  765. Payload: msg,
  766. Padding: padding,
  767. }
  768. if asymmetric {
  769. pk, err := crypto.UnmarshalPubkey(key)
  770. if err != nil {
  771. return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
  772. }
  773. wparams.Dst = pk
  774. } else {
  775. wparams.KeySym = key
  776. }
  777. // set up outgoing message container, which does encryption and envelope wrapping
  778. woutmsg, err := whisper.NewSentMessage(wparams)
  779. if err != nil {
  780. return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
  781. }
  782. // performs encryption.
  783. // Does NOT perform / performs negligible PoW due to very low difficulty setting
  784. // after this the message is ready for sending
  785. envelope, err := woutmsg.Wrap(wparams)
  786. if err != nil {
  787. return fmt.Errorf("failed to perform whisper encryption: %v", err)
  788. }
  789. log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
  790. // prepare for devp2p transport
  791. pssMsgParams := &msgParams{
  792. sym: !asymmetric,
  793. }
  794. pssMsg := newPssMsg(pssMsgParams)
  795. pssMsg.To = to
  796. pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
  797. pssMsg.Payload = envelope
  798. err = p.enqueue(pssMsg)
  799. if err != nil {
  800. return err
  801. }
  802. if _, ok := p.topicHandlerCaps[topic]; ok {
  803. if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
  804. return p.process(pssMsg, true, true)
  805. }
  806. }
  807. return nil
  808. }
  809. // sendFunc is a helper function that tries to send a message and returns true on success.
  810. // It is set here for usage in production, and optionally overridden in tests.
  811. var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg
  812. // tries to send a message, returns true if successful
  813. func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
  814. var isPssEnabled bool
  815. info := sp.Info()
  816. for _, capability := range info.Caps {
  817. if capability == p.capstring {
  818. isPssEnabled = true
  819. break
  820. }
  821. }
  822. if !isPssEnabled {
  823. log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
  824. return false
  825. }
  826. // get the protocol peer from the forwarding peer cache
  827. p.fwdPoolMu.RLock()
  828. pp := p.fwdPool[sp.Info().ID]
  829. p.fwdPoolMu.RUnlock()
  830. err := pp.Send(context.TODO(), msg)
  831. if err != nil {
  832. metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
  833. log.Error(err.Error())
  834. }
  835. return err == nil
  836. }
  837. // Forwards a pss message to the peer(s) based on recipient address according to the algorithm
  838. // described below. The recipient address can be of any length, and the byte slice will be matched
  839. // to the MSB slice of the peer address of the equivalent length.
  840. //
  841. // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
  842. // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
  843. // partial address, it should be forwarded to all the peers matching the partial address, if there
  844. // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
  845. // forwarding fails, the node should try to forward it to the next best peer, until the message is
  846. // successfully forwarded to at least one peer.
  847. func (p *Pss) forward(msg *PssMsg) error {
  848. metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
  849. sent := 0 // number of successful sends
  850. to := make([]byte, addressLength)
  851. copy(to[:len(msg.To)], msg.To)
  852. neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
  853. // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
  854. // but the luminosity is less. here luminosity equals the number of bits given in the destination address.
  855. luminosityRadius := len(msg.To) * 8
  856. // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
  857. pof := pot.DefaultPof(neighbourhoodDepth)
  858. // soft threshold for msg broadcast
  859. broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
  860. if broadcastThreshold > luminosityRadius {
  861. broadcastThreshold = luminosityRadius
  862. }
  863. var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
  864. // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
  865. // call below), then peers that fall in the same proximity bin as recipient address will appear
  866. // [at least] one bit closer, but only if these additional bits are given in the recipient address.
  867. if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
  868. broadcastThreshold++
  869. onlySendOnce = true
  870. }
  871. p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
  872. if po < broadcastThreshold && sent > 0 {
  873. return false // stop iterating
  874. }
  875. if sendFunc(p, sp, msg) {
  876. sent++
  877. if onlySendOnce {
  878. return false
  879. }
  880. if po == addressLength*8 {
  881. // stop iterating if successfully sent to the exact recipient (perfect match of full address)
  882. return false
  883. }
  884. }
  885. return true
  886. })
  887. // if we failed to send to anyone, re-insert message in the send-queue
  888. if sent == 0 {
  889. log.Debug("unable to forward to any peers")
  890. if err := p.enqueue(msg); err != nil {
  891. metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
  892. log.Error(err.Error())
  893. return err
  894. }
  895. }
  896. // cache the message
  897. p.addFwdCache(msg)
  898. return nil
  899. }
  900. /////////////////////////////////////////////////////////////////////
  901. // SECTION: Caching
  902. /////////////////////////////////////////////////////////////////////
  903. // cleanFwdCache is used to periodically remove expired entries from the forward cache
  904. func (p *Pss) cleanFwdCache() {
  905. metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
  906. p.fwdCacheMu.Lock()
  907. defer p.fwdCacheMu.Unlock()
  908. for k, v := range p.fwdCache {
  909. if v.expiresAt.Before(time.Now()) {
  910. delete(p.fwdCache, k)
  911. }
  912. }
  913. }
  914. func label(b []byte) string {
  915. return fmt.Sprintf("%04x", b[:2])
  916. }
  917. // add a message to the cache
  918. func (p *Pss) addFwdCache(msg *PssMsg) error {
  919. metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
  920. var entry pssCacheEntry
  921. var ok bool
  922. p.fwdCacheMu.Lock()
  923. defer p.fwdCacheMu.Unlock()
  924. digest := p.digest(msg)
  925. if entry, ok = p.fwdCache[digest]; !ok {
  926. entry = pssCacheEntry{}
  927. }
  928. entry.expiresAt = time.Now().Add(p.cacheTTL)
  929. p.fwdCache[digest] = entry
  930. return nil
  931. }
  932. // check if message is in the cache
  933. func (p *Pss) checkFwdCache(msg *PssMsg) bool {
  934. p.fwdCacheMu.Lock()
  935. defer p.fwdCacheMu.Unlock()
  936. digest := p.digest(msg)
  937. entry, ok := p.fwdCache[digest]
  938. if ok {
  939. if entry.expiresAt.After(time.Now()) {
  940. log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
  941. metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
  942. return true
  943. }
  944. metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
  945. }
  946. return false
  947. }
  948. // Digest of message
  949. func (p *Pss) digest(msg *PssMsg) pssDigest {
  950. return p.digestBytes(msg.serialize())
  951. }
  952. func (p *Pss) digestBytes(msg []byte) pssDigest {
  953. hasher := p.hashPool.Get().(hash.Hash)
  954. defer p.hashPool.Put(hasher)
  955. hasher.Reset()
  956. hasher.Write(msg)
  957. digest := pssDigest{}
  958. key := hasher.Sum(nil)
  959. copy(digest[:], key[:digestLength])
  960. return digest
  961. }
  962. func validateAddress(addr PssAddress) error {
  963. if len(addr) > addressLength {
  964. return errors.New("address too long")
  965. }
  966. return nil
  967. }