whisper.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package whisperv6
  17. import (
  18. "bytes"
  19. "crypto/ecdsa"
  20. "crypto/sha256"
  21. "fmt"
  22. "math"
  23. "runtime"
  24. "sync"
  25. "time"
  26. mapset "github.com/deckarep/golang-set"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/rlp"
  32. "github.com/ethereum/go-ethereum/rpc"
  33. "github.com/syndtr/goleveldb/leveldb/errors"
  34. "golang.org/x/crypto/pbkdf2"
  35. "golang.org/x/sync/syncmap"
  36. )
  37. // Statistics holds several message-related counter for analytics
  38. // purposes.
  39. type Statistics struct {
  40. messagesCleared int
  41. memoryCleared int
  42. memoryUsed int
  43. cycles int
  44. totalMessagesCleared int
  45. }
  46. const (
  47. maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
  48. overflowIdx // Indicator of message queue overflow
  49. minPowIdx // Minimal PoW required by the whisper node
  50. minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time
  51. bloomFilterIdx // Bloom filter for topics of interest for this node
  52. bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time
  53. lightClientModeIdx // Light client mode. (does not forward any messages)
  54. restrictConnectionBetweenLightClientsIdx // Restrict connection between two light clients
  55. )
  56. // Whisper represents a dark communication interface through the Ethereum
  57. // network, using its very own P2P communication layer.
  58. type Whisper struct {
  59. protocol p2p.Protocol // Protocol description and parameters
  60. filters *Filters // Message filters installed with Subscribe function
  61. privateKeys map[string]*ecdsa.PrivateKey // Private key storage
  62. symKeys map[string][]byte // Symmetric key storage
  63. keyMu sync.RWMutex // Mutex associated with key storages
  64. poolMu sync.RWMutex // Mutex to sync the message and expiration pools
  65. envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
  66. expirations map[uint32]mapset.Set // Message expiration pool
  67. peerMu sync.RWMutex // Mutex to sync the active peer set
  68. peers map[*Peer]struct{} // Set of currently active peers
  69. messageQueue chan *Envelope // Message queue for normal whisper messages
  70. p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
  71. quit chan struct{} // Channel used for graceful exit
  72. settings syncmap.Map // holds configuration settings that can be dynamically changed
  73. syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
  74. statsMu sync.Mutex // guard stats
  75. stats Statistics // Statistics of whisper node
  76. mailServer MailServer // MailServer interface
  77. wg sync.WaitGroup
  78. }
  79. // New creates a Whisper client ready to communicate through the Ethereum P2P network.
  80. func New(cfg *Config) *Whisper {
  81. if cfg == nil {
  82. cfg = &DefaultConfig
  83. }
  84. whisper := &Whisper{
  85. privateKeys: make(map[string]*ecdsa.PrivateKey),
  86. symKeys: make(map[string][]byte),
  87. envelopes: make(map[common.Hash]*Envelope),
  88. expirations: make(map[uint32]mapset.Set),
  89. peers: make(map[*Peer]struct{}),
  90. messageQueue: make(chan *Envelope, messageQueueLimit),
  91. p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
  92. quit: make(chan struct{}),
  93. syncAllowance: DefaultSyncAllowance,
  94. }
  95. whisper.filters = NewFilters(whisper)
  96. whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
  97. whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
  98. whisper.settings.Store(overflowIdx, false)
  99. whisper.settings.Store(restrictConnectionBetweenLightClientsIdx, cfg.RestrictConnectionBetweenLightClients)
  100. // p2p whisper sub protocol handler
  101. whisper.protocol = p2p.Protocol{
  102. Name: ProtocolName,
  103. Version: uint(ProtocolVersion),
  104. Length: NumberOfMessageCodes,
  105. Run: whisper.HandlePeer,
  106. NodeInfo: func() interface{} {
  107. return map[string]interface{}{
  108. "version": ProtocolVersionStr,
  109. "maxMessageSize": whisper.MaxMessageSize(),
  110. "minimumPoW": whisper.MinPow(),
  111. }
  112. },
  113. }
  114. return whisper
  115. }
  116. // MinPow returns the PoW value required by this node.
  117. func (whisper *Whisper) MinPow() float64 {
  118. val, exist := whisper.settings.Load(minPowIdx)
  119. if !exist || val == nil {
  120. return DefaultMinimumPoW
  121. }
  122. v, ok := val.(float64)
  123. if !ok {
  124. log.Error("Error loading minPowIdx, using default")
  125. return DefaultMinimumPoW
  126. }
  127. return v
  128. }
  129. // MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
  130. // time after PoW was changed. If sufficient time have elapsed or no change of PoW
  131. // have ever occurred, the return value will be the same as return value of MinPow().
  132. func (whisper *Whisper) MinPowTolerance() float64 {
  133. val, exist := whisper.settings.Load(minPowToleranceIdx)
  134. if !exist || val == nil {
  135. return DefaultMinimumPoW
  136. }
  137. return val.(float64)
  138. }
  139. // BloomFilter returns the aggregated bloom filter for all the topics of interest.
  140. // The nodes are required to send only messages that match the advertised bloom filter.
  141. // If a message does not match the bloom, it will tantamount to spam, and the peer will
  142. // be disconnected.
  143. func (whisper *Whisper) BloomFilter() []byte {
  144. val, exist := whisper.settings.Load(bloomFilterIdx)
  145. if !exist || val == nil {
  146. return nil
  147. }
  148. return val.([]byte)
  149. }
  150. // BloomFilterTolerance returns the bloom filter which is tolerated for a limited
  151. // time after new bloom was advertised to the peers. If sufficient time have elapsed
  152. // or no change of bloom filter have ever occurred, the return value will be the same
  153. // as return value of BloomFilter().
  154. func (whisper *Whisper) BloomFilterTolerance() []byte {
  155. val, exist := whisper.settings.Load(bloomFilterToleranceIdx)
  156. if !exist || val == nil {
  157. return nil
  158. }
  159. return val.([]byte)
  160. }
  161. // MaxMessageSize returns the maximum accepted message size.
  162. func (whisper *Whisper) MaxMessageSize() uint32 {
  163. val, _ := whisper.settings.Load(maxMsgSizeIdx)
  164. return val.(uint32)
  165. }
  166. // Overflow returns an indication if the message queue is full.
  167. func (whisper *Whisper) Overflow() bool {
  168. val, _ := whisper.settings.Load(overflowIdx)
  169. return val.(bool)
  170. }
  171. // APIs returns the RPC descriptors the Whisper implementation offers
  172. func (whisper *Whisper) APIs() []rpc.API {
  173. return []rpc.API{
  174. {
  175. Namespace: ProtocolName,
  176. Version: ProtocolVersionStr,
  177. Service: NewPublicWhisperAPI(whisper),
  178. Public: true,
  179. },
  180. }
  181. }
  182. // RegisterServer registers MailServer interface.
  183. // MailServer will process all the incoming messages with p2pRequestCode.
  184. func (whisper *Whisper) RegisterServer(server MailServer) {
  185. whisper.mailServer = server
  186. }
  187. // Protocols returns the whisper sub-protocols ran by this particular client.
  188. func (whisper *Whisper) Protocols() []p2p.Protocol {
  189. return []p2p.Protocol{whisper.protocol}
  190. }
  191. // Version returns the whisper sub-protocols version number.
  192. func (whisper *Whisper) Version() uint {
  193. return whisper.protocol.Version
  194. }
  195. // SetMaxMessageSize sets the maximal message size allowed by this node
  196. func (whisper *Whisper) SetMaxMessageSize(size uint32) error {
  197. if size > MaxMessageSize {
  198. return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
  199. }
  200. whisper.settings.Store(maxMsgSizeIdx, size)
  201. return nil
  202. }
  203. // SetBloomFilter sets the new bloom filter
  204. func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
  205. if len(bloom) != BloomFilterSize {
  206. return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
  207. }
  208. b := make([]byte, BloomFilterSize)
  209. copy(b, bloom)
  210. whisper.settings.Store(bloomFilterIdx, b)
  211. whisper.notifyPeersAboutBloomFilterChange(b)
  212. whisper.wg.Add(1)
  213. go func() {
  214. // allow some time before all the peers have processed the notification
  215. defer whisper.wg.Done()
  216. time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
  217. whisper.settings.Store(bloomFilterToleranceIdx, b)
  218. }()
  219. return nil
  220. }
  221. // SetMinimumPoW sets the minimal PoW required by this node
  222. func (whisper *Whisper) SetMinimumPoW(val float64) error {
  223. if val < 0.0 {
  224. return fmt.Errorf("invalid PoW: %f", val)
  225. }
  226. whisper.settings.Store(minPowIdx, val)
  227. whisper.notifyPeersAboutPowRequirementChange(val)
  228. whisper.wg.Add(1)
  229. go func() {
  230. defer whisper.wg.Done()
  231. // allow some time before all the peers have processed the notification
  232. time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
  233. whisper.settings.Store(minPowToleranceIdx, val)
  234. }()
  235. return nil
  236. }
  237. // SetMinimumPowTest sets the minimal PoW in test environment
  238. func (whisper *Whisper) SetMinimumPowTest(val float64) {
  239. whisper.settings.Store(minPowIdx, val)
  240. whisper.notifyPeersAboutPowRequirementChange(val)
  241. whisper.settings.Store(minPowToleranceIdx, val)
  242. }
  243. //SetLightClientMode makes node light client (does not forward any messages)
  244. func (whisper *Whisper) SetLightClientMode(v bool) {
  245. whisper.settings.Store(lightClientModeIdx, v)
  246. }
  247. //LightClientMode indicates is this node is light client (does not forward any messages)
  248. func (whisper *Whisper) LightClientMode() bool {
  249. val, exist := whisper.settings.Load(lightClientModeIdx)
  250. if !exist || val == nil {
  251. return false
  252. }
  253. v, ok := val.(bool)
  254. return v && ok
  255. }
  256. //LightClientModeConnectionRestricted indicates that connection to light client in light client mode not allowed
  257. func (whisper *Whisper) LightClientModeConnectionRestricted() bool {
  258. val, exist := whisper.settings.Load(restrictConnectionBetweenLightClientsIdx)
  259. if !exist || val == nil {
  260. return false
  261. }
  262. v, ok := val.(bool)
  263. return v && ok
  264. }
  265. func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
  266. arr := whisper.getPeers()
  267. for _, p := range arr {
  268. err := p.notifyAboutPowRequirementChange(pow)
  269. if err != nil {
  270. // allow one retry
  271. err = p.notifyAboutPowRequirementChange(pow)
  272. }
  273. if err != nil {
  274. log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err)
  275. }
  276. }
  277. }
  278. func (whisper *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) {
  279. arr := whisper.getPeers()
  280. for _, p := range arr {
  281. err := p.notifyAboutBloomFilterChange(bloom)
  282. if err != nil {
  283. // allow one retry
  284. err = p.notifyAboutBloomFilterChange(bloom)
  285. }
  286. if err != nil {
  287. log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err)
  288. }
  289. }
  290. }
  291. func (whisper *Whisper) getPeers() []*Peer {
  292. arr := make([]*Peer, len(whisper.peers))
  293. i := 0
  294. whisper.peerMu.Lock()
  295. defer whisper.peerMu.Unlock()
  296. for p := range whisper.peers {
  297. arr[i] = p
  298. i++
  299. }
  300. return arr
  301. }
  302. // getPeer retrieves peer by ID
  303. func (whisper *Whisper) getPeer(peerID []byte) (*Peer, error) {
  304. whisper.peerMu.Lock()
  305. defer whisper.peerMu.Unlock()
  306. for p := range whisper.peers {
  307. id := p.peer.ID()
  308. if bytes.Equal(peerID, id[:]) {
  309. return p, nil
  310. }
  311. }
  312. return nil, fmt.Errorf("could not find peer with ID: %x", peerID)
  313. }
  314. // AllowP2PMessagesFromPeer marks specific peer trusted,
  315. // which will allow it to send historic (expired) messages.
  316. func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
  317. p, err := whisper.getPeer(peerID)
  318. if err != nil {
  319. return err
  320. }
  321. p.trusted = true
  322. return nil
  323. }
  324. // RequestHistoricMessages sends a message with p2pRequestCode to a specific peer,
  325. // which is known to implement MailServer interface, and is supposed to process this
  326. // request and respond with a number of peer-to-peer messages (possibly expired),
  327. // which are not supposed to be forwarded any further.
  328. // The whisper protocol is agnostic of the format and contents of envelope.
  329. func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
  330. p, err := whisper.getPeer(peerID)
  331. if err != nil {
  332. return err
  333. }
  334. p.trusted = true
  335. return p2p.Send(p.ws, p2pRequestCode, envelope)
  336. }
  337. // SendP2PMessage sends a peer-to-peer message to a specific peer.
  338. func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
  339. p, err := whisper.getPeer(peerID)
  340. if err != nil {
  341. return err
  342. }
  343. return whisper.SendP2PDirect(p, envelope)
  344. }
  345. // SendP2PDirect sends a peer-to-peer message to a specific peer.
  346. func (whisper *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
  347. return p2p.Send(peer.ws, p2pMessageCode, envelope)
  348. }
  349. // NewKeyPair generates a new cryptographic identity for the client, and injects
  350. // it into the known identities for message decryption. Returns ID of the new key pair.
  351. func (whisper *Whisper) NewKeyPair() (string, error) {
  352. key, err := crypto.GenerateKey()
  353. if err != nil || !validatePrivateKey(key) {
  354. key, err = crypto.GenerateKey() // retry once
  355. }
  356. if err != nil {
  357. return "", err
  358. }
  359. if !validatePrivateKey(key) {
  360. return "", fmt.Errorf("failed to generate valid key")
  361. }
  362. id, err := GenerateRandomID()
  363. if err != nil {
  364. return "", fmt.Errorf("failed to generate ID: %s", err)
  365. }
  366. whisper.keyMu.Lock()
  367. defer whisper.keyMu.Unlock()
  368. if whisper.privateKeys[id] != nil {
  369. return "", fmt.Errorf("failed to generate unique ID")
  370. }
  371. whisper.privateKeys[id] = key
  372. return id, nil
  373. }
  374. // DeleteKeyPair deletes the specified key if it exists.
  375. func (whisper *Whisper) DeleteKeyPair(key string) bool {
  376. whisper.keyMu.Lock()
  377. defer whisper.keyMu.Unlock()
  378. if whisper.privateKeys[key] != nil {
  379. delete(whisper.privateKeys, key)
  380. return true
  381. }
  382. return false
  383. }
  384. // AddKeyPair imports a asymmetric private key and returns it identifier.
  385. func (whisper *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
  386. id, err := GenerateRandomID()
  387. if err != nil {
  388. return "", fmt.Errorf("failed to generate ID: %s", err)
  389. }
  390. whisper.keyMu.Lock()
  391. whisper.privateKeys[id] = key
  392. whisper.keyMu.Unlock()
  393. return id, nil
  394. }
  395. // HasKeyPair checks if the whisper node is configured with the private key
  396. // of the specified public pair.
  397. func (whisper *Whisper) HasKeyPair(id string) bool {
  398. whisper.keyMu.RLock()
  399. defer whisper.keyMu.RUnlock()
  400. return whisper.privateKeys[id] != nil
  401. }
  402. // GetPrivateKey retrieves the private key of the specified identity.
  403. func (whisper *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
  404. whisper.keyMu.RLock()
  405. defer whisper.keyMu.RUnlock()
  406. key := whisper.privateKeys[id]
  407. if key == nil {
  408. return nil, fmt.Errorf("invalid id")
  409. }
  410. return key, nil
  411. }
  412. // GenerateSymKey generates a random symmetric key and stores it under id,
  413. // which is then returned. Will be used in the future for session key exchange.
  414. func (whisper *Whisper) GenerateSymKey() (string, error) {
  415. key, err := generateSecureRandomData(aesKeyLength)
  416. if err != nil {
  417. return "", err
  418. } else if !validateDataIntegrity(key, aesKeyLength) {
  419. return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
  420. }
  421. id, err := GenerateRandomID()
  422. if err != nil {
  423. return "", fmt.Errorf("failed to generate ID: %s", err)
  424. }
  425. whisper.keyMu.Lock()
  426. defer whisper.keyMu.Unlock()
  427. if whisper.symKeys[id] != nil {
  428. return "", fmt.Errorf("failed to generate unique ID")
  429. }
  430. whisper.symKeys[id] = key
  431. return id, nil
  432. }
  433. // AddSymKeyDirect stores the key, and returns its id.
  434. func (whisper *Whisper) AddSymKeyDirect(key []byte) (string, error) {
  435. if len(key) != aesKeyLength {
  436. return "", fmt.Errorf("wrong key size: %d", len(key))
  437. }
  438. id, err := GenerateRandomID()
  439. if err != nil {
  440. return "", fmt.Errorf("failed to generate ID: %s", err)
  441. }
  442. whisper.keyMu.Lock()
  443. defer whisper.keyMu.Unlock()
  444. if whisper.symKeys[id] != nil {
  445. return "", fmt.Errorf("failed to generate unique ID")
  446. }
  447. whisper.symKeys[id] = key
  448. return id, nil
  449. }
  450. // AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
  451. func (whisper *Whisper) AddSymKeyFromPassword(password string) (string, error) {
  452. id, err := GenerateRandomID()
  453. if err != nil {
  454. return "", fmt.Errorf("failed to generate ID: %s", err)
  455. }
  456. if whisper.HasSymKey(id) {
  457. return "", fmt.Errorf("failed to generate unique ID")
  458. }
  459. // kdf should run no less than 0.1 seconds on an average computer,
  460. // because it's an once in a session experience
  461. derived := pbkdf2.Key([]byte(password), nil, 65356, aesKeyLength, sha256.New)
  462. if err != nil {
  463. return "", err
  464. }
  465. whisper.keyMu.Lock()
  466. defer whisper.keyMu.Unlock()
  467. // double check is necessary, because deriveKeyMaterial() is very slow
  468. if whisper.symKeys[id] != nil {
  469. return "", fmt.Errorf("critical error: failed to generate unique ID")
  470. }
  471. whisper.symKeys[id] = derived
  472. return id, nil
  473. }
  474. // HasSymKey returns true if there is a key associated with the given id.
  475. // Otherwise returns false.
  476. func (whisper *Whisper) HasSymKey(id string) bool {
  477. whisper.keyMu.RLock()
  478. defer whisper.keyMu.RUnlock()
  479. return whisper.symKeys[id] != nil
  480. }
  481. // DeleteSymKey deletes the key associated with the name string if it exists.
  482. func (whisper *Whisper) DeleteSymKey(id string) bool {
  483. whisper.keyMu.Lock()
  484. defer whisper.keyMu.Unlock()
  485. if whisper.symKeys[id] != nil {
  486. delete(whisper.symKeys, id)
  487. return true
  488. }
  489. return false
  490. }
  491. // GetSymKey returns the symmetric key associated with the given id.
  492. func (whisper *Whisper) GetSymKey(id string) ([]byte, error) {
  493. whisper.keyMu.RLock()
  494. defer whisper.keyMu.RUnlock()
  495. if whisper.symKeys[id] != nil {
  496. return whisper.symKeys[id], nil
  497. }
  498. return nil, fmt.Errorf("non-existent key ID")
  499. }
  500. // Subscribe installs a new message handler used for filtering, decrypting
  501. // and subsequent storing of incoming messages.
  502. func (whisper *Whisper) Subscribe(f *Filter) (string, error) {
  503. s, err := whisper.filters.Install(f)
  504. if err == nil {
  505. whisper.updateBloomFilter(f)
  506. }
  507. return s, err
  508. }
  509. // updateBloomFilter recalculates the new value of bloom filter,
  510. // and informs the peers if necessary.
  511. func (whisper *Whisper) updateBloomFilter(f *Filter) {
  512. aggregate := make([]byte, BloomFilterSize)
  513. for _, t := range f.Topics {
  514. top := BytesToTopic(t)
  515. b := TopicToBloom(top)
  516. aggregate = addBloom(aggregate, b)
  517. }
  518. if !BloomFilterMatch(whisper.BloomFilter(), aggregate) {
  519. // existing bloom filter must be updated
  520. aggregate = addBloom(whisper.BloomFilter(), aggregate)
  521. whisper.SetBloomFilter(aggregate)
  522. }
  523. }
  524. // GetFilter returns the filter by id.
  525. func (whisper *Whisper) GetFilter(id string) *Filter {
  526. return whisper.filters.Get(id)
  527. }
  528. // Unsubscribe removes an installed message handler.
  529. func (whisper *Whisper) Unsubscribe(id string) error {
  530. ok := whisper.filters.Uninstall(id)
  531. if !ok {
  532. return fmt.Errorf("Unsubscribe: Invalid ID")
  533. }
  534. return nil
  535. }
  536. // Send injects a message into the whisper send queue, to be distributed in the
  537. // network in the coming cycles.
  538. func (whisper *Whisper) Send(envelope *Envelope) error {
  539. ok, err := whisper.add(envelope, false)
  540. if err == nil && !ok {
  541. return fmt.Errorf("failed to add envelope")
  542. }
  543. return err
  544. }
  545. // Start implements node.Service, starting the background data propagation thread
  546. // of the Whisper protocol.
  547. func (whisper *Whisper) Start(*p2p.Server) error {
  548. log.Info("started whisper v." + ProtocolVersionStr)
  549. whisper.wg.Add(1)
  550. go whisper.update()
  551. numCPU := runtime.NumCPU()
  552. for i := 0; i < numCPU; i++ {
  553. whisper.wg.Add(1)
  554. go whisper.processQueue()
  555. }
  556. return nil
  557. }
  558. // Stop implements node.Service, stopping the background data propagation thread
  559. // of the Whisper protocol.
  560. func (whisper *Whisper) Stop() error {
  561. close(whisper.quit)
  562. whisper.wg.Wait()
  563. log.Info("whisper stopped")
  564. return nil
  565. }
  566. // HandlePeer is called by the underlying P2P layer when the whisper sub-protocol
  567. // connection is negotiated.
  568. func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
  569. // Create the new peer and start tracking it
  570. whisperPeer := newPeer(whisper, peer, rw)
  571. whisper.peerMu.Lock()
  572. whisper.peers[whisperPeer] = struct{}{}
  573. whisper.peerMu.Unlock()
  574. defer func() {
  575. whisper.peerMu.Lock()
  576. delete(whisper.peers, whisperPeer)
  577. whisper.peerMu.Unlock()
  578. }()
  579. // Run the peer handshake and state updates
  580. if err := whisperPeer.handshake(); err != nil {
  581. return err
  582. }
  583. whisperPeer.start()
  584. defer whisperPeer.stop()
  585. return whisper.runMessageLoop(whisperPeer, rw)
  586. }
  587. // runMessageLoop reads and processes inbound messages directly to merge into client-global state.
  588. func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
  589. for {
  590. // fetch the next packet
  591. packet, err := rw.ReadMsg()
  592. if err != nil {
  593. log.Info("message loop", "peer", p.peer.ID(), "err", err)
  594. return err
  595. }
  596. if packet.Size > whisper.MaxMessageSize() {
  597. log.Warn("oversized message received", "peer", p.peer.ID())
  598. return errors.New("oversized message received")
  599. }
  600. switch packet.Code {
  601. case statusCode:
  602. // this should not happen, but no need to panic; just ignore this message.
  603. log.Warn("unxepected status message received", "peer", p.peer.ID())
  604. case messagesCode:
  605. // decode the contained envelopes
  606. var envelopes []*Envelope
  607. if err := packet.Decode(&envelopes); err != nil {
  608. log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  609. return errors.New("invalid envelopes")
  610. }
  611. trouble := false
  612. for _, env := range envelopes {
  613. cached, err := whisper.add(env, whisper.LightClientMode())
  614. if err != nil {
  615. trouble = true
  616. log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  617. }
  618. if cached {
  619. p.mark(env)
  620. }
  621. }
  622. if trouble {
  623. return errors.New("invalid envelope")
  624. }
  625. case powRequirementCode:
  626. s := rlp.NewStream(packet.Payload, uint64(packet.Size))
  627. i, err := s.Uint()
  628. if err != nil {
  629. log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  630. return errors.New("invalid powRequirementCode message")
  631. }
  632. f := math.Float64frombits(i)
  633. if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
  634. log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  635. return errors.New("invalid value in powRequirementCode message")
  636. }
  637. p.powRequirement = f
  638. case bloomFilterExCode:
  639. var bloom []byte
  640. err := packet.Decode(&bloom)
  641. if err == nil && len(bloom) != BloomFilterSize {
  642. err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
  643. }
  644. if err != nil {
  645. log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  646. return errors.New("invalid bloom filter exchange message")
  647. }
  648. p.setBloomFilter(bloom)
  649. case p2pMessageCode:
  650. // peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
  651. // this message is not supposed to be forwarded to other peers, and
  652. // therefore might not satisfy the PoW, expiry and other requirements.
  653. // these messages are only accepted from the trusted peer.
  654. if p.trusted {
  655. var envelope Envelope
  656. if err := packet.Decode(&envelope); err != nil {
  657. log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  658. return errors.New("invalid direct message")
  659. }
  660. whisper.postEvent(&envelope, true)
  661. }
  662. case p2pRequestCode:
  663. // Must be processed if mail server is implemented. Otherwise ignore.
  664. if whisper.mailServer != nil {
  665. var request Envelope
  666. if err := packet.Decode(&request); err != nil {
  667. log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
  668. return errors.New("invalid p2p request")
  669. }
  670. whisper.mailServer.DeliverMail(p, &request)
  671. }
  672. default:
  673. // New message types might be implemented in the future versions of Whisper.
  674. // For forward compatibility, just ignore.
  675. }
  676. packet.Discard()
  677. }
  678. }
  679. // add inserts a new envelope into the message pool to be distributed within the
  680. // whisper network. It also inserts the envelope into the expiration pool at the
  681. // appropriate time-stamp. In case of error, connection should be dropped.
  682. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
  683. func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
  684. now := uint32(time.Now().Unix())
  685. sent := envelope.Expiry - envelope.TTL
  686. if sent > now {
  687. if sent-DefaultSyncAllowance > now {
  688. return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
  689. }
  690. // recalculate PoW, adjusted for the time difference, plus one second for latency
  691. envelope.calculatePoW(sent - now + 1)
  692. }
  693. if envelope.Expiry < now {
  694. if envelope.Expiry+DefaultSyncAllowance*2 < now {
  695. return false, fmt.Errorf("very old message")
  696. }
  697. log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
  698. return false, nil // drop envelope without error
  699. }
  700. if uint32(envelope.size()) > whisper.MaxMessageSize() {
  701. return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
  702. }
  703. if envelope.PoW() < whisper.MinPow() {
  704. // maybe the value was recently changed, and the peers did not adjust yet.
  705. // in this case the previous value is retrieved by MinPowTolerance()
  706. // for a short period of peer synchronization.
  707. if envelope.PoW() < whisper.MinPowTolerance() {
  708. return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
  709. }
  710. }
  711. if !BloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
  712. // maybe the value was recently changed, and the peers did not adjust yet.
  713. // in this case the previous value is retrieved by BloomFilterTolerance()
  714. // for a short period of peer synchronization.
  715. if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
  716. return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
  717. envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
  718. }
  719. }
  720. hash := envelope.Hash()
  721. whisper.poolMu.Lock()
  722. _, alreadyCached := whisper.envelopes[hash]
  723. if !alreadyCached {
  724. whisper.envelopes[hash] = envelope
  725. if whisper.expirations[envelope.Expiry] == nil {
  726. whisper.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet()
  727. }
  728. if !whisper.expirations[envelope.Expiry].Contains(hash) {
  729. whisper.expirations[envelope.Expiry].Add(hash)
  730. }
  731. }
  732. whisper.poolMu.Unlock()
  733. if alreadyCached {
  734. log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
  735. } else {
  736. log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
  737. whisper.statsMu.Lock()
  738. whisper.stats.memoryUsed += envelope.size()
  739. whisper.statsMu.Unlock()
  740. whisper.postEvent(envelope, isP2P) // notify the local node about the new message
  741. if whisper.mailServer != nil {
  742. whisper.mailServer.Archive(envelope)
  743. }
  744. }
  745. return true, nil
  746. }
  747. // postEvent queues the message for further processing.
  748. func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
  749. if isP2P {
  750. whisper.p2pMsgQueue <- envelope
  751. } else {
  752. whisper.checkOverflow()
  753. whisper.messageQueue <- envelope
  754. }
  755. }
  756. // checkOverflow checks if message queue overflow occurs and reports it if necessary.
  757. func (whisper *Whisper) checkOverflow() {
  758. queueSize := len(whisper.messageQueue)
  759. if queueSize == messageQueueLimit {
  760. if !whisper.Overflow() {
  761. whisper.settings.Store(overflowIdx, true)
  762. log.Warn("message queue overflow")
  763. }
  764. } else if queueSize <= messageQueueLimit/2 {
  765. if whisper.Overflow() {
  766. whisper.settings.Store(overflowIdx, false)
  767. log.Warn("message queue overflow fixed (back to normal)")
  768. }
  769. }
  770. }
  771. // processQueue delivers the messages to the watchers during the lifetime of the whisper node.
  772. func (whisper *Whisper) processQueue() {
  773. defer whisper.wg.Done()
  774. var e *Envelope
  775. for {
  776. select {
  777. case <-whisper.quit:
  778. return
  779. case e = <-whisper.messageQueue:
  780. whisper.filters.NotifyWatchers(e, false)
  781. case e = <-whisper.p2pMsgQueue:
  782. whisper.filters.NotifyWatchers(e, true)
  783. }
  784. }
  785. }
  786. // update loops until the lifetime of the whisper node, updating its internal
  787. // state by expiring stale messages from the pool.
  788. func (whisper *Whisper) update() {
  789. defer whisper.wg.Done()
  790. // Start a ticker to check for expirations
  791. expire := time.NewTicker(expirationCycle)
  792. defer expire.Stop()
  793. // Repeat updates until termination is requested
  794. for {
  795. select {
  796. case <-expire.C:
  797. whisper.expire()
  798. case <-whisper.quit:
  799. return
  800. }
  801. }
  802. }
  803. // expire iterates over all the expiration timestamps, removing all stale
  804. // messages from the pools.
  805. func (whisper *Whisper) expire() {
  806. whisper.poolMu.Lock()
  807. defer whisper.poolMu.Unlock()
  808. whisper.statsMu.Lock()
  809. defer whisper.statsMu.Unlock()
  810. whisper.stats.reset()
  811. now := uint32(time.Now().Unix())
  812. for expiry, hashSet := range whisper.expirations {
  813. if expiry < now {
  814. // Dump all expired messages and remove timestamp
  815. hashSet.Each(func(v interface{}) bool {
  816. sz := whisper.envelopes[v.(common.Hash)].size()
  817. delete(whisper.envelopes, v.(common.Hash))
  818. whisper.stats.messagesCleared++
  819. whisper.stats.memoryCleared += sz
  820. whisper.stats.memoryUsed -= sz
  821. return false
  822. })
  823. whisper.expirations[expiry].Clear()
  824. delete(whisper.expirations, expiry)
  825. }
  826. }
  827. }
  828. // Stats returns the whisper node statistics.
  829. func (whisper *Whisper) Stats() Statistics {
  830. whisper.statsMu.Lock()
  831. defer whisper.statsMu.Unlock()
  832. return whisper.stats
  833. }
  834. // Envelopes retrieves all the messages currently pooled by the node.
  835. func (whisper *Whisper) Envelopes() []*Envelope {
  836. whisper.poolMu.RLock()
  837. defer whisper.poolMu.RUnlock()
  838. all := make([]*Envelope, 0, len(whisper.envelopes))
  839. for _, envelope := range whisper.envelopes {
  840. all = append(all, envelope)
  841. }
  842. return all
  843. }
  844. // isEnvelopeCached checks if envelope with specific hash has already been received and cached.
  845. func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool {
  846. whisper.poolMu.Lock()
  847. defer whisper.poolMu.Unlock()
  848. _, exist := whisper.envelopes[hash]
  849. return exist
  850. }
  851. // reset resets the node's statistics after each expiry cycle.
  852. func (s *Statistics) reset() {
  853. s.cycles++
  854. s.totalMessagesCleared += s.messagesCleared
  855. s.memoryCleared = 0
  856. s.messagesCleared = 0
  857. }
  858. // ValidatePublicKey checks the format of the given public key.
  859. func ValidatePublicKey(k *ecdsa.PublicKey) bool {
  860. return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
  861. }
  862. // validatePrivateKey checks the format of the given private key.
  863. func validatePrivateKey(k *ecdsa.PrivateKey) bool {
  864. if k == nil || k.D == nil || k.D.Sign() == 0 {
  865. return false
  866. }
  867. return ValidatePublicKey(&k.PublicKey)
  868. }
  869. // validateDataIntegrity returns false if the data have the wrong or contains all zeros,
  870. // which is the simplest and the most common bug.
  871. func validateDataIntegrity(k []byte, expectedSize int) bool {
  872. if len(k) != expectedSize {
  873. return false
  874. }
  875. if expectedSize > 3 && containsOnlyZeros(k) {
  876. return false
  877. }
  878. return true
  879. }
  880. // containsOnlyZeros checks if the data contain only zeros.
  881. func containsOnlyZeros(data []byte) bool {
  882. for _, b := range data {
  883. if b != 0 {
  884. return false
  885. }
  886. }
  887. return true
  888. }
  889. // bytesToUintLittleEndian converts the slice to 64-bit unsigned integer.
  890. func bytesToUintLittleEndian(b []byte) (res uint64) {
  891. mul := uint64(1)
  892. for i := 0; i < len(b); i++ {
  893. res += uint64(b[i]) * mul
  894. mul *= 256
  895. }
  896. return res
  897. }
  898. // BytesToUintBigEndian converts the slice to 64-bit unsigned integer.
  899. func BytesToUintBigEndian(b []byte) (res uint64) {
  900. for i := 0; i < len(b); i++ {
  901. res *= 256
  902. res += uint64(b[i])
  903. }
  904. return res
  905. }
  906. // GenerateRandomID generates a random string, which is then returned to be used as a key id
  907. func GenerateRandomID() (id string, err error) {
  908. buf, err := generateSecureRandomData(keyIDSize)
  909. if err != nil {
  910. return "", err
  911. }
  912. if !validateDataIntegrity(buf, keyIDSize) {
  913. return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data")
  914. }
  915. id = common.Bytes2Hex(buf)
  916. return id, err
  917. }
  918. func isFullNode(bloom []byte) bool {
  919. if bloom == nil {
  920. return true
  921. }
  922. for _, b := range bloom {
  923. if b != 255 {
  924. return false
  925. }
  926. }
  927. return true
  928. }
  929. func BloomFilterMatch(filter, sample []byte) bool {
  930. if filter == nil {
  931. return true
  932. }
  933. for i := 0; i < BloomFilterSize; i++ {
  934. f := filter[i]
  935. s := sample[i]
  936. if (f | s) != f {
  937. return false
  938. }
  939. }
  940. return true
  941. }
  942. func addBloom(a, b []byte) []byte {
  943. c := make([]byte, BloomFilterSize)
  944. for i := 0; i < BloomFilterSize; i++ {
  945. c[i] = a[i] | b[i]
  946. }
  947. return c
  948. }