whisper.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package whisperv6
  17. import (
  18. "bytes"
  19. "crypto/ecdsa"
  20. "crypto/sha256"
  21. "fmt"
  22. "math"
  23. "runtime"
  24. "sync"
  25. "time"
  26. mapset "github.com/deckarep/golang-set"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/node"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/rlp"
  33. "github.com/ethereum/go-ethereum/rpc"
  34. "github.com/syndtr/goleveldb/leveldb/errors"
  35. "golang.org/x/crypto/pbkdf2"
  36. "golang.org/x/sync/syncmap"
  37. )
  38. // Statistics holds several message-related counter for analytics
  39. // purposes.
  40. type Statistics struct {
  41. messagesCleared int
  42. memoryCleared int
  43. memoryUsed int
  44. cycles int
  45. totalMessagesCleared int
  46. }
  47. const (
  48. maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
  49. overflowIdx // Indicator of message queue overflow
  50. minPowIdx // Minimal PoW required by the whisper node
  51. minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time
  52. bloomFilterIdx // Bloom filter for topics of interest for this node
  53. bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time
  54. lightClientModeIdx // Light client mode. (does not forward any messages)
  55. restrictConnectionBetweenLightClientsIdx // Restrict connection between two light clients
  56. )
  57. // Whisper represents a dark communication interface through the Ethereum
  58. // network, using its very own P2P communication layer.
  59. type Whisper struct {
  60. protocol p2p.Protocol // Protocol description and parameters
  61. filters *Filters // Message filters installed with Subscribe function
  62. privateKeys map[string]*ecdsa.PrivateKey // Private key storage
  63. symKeys map[string][]byte // Symmetric key storage
  64. keyMu sync.RWMutex // Mutex associated with key storages
  65. poolMu sync.RWMutex // Mutex to sync the message and expiration pools
  66. envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
  67. expirations map[uint32]mapset.Set // Message expiration pool
  68. peerMu sync.RWMutex // Mutex to sync the active peer set
  69. peers map[*Peer]struct{} // Set of currently active peers
  70. messageQueue chan *Envelope // Message queue for normal whisper messages
  71. p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
  72. quit chan struct{} // Channel used for graceful exit
  73. settings syncmap.Map // holds configuration settings that can be dynamically changed
  74. syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
  75. statsMu sync.Mutex // guard stats
  76. stats Statistics // Statistics of whisper node
  77. mailServer MailServer // MailServer interface
  78. wg sync.WaitGroup
  79. }
  80. // New creates a Whisper client ready to communicate through the Ethereum P2P network.
  81. func New(stack *node.Node, cfg *Config) (*Whisper, error) {
  82. if cfg == nil {
  83. cfg = &DefaultConfig
  84. }
  85. whisper := &Whisper{
  86. privateKeys: make(map[string]*ecdsa.PrivateKey),
  87. symKeys: make(map[string][]byte),
  88. envelopes: make(map[common.Hash]*Envelope),
  89. expirations: make(map[uint32]mapset.Set),
  90. peers: make(map[*Peer]struct{}),
  91. messageQueue: make(chan *Envelope, messageQueueLimit),
  92. p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
  93. quit: make(chan struct{}),
  94. syncAllowance: DefaultSyncAllowance,
  95. }
  96. whisper.filters = NewFilters(whisper)
  97. whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
  98. whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
  99. whisper.settings.Store(overflowIdx, false)
  100. whisper.settings.Store(restrictConnectionBetweenLightClientsIdx, cfg.RestrictConnectionBetweenLightClients)
  101. // p2p whisper sub protocol handler
  102. whisper.protocol = p2p.Protocol{
  103. Name: ProtocolName,
  104. Version: uint(ProtocolVersion),
  105. Length: NumberOfMessageCodes,
  106. Run: whisper.HandlePeer,
  107. NodeInfo: func() interface{} {
  108. return map[string]interface{}{
  109. "version": ProtocolVersionStr,
  110. "maxMessageSize": whisper.MaxMessageSize(),
  111. "minimumPoW": whisper.MinPow(),
  112. }
  113. },
  114. }
  115. stack.RegisterAPIs(whisper.APIs())
  116. stack.RegisterProtocols(whisper.Protocols())
  117. stack.RegisterLifecycle(whisper)
  118. return whisper, nil
  119. }
  120. // MinPow returns the PoW value required by this node.
  121. func (whisper *Whisper) MinPow() float64 {
  122. val, exist := whisper.settings.Load(minPowIdx)
  123. if !exist || val == nil {
  124. return DefaultMinimumPoW
  125. }
  126. v, ok := val.(float64)
  127. if !ok {
  128. log.Error("Error loading minPowIdx, using default")
  129. return DefaultMinimumPoW
  130. }
  131. return v
  132. }
  133. // MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
  134. // time after PoW was changed. If sufficient time have elapsed or no change of PoW
  135. // have ever occurred, the return value will be the same as return value of MinPow().
  136. func (whisper *Whisper) MinPowTolerance() float64 {
  137. val, exist := whisper.settings.Load(minPowToleranceIdx)
  138. if !exist || val == nil {
  139. return DefaultMinimumPoW
  140. }
  141. return val.(float64)
  142. }
  143. // BloomFilter returns the aggregated bloom filter for all the topics of interest.
  144. // The nodes are required to send only messages that match the advertised bloom filter.
  145. // If a message does not match the bloom, it will tantamount to spam, and the peer will
  146. // be disconnected.
  147. func (whisper *Whisper) BloomFilter() []byte {
  148. val, exist := whisper.settings.Load(bloomFilterIdx)
  149. if !exist || val == nil {
  150. return nil
  151. }
  152. return val.([]byte)
  153. }
  154. // BloomFilterTolerance returns the bloom filter which is tolerated for a limited
  155. // time after new bloom was advertised to the peers. If sufficient time have elapsed
  156. // or no change of bloom filter have ever occurred, the return value will be the same
  157. // as return value of BloomFilter().
  158. func (whisper *Whisper) BloomFilterTolerance() []byte {
  159. val, exist := whisper.settings.Load(bloomFilterToleranceIdx)
  160. if !exist || val == nil {
  161. return nil
  162. }
  163. return val.([]byte)
  164. }
  165. // MaxMessageSize returns the maximum accepted message size.
  166. func (whisper *Whisper) MaxMessageSize() uint32 {
  167. val, _ := whisper.settings.Load(maxMsgSizeIdx)
  168. return val.(uint32)
  169. }
  170. // Overflow returns an indication if the message queue is full.
  171. func (whisper *Whisper) Overflow() bool {
  172. val, _ := whisper.settings.Load(overflowIdx)
  173. return val.(bool)
  174. }
  175. // APIs returns the RPC descriptors the Whisper implementation offers
  176. func (whisper *Whisper) APIs() []rpc.API {
  177. return []rpc.API{
  178. {
  179. Namespace: ProtocolName,
  180. Version: ProtocolVersionStr,
  181. Service: NewPublicWhisperAPI(whisper),
  182. Public: true,
  183. },
  184. }
  185. }
  186. // RegisterServer registers MailServer interface.
  187. // MailServer will process all the incoming messages with p2pRequestCode.
  188. func (whisper *Whisper) RegisterServer(server MailServer) {
  189. whisper.mailServer = server
  190. }
  191. // Protocols returns the whisper sub-protocols ran by this particular client.
  192. func (whisper *Whisper) Protocols() []p2p.Protocol {
  193. return []p2p.Protocol{whisper.protocol}
  194. }
  195. // Version returns the whisper sub-protocols version number.
  196. func (whisper *Whisper) Version() uint {
  197. return whisper.protocol.Version
  198. }
  199. // SetMaxMessageSize sets the maximal message size allowed by this node
  200. func (whisper *Whisper) SetMaxMessageSize(size uint32) error {
  201. if size > MaxMessageSize {
  202. return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
  203. }
  204. whisper.settings.Store(maxMsgSizeIdx, size)
  205. return nil
  206. }
  207. // SetBloomFilter sets the new bloom filter
  208. func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
  209. if len(bloom) != BloomFilterSize {
  210. return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
  211. }
  212. b := make([]byte, BloomFilterSize)
  213. copy(b, bloom)
  214. whisper.settings.Store(bloomFilterIdx, b)
  215. whisper.notifyPeersAboutBloomFilterChange(b)
  216. whisper.wg.Add(1)
  217. go func() {
  218. // allow some time before all the peers have processed the notification
  219. defer whisper.wg.Done()
  220. ticker := time.NewTicker(time.Duration(whisper.syncAllowance) * time.Second)
  221. defer ticker.Stop()
  222. <-ticker.C
  223. whisper.settings.Store(bloomFilterToleranceIdx, b)
  224. }()
  225. return nil
  226. }
  227. // SetMinimumPoW sets the minimal PoW required by this node
  228. func (whisper *Whisper) SetMinimumPoW(val float64) error {
  229. if val < 0.0 {
  230. return fmt.Errorf("invalid PoW: %f", val)
  231. }
  232. whisper.settings.Store(minPowIdx, val)
  233. whisper.notifyPeersAboutPowRequirementChange(val)
  234. whisper.wg.Add(1)
  235. go func() {
  236. defer whisper.wg.Done()
  237. // allow some time before all the peers have processed the notification
  238. ticker := time.NewTicker(time.Duration(whisper.syncAllowance) * time.Second)
  239. defer ticker.Stop()
  240. <-ticker.C
  241. whisper.settings.Store(minPowToleranceIdx, val)
  242. }()
  243. return nil
  244. }
  245. // SetMinimumPowTest sets the minimal PoW in test environment
  246. func (whisper *Whisper) SetMinimumPowTest(val float64) {
  247. whisper.settings.Store(minPowIdx, val)
  248. whisper.notifyPeersAboutPowRequirementChange(val)
  249. whisper.settings.Store(minPowToleranceIdx, val)
  250. }
  251. //SetLightClientMode makes node light client (does not forward any messages)
  252. func (whisper *Whisper) SetLightClientMode(v bool) {
  253. whisper.settings.Store(lightClientModeIdx, v)
  254. }
  255. //LightClientMode indicates is this node is light client (does not forward any messages)
  256. func (whisper *Whisper) LightClientMode() bool {
  257. val, exist := whisper.settings.Load(lightClientModeIdx)
  258. if !exist || val == nil {
  259. return false
  260. }
  261. v, ok := val.(bool)
  262. return v && ok
  263. }
  264. //LightClientModeConnectionRestricted indicates that connection to light client in light client mode not allowed
  265. func (whisper *Whisper) LightClientModeConnectionRestricted() bool {
  266. val, exist := whisper.settings.Load(restrictConnectionBetweenLightClientsIdx)
  267. if !exist || val == nil {
  268. return false
  269. }
  270. v, ok := val.(bool)
  271. return v && ok
  272. }
  273. func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
  274. arr := whisper.getPeers()
  275. for _, p := range arr {
  276. err := p.notifyAboutPowRequirementChange(pow)
  277. if err != nil {
  278. // allow one retry
  279. err = p.notifyAboutPowRequirementChange(pow)
  280. }
  281. if err != nil {
  282. log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err)
  283. }
  284. }
  285. }
  286. func (whisper *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) {
  287. arr := whisper.getPeers()
  288. for _, p := range arr {
  289. err := p.notifyAboutBloomFilterChange(bloom)
  290. if err != nil {
  291. // allow one retry
  292. err = p.notifyAboutBloomFilterChange(bloom)
  293. }
  294. if err != nil {
  295. log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err)
  296. }
  297. }
  298. }
  299. func (whisper *Whisper) getPeers() []*Peer {
  300. arr := make([]*Peer, len(whisper.peers))
  301. i := 0
  302. whisper.peerMu.Lock()
  303. defer whisper.peerMu.Unlock()
  304. for p := range whisper.peers {
  305. arr[i] = p
  306. i++
  307. }
  308. return arr
  309. }
  310. // getPeer retrieves peer by ID
  311. func (whisper *Whisper) getPeer(peerID []byte) (*Peer, error) {
  312. whisper.peerMu.Lock()
  313. defer whisper.peerMu.Unlock()
  314. for p := range whisper.peers {
  315. id := p.peer.ID()
  316. if bytes.Equal(peerID, id[:]) {
  317. return p, nil
  318. }
  319. }
  320. return nil, fmt.Errorf("could not find peer with ID: %x", peerID)
  321. }
  322. // AllowP2PMessagesFromPeer marks specific peer trusted,
  323. // which will allow it to send historic (expired) messages.
  324. func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
  325. p, err := whisper.getPeer(peerID)
  326. if err != nil {
  327. return err
  328. }
  329. p.trusted = true
  330. return nil
  331. }
  332. // RequestHistoricMessages sends a message with p2pRequestCode to a specific peer,
  333. // which is known to implement MailServer interface, and is supposed to process this
  334. // request and respond with a number of peer-to-peer messages (possibly expired),
  335. // which are not supposed to be forwarded any further.
  336. // The whisper protocol is agnostic of the format and contents of envelope.
  337. func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
  338. p, err := whisper.getPeer(peerID)
  339. if err != nil {
  340. return err
  341. }
  342. p.trusted = true
  343. return p2p.Send(p.ws, p2pRequestCode, envelope)
  344. }
  345. // SendP2PMessage sends a peer-to-peer message to a specific peer.
  346. func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
  347. p, err := whisper.getPeer(peerID)
  348. if err != nil {
  349. return err
  350. }
  351. return whisper.SendP2PDirect(p, envelope)
  352. }
  353. // SendP2PDirect sends a peer-to-peer message to a specific peer.
  354. func (whisper *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
  355. return p2p.Send(peer.ws, p2pMessageCode, envelope)
  356. }
  357. // NewKeyPair generates a new cryptographic identity for the client, and injects
  358. // it into the known identities for message decryption. Returns ID of the new key pair.
  359. func (whisper *Whisper) NewKeyPair() (string, error) {
  360. key, err := crypto.GenerateKey()
  361. if err != nil || !validatePrivateKey(key) {
  362. key, err = crypto.GenerateKey() // retry once
  363. }
  364. if err != nil {
  365. return "", err
  366. }
  367. if !validatePrivateKey(key) {
  368. return "", fmt.Errorf("failed to generate valid key")
  369. }
  370. id, err := GenerateRandomID()
  371. if err != nil {
  372. return "", fmt.Errorf("failed to generate ID: %s", err)
  373. }
  374. whisper.keyMu.Lock()
  375. defer whisper.keyMu.Unlock()
  376. if whisper.privateKeys[id] != nil {
  377. return "", fmt.Errorf("failed to generate unique ID")
  378. }
  379. whisper.privateKeys[id] = key
  380. return id, nil
  381. }
  382. // DeleteKeyPair deletes the specified key if it exists.
  383. func (whisper *Whisper) DeleteKeyPair(key string) bool {
  384. whisper.keyMu.Lock()
  385. defer whisper.keyMu.Unlock()
  386. if whisper.privateKeys[key] != nil {
  387. delete(whisper.privateKeys, key)
  388. return true
  389. }
  390. return false
  391. }
  392. // AddKeyPair imports a asymmetric private key and returns it identifier.
  393. func (whisper *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
  394. id, err := GenerateRandomID()
  395. if err != nil {
  396. return "", fmt.Errorf("failed to generate ID: %s", err)
  397. }
  398. whisper.keyMu.Lock()
  399. whisper.privateKeys[id] = key
  400. whisper.keyMu.Unlock()
  401. return id, nil
  402. }
  403. // HasKeyPair checks if the whisper node is configured with the private key
  404. // of the specified public pair.
  405. func (whisper *Whisper) HasKeyPair(id string) bool {
  406. whisper.keyMu.RLock()
  407. defer whisper.keyMu.RUnlock()
  408. return whisper.privateKeys[id] != nil
  409. }
  410. // GetPrivateKey retrieves the private key of the specified identity.
  411. func (whisper *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
  412. whisper.keyMu.RLock()
  413. defer whisper.keyMu.RUnlock()
  414. key := whisper.privateKeys[id]
  415. if key == nil {
  416. return nil, fmt.Errorf("invalid id")
  417. }
  418. return key, nil
  419. }
  420. // GenerateSymKey generates a random symmetric key and stores it under id,
  421. // which is then returned. Will be used in the future for session key exchange.
  422. func (whisper *Whisper) GenerateSymKey() (string, error) {
  423. key, err := generateSecureRandomData(aesKeyLength)
  424. if err != nil {
  425. return "", err
  426. } else if !validateDataIntegrity(key, aesKeyLength) {
  427. return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
  428. }
  429. id, err := GenerateRandomID()
  430. if err != nil {
  431. return "", fmt.Errorf("failed to generate ID: %s", err)
  432. }
  433. whisper.keyMu.Lock()
  434. defer whisper.keyMu.Unlock()
  435. if whisper.symKeys[id] != nil {
  436. return "", fmt.Errorf("failed to generate unique ID")
  437. }
  438. whisper.symKeys[id] = key
  439. return id, nil
  440. }
  441. // AddSymKeyDirect stores the key, and returns its id.
  442. func (whisper *Whisper) AddSymKeyDirect(key []byte) (string, error) {
  443. if len(key) != aesKeyLength {
  444. return "", fmt.Errorf("wrong key size: %d", len(key))
  445. }
  446. id, err := GenerateRandomID()
  447. if err != nil {
  448. return "", fmt.Errorf("failed to generate ID: %s", err)
  449. }
  450. whisper.keyMu.Lock()
  451. defer whisper.keyMu.Unlock()
  452. if whisper.symKeys[id] != nil {
  453. return "", fmt.Errorf("failed to generate unique ID")
  454. }
  455. whisper.symKeys[id] = key
  456. return id, nil
  457. }
  458. // AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
  459. func (whisper *Whisper) AddSymKeyFromPassword(password string) (string, error) {
  460. id, err := GenerateRandomID()
  461. if err != nil {
  462. return "", fmt.Errorf("failed to generate ID: %s", err)
  463. }
  464. if whisper.HasSymKey(id) {
  465. return "", fmt.Errorf("failed to generate unique ID")
  466. }
  467. // kdf should run no less than 0.1 seconds on an average computer,
  468. // because it's an once in a session experience
  469. derived := pbkdf2.Key([]byte(password), nil, 65356, aesKeyLength, sha256.New)
  470. if err != nil {
  471. return "", err
  472. }
  473. whisper.keyMu.Lock()
  474. defer whisper.keyMu.Unlock()
  475. // double check is necessary, because deriveKeyMaterial() is very slow
  476. if whisper.symKeys[id] != nil {
  477. return "", fmt.Errorf("critical error: failed to generate unique ID")
  478. }
  479. whisper.symKeys[id] = derived
  480. return id, nil
  481. }
  482. // HasSymKey returns true if there is a key associated with the given id.
  483. // Otherwise returns false.
  484. func (whisper *Whisper) HasSymKey(id string) bool {
  485. whisper.keyMu.RLock()
  486. defer whisper.keyMu.RUnlock()
  487. return whisper.symKeys[id] != nil
  488. }
  489. // DeleteSymKey deletes the key associated with the name string if it exists.
  490. func (whisper *Whisper) DeleteSymKey(id string) bool {
  491. whisper.keyMu.Lock()
  492. defer whisper.keyMu.Unlock()
  493. if whisper.symKeys[id] != nil {
  494. delete(whisper.symKeys, id)
  495. return true
  496. }
  497. return false
  498. }
  499. // GetSymKey returns the symmetric key associated with the given id.
  500. func (whisper *Whisper) GetSymKey(id string) ([]byte, error) {
  501. whisper.keyMu.RLock()
  502. defer whisper.keyMu.RUnlock()
  503. if whisper.symKeys[id] != nil {
  504. return whisper.symKeys[id], nil
  505. }
  506. return nil, fmt.Errorf("non-existent key ID")
  507. }
  508. // Subscribe installs a new message handler used for filtering, decrypting
  509. // and subsequent storing of incoming messages.
  510. func (whisper *Whisper) Subscribe(f *Filter) (string, error) {
  511. s, err := whisper.filters.Install(f)
  512. if err == nil {
  513. whisper.updateBloomFilter(f)
  514. }
  515. return s, err
  516. }
  517. // updateBloomFilter recalculates the new value of bloom filter,
  518. // and informs the peers if necessary.
  519. func (whisper *Whisper) updateBloomFilter(f *Filter) {
  520. aggregate := make([]byte, BloomFilterSize)
  521. for _, t := range f.Topics {
  522. top := BytesToTopic(t)
  523. b := TopicToBloom(top)
  524. aggregate = addBloom(aggregate, b)
  525. }
  526. if !BloomFilterMatch(whisper.BloomFilter(), aggregate) {
  527. // existing bloom filter must be updated
  528. aggregate = addBloom(whisper.BloomFilter(), aggregate)
  529. whisper.SetBloomFilter(aggregate)
  530. }
  531. }
  532. // GetFilter returns the filter by id.
  533. func (whisper *Whisper) GetFilter(id string) *Filter {
  534. return whisper.filters.Get(id)
  535. }
  536. // Unsubscribe removes an installed message handler.
  537. func (whisper *Whisper) Unsubscribe(id string) error {
  538. ok := whisper.filters.Uninstall(id)
  539. if !ok {
  540. return fmt.Errorf("Unsubscribe: Invalid ID")
  541. }
  542. return nil
  543. }
  544. // Send injects a message into the whisper send queue, to be distributed in the
  545. // network in the coming cycles.
  546. func (whisper *Whisper) Send(envelope *Envelope) error {
  547. ok, err := whisper.add(envelope, false)
  548. if err == nil && !ok {
  549. return fmt.Errorf("failed to add envelope")
  550. }
  551. return err
  552. }
  553. // Start implements node.Lifecycle, starting the background data propagation thread
  554. // of the Whisper protocol.
  555. func (whisper *Whisper) Start() error {
  556. log.Info("started whisper v." + ProtocolVersionStr)
  557. whisper.wg.Add(1)
  558. go whisper.update()
  559. numCPU := runtime.NumCPU()
  560. for i := 0; i < numCPU; i++ {
  561. whisper.wg.Add(1)
  562. go whisper.processQueue()
  563. }
  564. return nil
  565. }
  566. // Stop implements node.Lifecycle, stopping the background data propagation thread
  567. // of the Whisper protocol.
  568. func (whisper *Whisper) Stop() error {
  569. close(whisper.quit)
  570. whisper.wg.Wait()
  571. log.Info("whisper stopped")
  572. return nil
  573. }
  574. // HandlePeer is called by the underlying P2P layer when the whisper sub-protocol
  575. // connection is negotiated.
  576. func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
  577. // Create the new peer and start tracking it
  578. whisperPeer := newPeer(whisper, peer, rw)
  579. whisper.peerMu.Lock()
  580. whisper.peers[whisperPeer] = struct{}{}
  581. whisper.peerMu.Unlock()
  582. defer func() {
  583. whisper.peerMu.Lock()
  584. delete(whisper.peers, whisperPeer)
  585. whisper.peerMu.Unlock()
  586. }()
  587. // Run the peer handshake and state updates
  588. if err := whisperPeer.handshake(); err != nil {
  589. return err
  590. }
  591. whisperPeer.start()
  592. defer whisperPeer.stop()
  593. return whisper.runMessageLoop(whisperPeer, rw)
  594. }
  595. // runMessageLoop reads and processes inbound messages directly to merge into client-global state.
  596. func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
  597. for {
  598. // fetch the next packet
  599. packet, err := rw.ReadMsg()
  600. if err != nil {
  601. log.Info("message loop", "peer", p.peer.ID(), "err", err)
  602. return err
  603. }
  604. if packet.Size > whisper.MaxMessageSize() {
  605. log.Warn("oversized message received", "peer", p.peer.ID())
  606. return errors.New("oversized message received")
  607. }
  608. switch packet.Code {
  609. case statusCode:
  610. // this should not happen, but no need to panic; just ignore this message.
  611. log.Warn("unxepected status message received", "peer", p.peer.ID())
  612. case messagesCode:
  613. // decode the contained envelopes
  614. var envelopes []*Envelope
  615. if err := packet.Decode(&envelopes); err != nil {
  616. log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  617. return errors.New("invalid envelopes")
  618. }
  619. trouble := false
  620. for _, env := range envelopes {
  621. cached, err := whisper.add(env, whisper.LightClientMode())
  622. if err != nil {
  623. trouble = true
  624. log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  625. }
  626. if cached {
  627. p.mark(env)
  628. }
  629. }
  630. if trouble {
  631. return errors.New("invalid envelope")
  632. }
  633. case powRequirementCode:
  634. s := rlp.NewStream(packet.Payload, uint64(packet.Size))
  635. i, err := s.Uint()
  636. if err != nil {
  637. log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  638. return errors.New("invalid powRequirementCode message")
  639. }
  640. f := math.Float64frombits(i)
  641. if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
  642. log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  643. return errors.New("invalid value in powRequirementCode message")
  644. }
  645. p.powRequirement = f
  646. case bloomFilterExCode:
  647. var bloom []byte
  648. err := packet.Decode(&bloom)
  649. if err == nil && len(bloom) != BloomFilterSize {
  650. err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
  651. }
  652. if err != nil {
  653. log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  654. return errors.New("invalid bloom filter exchange message")
  655. }
  656. p.setBloomFilter(bloom)
  657. case p2pMessageCode:
  658. // peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
  659. // this message is not supposed to be forwarded to other peers, and
  660. // therefore might not satisfy the PoW, expiry and other requirements.
  661. // these messages are only accepted from the trusted peer.
  662. if p.trusted {
  663. var envelope Envelope
  664. if err := packet.Decode(&envelope); err != nil {
  665. log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  666. return errors.New("invalid direct message")
  667. }
  668. whisper.postEvent(&envelope, true)
  669. }
  670. case p2pRequestCode:
  671. // Must be processed if mail server is implemented. Otherwise ignore.
  672. if whisper.mailServer != nil {
  673. var request Envelope
  674. if err := packet.Decode(&request); err != nil {
  675. log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  676. return errors.New("invalid p2p request")
  677. }
  678. whisper.mailServer.DeliverMail(p, &request)
  679. }
  680. default:
  681. // New message types might be implemented in the future versions of Whisper.
  682. // For forward compatibility, just ignore.
  683. }
  684. packet.Discard()
  685. }
  686. }
  687. // add inserts a new envelope into the message pool to be distributed within the
  688. // whisper network. It also inserts the envelope into the expiration pool at the
  689. // appropriate time-stamp. In case of error, connection should be dropped.
  690. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
  691. func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
  692. now := uint32(time.Now().Unix())
  693. sent := envelope.Expiry - envelope.TTL
  694. if sent > now {
  695. if sent-DefaultSyncAllowance > now {
  696. return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
  697. }
  698. // recalculate PoW, adjusted for the time difference, plus one second for latency
  699. envelope.calculatePoW(sent - now + 1)
  700. }
  701. if envelope.Expiry < now {
  702. if envelope.Expiry+DefaultSyncAllowance*2 < now {
  703. return false, fmt.Errorf("very old message")
  704. }
  705. log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
  706. return false, nil // drop envelope without error
  707. }
  708. if uint32(envelope.size()) > whisper.MaxMessageSize() {
  709. return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
  710. }
  711. if envelope.PoW() < whisper.MinPow() {
  712. // maybe the value was recently changed, and the peers did not adjust yet.
  713. // in this case the previous value is retrieved by MinPowTolerance()
  714. // for a short period of peer synchronization.
  715. if envelope.PoW() < whisper.MinPowTolerance() {
  716. return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
  717. }
  718. }
  719. if !BloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
  720. // maybe the value was recently changed, and the peers did not adjust yet.
  721. // in this case the previous value is retrieved by BloomFilterTolerance()
  722. // for a short period of peer synchronization.
  723. if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
  724. return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
  725. envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
  726. }
  727. }
  728. hash := envelope.Hash()
  729. whisper.poolMu.Lock()
  730. _, alreadyCached := whisper.envelopes[hash]
  731. if !alreadyCached {
  732. whisper.envelopes[hash] = envelope
  733. if whisper.expirations[envelope.Expiry] == nil {
  734. whisper.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet()
  735. }
  736. if !whisper.expirations[envelope.Expiry].Contains(hash) {
  737. whisper.expirations[envelope.Expiry].Add(hash)
  738. }
  739. }
  740. whisper.poolMu.Unlock()
  741. if alreadyCached {
  742. log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
  743. } else {
  744. log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
  745. whisper.statsMu.Lock()
  746. whisper.stats.memoryUsed += envelope.size()
  747. whisper.statsMu.Unlock()
  748. whisper.postEvent(envelope, isP2P) // notify the local node about the new message
  749. if whisper.mailServer != nil {
  750. whisper.mailServer.Archive(envelope)
  751. }
  752. }
  753. return true, nil
  754. }
  755. // postEvent queues the message for further processing.
  756. func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
  757. if isP2P {
  758. whisper.p2pMsgQueue <- envelope
  759. } else {
  760. whisper.checkOverflow()
  761. whisper.messageQueue <- envelope
  762. }
  763. }
  764. // checkOverflow checks if message queue overflow occurs and reports it if necessary.
  765. func (whisper *Whisper) checkOverflow() {
  766. queueSize := len(whisper.messageQueue)
  767. if queueSize == messageQueueLimit {
  768. if !whisper.Overflow() {
  769. whisper.settings.Store(overflowIdx, true)
  770. log.Warn("message queue overflow")
  771. }
  772. } else if queueSize <= messageQueueLimit/2 {
  773. if whisper.Overflow() {
  774. whisper.settings.Store(overflowIdx, false)
  775. log.Warn("message queue overflow fixed (back to normal)")
  776. }
  777. }
  778. }
  779. // processQueue delivers the messages to the watchers during the lifetime of the whisper node.
  780. func (whisper *Whisper) processQueue() {
  781. defer whisper.wg.Done()
  782. var e *Envelope
  783. for {
  784. select {
  785. case <-whisper.quit:
  786. return
  787. case e = <-whisper.messageQueue:
  788. whisper.filters.NotifyWatchers(e, false)
  789. case e = <-whisper.p2pMsgQueue:
  790. whisper.filters.NotifyWatchers(e, true)
  791. }
  792. }
  793. }
  794. // update loops until the lifetime of the whisper node, updating its internal
  795. // state by expiring stale messages from the pool.
  796. func (whisper *Whisper) update() {
  797. defer whisper.wg.Done()
  798. // Start a ticker to check for expirations
  799. expire := time.NewTicker(expirationCycle)
  800. defer expire.Stop()
  801. // Repeat updates until termination is requested
  802. for {
  803. select {
  804. case <-expire.C:
  805. whisper.expire()
  806. case <-whisper.quit:
  807. return
  808. }
  809. }
  810. }
  811. // expire iterates over all the expiration timestamps, removing all stale
  812. // messages from the pools.
  813. func (whisper *Whisper) expire() {
  814. whisper.poolMu.Lock()
  815. defer whisper.poolMu.Unlock()
  816. whisper.statsMu.Lock()
  817. defer whisper.statsMu.Unlock()
  818. whisper.stats.reset()
  819. now := uint32(time.Now().Unix())
  820. for expiry, hashSet := range whisper.expirations {
  821. if expiry < now {
  822. // Dump all expired messages and remove timestamp
  823. hashSet.Each(func(v interface{}) bool {
  824. sz := whisper.envelopes[v.(common.Hash)].size()
  825. delete(whisper.envelopes, v.(common.Hash))
  826. whisper.stats.messagesCleared++
  827. whisper.stats.memoryCleared += sz
  828. whisper.stats.memoryUsed -= sz
  829. return false
  830. })
  831. whisper.expirations[expiry].Clear()
  832. delete(whisper.expirations, expiry)
  833. }
  834. }
  835. }
  836. // Stats returns the whisper node statistics.
  837. func (whisper *Whisper) Stats() Statistics {
  838. whisper.statsMu.Lock()
  839. defer whisper.statsMu.Unlock()
  840. return whisper.stats
  841. }
  842. // Envelopes retrieves all the messages currently pooled by the node.
  843. func (whisper *Whisper) Envelopes() []*Envelope {
  844. whisper.poolMu.RLock()
  845. defer whisper.poolMu.RUnlock()
  846. all := make([]*Envelope, 0, len(whisper.envelopes))
  847. for _, envelope := range whisper.envelopes {
  848. all = append(all, envelope)
  849. }
  850. return all
  851. }
  852. // isEnvelopeCached checks if envelope with specific hash has already been received and cached.
  853. func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool {
  854. whisper.poolMu.Lock()
  855. defer whisper.poolMu.Unlock()
  856. _, exist := whisper.envelopes[hash]
  857. return exist
  858. }
  859. // reset resets the node's statistics after each expiry cycle.
  860. func (s *Statistics) reset() {
  861. s.cycles++
  862. s.totalMessagesCleared += s.messagesCleared
  863. s.memoryCleared = 0
  864. s.messagesCleared = 0
  865. }
  866. // ValidatePublicKey checks the format of the given public key.
  867. func ValidatePublicKey(k *ecdsa.PublicKey) bool {
  868. return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
  869. }
  870. // validatePrivateKey checks the format of the given private key.
  871. func validatePrivateKey(k *ecdsa.PrivateKey) bool {
  872. if k == nil || k.D == nil || k.D.Sign() == 0 {
  873. return false
  874. }
  875. return ValidatePublicKey(&k.PublicKey)
  876. }
  877. // validateDataIntegrity returns false if the data have the wrong or contains all zeros,
  878. // which is the simplest and the most common bug.
  879. func validateDataIntegrity(k []byte, expectedSize int) bool {
  880. if len(k) != expectedSize {
  881. return false
  882. }
  883. if expectedSize > 3 && containsOnlyZeros(k) {
  884. return false
  885. }
  886. return true
  887. }
  888. // containsOnlyZeros checks if the data contain only zeros.
  889. func containsOnlyZeros(data []byte) bool {
  890. for _, b := range data {
  891. if b != 0 {
  892. return false
  893. }
  894. }
  895. return true
  896. }
  897. // bytesToUintLittleEndian converts the slice to 64-bit unsigned integer.
  898. func bytesToUintLittleEndian(b []byte) (res uint64) {
  899. mul := uint64(1)
  900. for i := 0; i < len(b); i++ {
  901. res += uint64(b[i]) * mul
  902. mul *= 256
  903. }
  904. return res
  905. }
  906. // BytesToUintBigEndian converts the slice to 64-bit unsigned integer.
  907. func BytesToUintBigEndian(b []byte) (res uint64) {
  908. for i := 0; i < len(b); i++ {
  909. res *= 256
  910. res += uint64(b[i])
  911. }
  912. return res
  913. }
  914. // GenerateRandomID generates a random string, which is then returned to be used as a key id
  915. func GenerateRandomID() (id string, err error) {
  916. buf, err := generateSecureRandomData(keyIDSize)
  917. if err != nil {
  918. return "", err
  919. }
  920. if !validateDataIntegrity(buf, keyIDSize) {
  921. return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data")
  922. }
  923. id = common.Bytes2Hex(buf)
  924. return id, err
  925. }
  926. func isFullNode(bloom []byte) bool {
  927. if bloom == nil {
  928. return true
  929. }
  930. for _, b := range bloom {
  931. if b != 255 {
  932. return false
  933. }
  934. }
  935. return true
  936. }
  937. func BloomFilterMatch(filter, sample []byte) bool {
  938. if filter == nil {
  939. return true
  940. }
  941. for i := 0; i < BloomFilterSize; i++ {
  942. f := filter[i]
  943. s := sample[i]
  944. if (f | s) != f {
  945. return false
  946. }
  947. }
  948. return true
  949. }
  950. func addBloom(a, b []byte) []byte {
  951. c := make([]byte, BloomFilterSize)
  952. for i := 0; i < BloomFilterSize; i++ {
  953. c[i] = a[i] | b[i]
  954. }
  955. return c
  956. }
  957. func StandaloneWhisperService(cfg *Config) *Whisper {
  958. if cfg == nil {
  959. cfg = &DefaultConfig
  960. }
  961. whisper := &Whisper{
  962. privateKeys: make(map[string]*ecdsa.PrivateKey),
  963. symKeys: make(map[string][]byte),
  964. envelopes: make(map[common.Hash]*Envelope),
  965. expirations: make(map[uint32]mapset.Set),
  966. peers: make(map[*Peer]struct{}),
  967. messageQueue: make(chan *Envelope, messageQueueLimit),
  968. p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
  969. quit: make(chan struct{}),
  970. syncAllowance: DefaultSyncAllowance,
  971. }
  972. whisper.filters = NewFilters(whisper)
  973. whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
  974. whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
  975. whisper.settings.Store(overflowIdx, false)
  976. whisper.settings.Store(restrictConnectionBetweenLightClientsIdx, cfg.RestrictConnectionBetweenLightClients)
  977. // p2p whisper sub protocol handler
  978. whisper.protocol = p2p.Protocol{
  979. Name: ProtocolName,
  980. Version: uint(ProtocolVersion),
  981. Length: NumberOfMessageCodes,
  982. Run: whisper.HandlePeer,
  983. NodeInfo: func() interface{} {
  984. return map[string]interface{}{
  985. "version": ProtocolVersionStr,
  986. "maxMessageSize": whisper.MaxMessageSize(),
  987. "minimumPoW": whisper.MinPow(),
  988. }
  989. },
  990. }
  991. return whisper
  992. }