pss_test.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package pss
  17. import (
  18. "bytes"
  19. "context"
  20. "crypto/ecdsa"
  21. "encoding/binary"
  22. "encoding/hex"
  23. "encoding/json"
  24. "flag"
  25. "fmt"
  26. "io/ioutil"
  27. "math/rand"
  28. "os"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "testing"
  33. "time"
  34. "github.com/ethereum/go-ethereum/common"
  35. "github.com/ethereum/go-ethereum/common/hexutil"
  36. "github.com/ethereum/go-ethereum/crypto"
  37. "github.com/ethereum/go-ethereum/log"
  38. "github.com/ethereum/go-ethereum/metrics"
  39. "github.com/ethereum/go-ethereum/metrics/influxdb"
  40. "github.com/ethereum/go-ethereum/node"
  41. "github.com/ethereum/go-ethereum/p2p"
  42. "github.com/ethereum/go-ethereum/p2p/discover"
  43. "github.com/ethereum/go-ethereum/p2p/protocols"
  44. "github.com/ethereum/go-ethereum/p2p/simulations"
  45. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  46. "github.com/ethereum/go-ethereum/rpc"
  47. "github.com/ethereum/go-ethereum/swarm/network"
  48. "github.com/ethereum/go-ethereum/swarm/state"
  49. whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
  50. )
  51. var (
  52. initOnce = sync.Once{}
  53. debugdebugflag = flag.Bool("vv", false, "veryverbose")
  54. debugflag = flag.Bool("v", false, "verbose")
  55. longrunning = flag.Bool("longrunning", false, "do run long-running tests")
  56. w *whisper.Whisper
  57. wapi *whisper.PublicWhisperAPI
  58. psslogmain log.Logger
  59. pssprotocols map[string]*protoCtrl
  60. useHandshake bool
  61. )
  62. func init() {
  63. flag.Parse()
  64. rand.Seed(time.Now().Unix())
  65. adapters.RegisterServices(newServices(false))
  66. initTest()
  67. }
  68. func initTest() {
  69. initOnce.Do(
  70. func() {
  71. loglevel := log.LvlInfo
  72. if *debugflag {
  73. loglevel = log.LvlDebug
  74. } else if *debugdebugflag {
  75. loglevel = log.LvlTrace
  76. }
  77. psslogmain = log.New("psslog", "*")
  78. hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
  79. hf := log.LvlFilterHandler(loglevel, hs)
  80. h := log.CallerFileHandler(hf)
  81. log.Root().SetHandler(h)
  82. w = whisper.New(&whisper.DefaultConfig)
  83. wapi = whisper.NewPublicWhisperAPI(w)
  84. pssprotocols = make(map[string]*protoCtrl)
  85. },
  86. )
  87. }
  88. // test that topic conversion functions give predictable results
  89. func TestTopic(t *testing.T) {
  90. api := &API{}
  91. topicstr := strings.Join([]string{PingProtocol.Name, strconv.Itoa(int(PingProtocol.Version))}, ":")
  92. // bytestotopic is the authoritative topic conversion source
  93. topicobj := BytesToTopic([]byte(topicstr))
  94. // string to topic and bytes to topic must match
  95. topicapiobj, _ := api.StringToTopic(topicstr)
  96. if topicobj != topicapiobj {
  97. t.Fatalf("bytes and string topic conversion mismatch; %s != %s", topicobj, topicapiobj)
  98. }
  99. // string representation of topichex
  100. topichex := topicobj.String()
  101. // protocoltopic wrapper on pingtopic should be same as topicstring
  102. // check that it matches
  103. pingtopichex := PingTopic.String()
  104. if topichex != pingtopichex {
  105. t.Fatalf("protocol topic conversion mismatch; %s != %s", topichex, pingtopichex)
  106. }
  107. // json marshal of topic
  108. topicjsonout, err := topicobj.MarshalJSON()
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. if string(topicjsonout)[1:len(topicjsonout)-1] != topichex {
  113. t.Fatalf("topic json marshal mismatch; %s != \"%s\"", topicjsonout, topichex)
  114. }
  115. // json unmarshal of topic
  116. var topicjsonin Topic
  117. topicjsonin.UnmarshalJSON(topicjsonout)
  118. if topicjsonin != topicobj {
  119. t.Fatalf("topic json unmarshal mismatch: %x != %x", topicjsonin, topicobj)
  120. }
  121. }
  122. // test bit packing of message control flags
  123. func TestMsgParams(t *testing.T) {
  124. var ctrl byte
  125. ctrl |= pssControlRaw
  126. p := newMsgParamsFromBytes([]byte{ctrl})
  127. m := newPssMsg(p)
  128. if !m.isRaw() || m.isSym() {
  129. t.Fatal("expected raw=true and sym=false")
  130. }
  131. ctrl |= pssControlSym
  132. p = newMsgParamsFromBytes([]byte{ctrl})
  133. m = newPssMsg(p)
  134. if !m.isRaw() || !m.isSym() {
  135. t.Fatal("expected raw=true and sym=true")
  136. }
  137. ctrl &= 0xff &^ pssControlRaw
  138. p = newMsgParamsFromBytes([]byte{ctrl})
  139. m = newPssMsg(p)
  140. if m.isRaw() || !m.isSym() {
  141. t.Fatal("expected raw=false and sym=true")
  142. }
  143. }
  144. // test if we can insert into cache, match items with cache and cache expiry
  145. func TestCache(t *testing.T) {
  146. var err error
  147. to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f")
  148. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  149. defer cancel()
  150. keys, err := wapi.NewKeyPair(ctx)
  151. privkey, err := w.GetPrivateKey(keys)
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. ps := newTestPss(privkey, nil, nil)
  156. pp := NewPssParams().WithPrivateKey(privkey)
  157. data := []byte("foo")
  158. datatwo := []byte("bar")
  159. datathree := []byte("baz")
  160. wparams := &whisper.MessageParams{
  161. TTL: defaultWhisperTTL,
  162. Src: privkey,
  163. Dst: &privkey.PublicKey,
  164. Topic: whisper.TopicType(PingTopic),
  165. WorkTime: defaultWhisperWorkTime,
  166. PoW: defaultWhisperPoW,
  167. Payload: data,
  168. }
  169. woutmsg, err := whisper.NewSentMessage(wparams)
  170. env, err := woutmsg.Wrap(wparams)
  171. msg := &PssMsg{
  172. Payload: env,
  173. To: to,
  174. }
  175. wparams.Payload = datatwo
  176. woutmsg, err = whisper.NewSentMessage(wparams)
  177. envtwo, err := woutmsg.Wrap(wparams)
  178. msgtwo := &PssMsg{
  179. Payload: envtwo,
  180. To: to,
  181. }
  182. wparams.Payload = datathree
  183. woutmsg, err = whisper.NewSentMessage(wparams)
  184. envthree, err := woutmsg.Wrap(wparams)
  185. msgthree := &PssMsg{
  186. Payload: envthree,
  187. To: to,
  188. }
  189. digest := ps.digest(msg)
  190. if err != nil {
  191. t.Fatalf("could not store cache msgone: %v", err)
  192. }
  193. digesttwo := ps.digest(msgtwo)
  194. if err != nil {
  195. t.Fatalf("could not store cache msgtwo: %v", err)
  196. }
  197. digestthree := ps.digest(msgthree)
  198. if err != nil {
  199. t.Fatalf("could not store cache msgthree: %v", err)
  200. }
  201. if digest == digesttwo {
  202. t.Fatalf("different msgs return same hash: %d", digesttwo)
  203. }
  204. // check the cache
  205. err = ps.addFwdCache(msg)
  206. if err != nil {
  207. t.Fatalf("write to pss expire cache failed: %v", err)
  208. }
  209. if !ps.checkFwdCache(msg) {
  210. t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg)
  211. }
  212. if ps.checkFwdCache(msgtwo) {
  213. t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo)
  214. }
  215. time.Sleep(pp.CacheTTL + 1*time.Second)
  216. err = ps.addFwdCache(msgthree)
  217. if err != nil {
  218. t.Fatalf("write to pss expire cache failed: %v", err)
  219. }
  220. if ps.checkFwdCache(msg) {
  221. t.Fatalf("message %v should have expired from cache but checkCache returned true", msg)
  222. }
  223. if _, ok := ps.fwdCache[digestthree]; !ok {
  224. t.Fatalf("unexpired message should be in the cache: %v", digestthree)
  225. }
  226. if _, ok := ps.fwdCache[digesttwo]; ok {
  227. t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo)
  228. }
  229. }
  230. // matching of address hints; whether a message could be or is for the node
  231. func TestAddressMatch(t *testing.T) {
  232. localaddr := network.RandomAddr().Over()
  233. copy(localaddr[:8], []byte("deadbeef"))
  234. remoteaddr := []byte("feedbeef")
  235. kadparams := network.NewKadParams()
  236. kad := network.NewKademlia(localaddr, kadparams)
  237. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  238. defer cancel()
  239. keys, err := wapi.NewKeyPair(ctx)
  240. if err != nil {
  241. t.Fatalf("Could not generate private key: %v", err)
  242. }
  243. privkey, err := w.GetPrivateKey(keys)
  244. pssp := NewPssParams().WithPrivateKey(privkey)
  245. ps, err := NewPss(kad, pssp)
  246. if err != nil {
  247. t.Fatal(err.Error())
  248. }
  249. pssmsg := &PssMsg{
  250. To: remoteaddr,
  251. Payload: &whisper.Envelope{},
  252. }
  253. // differ from first byte
  254. if ps.isSelfRecipient(pssmsg) {
  255. t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
  256. }
  257. if ps.isSelfPossibleRecipient(pssmsg) {
  258. t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8])
  259. }
  260. // 8 first bytes same
  261. copy(remoteaddr[:4], localaddr[:4])
  262. if ps.isSelfRecipient(pssmsg) {
  263. t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
  264. }
  265. if !ps.isSelfPossibleRecipient(pssmsg) {
  266. t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
  267. }
  268. // all bytes same
  269. pssmsg.To = localaddr
  270. if !ps.isSelfRecipient(pssmsg) {
  271. t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr)
  272. }
  273. if !ps.isSelfPossibleRecipient(pssmsg) {
  274. t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
  275. }
  276. }
  277. //
  278. func TestHandlerConditions(t *testing.T) {
  279. t.Skip("Disabled due to probable faulty logic for outbox expectations")
  280. // setup
  281. privkey, err := crypto.GenerateKey()
  282. if err != nil {
  283. t.Fatal(err.Error())
  284. }
  285. addr := make([]byte, 32)
  286. addr[0] = 0x01
  287. ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams())
  288. // message should pass
  289. msg := &PssMsg{
  290. To: addr,
  291. Expire: uint32(time.Now().Add(time.Second * 60).Unix()),
  292. Payload: &whisper.Envelope{
  293. Topic: [4]byte{},
  294. Data: []byte{0x66, 0x6f, 0x6f},
  295. },
  296. }
  297. if err := ps.handlePssMsg(msg); err != nil {
  298. t.Fatal(err.Error())
  299. }
  300. tmr := time.NewTimer(time.Millisecond * 100)
  301. var outmsg *PssMsg
  302. select {
  303. case outmsg = <-ps.outbox:
  304. case <-tmr.C:
  305. default:
  306. }
  307. if outmsg != nil {
  308. t.Fatalf("expected outbox empty after full address on msg, but had message %s", msg)
  309. }
  310. // message should pass and queue due to partial length
  311. msg.To = addr[0:1]
  312. msg.Payload.Data = []byte{0x78, 0x79, 0x80, 0x80, 0x79}
  313. if err := ps.handlePssMsg(msg); err != nil {
  314. t.Fatal(err.Error())
  315. }
  316. tmr.Reset(time.Millisecond * 100)
  317. outmsg = nil
  318. select {
  319. case outmsg = <-ps.outbox:
  320. case <-tmr.C:
  321. }
  322. if outmsg == nil {
  323. t.Fatal("expected message in outbox on encrypt fail, but empty")
  324. }
  325. outmsg = nil
  326. select {
  327. case outmsg = <-ps.outbox:
  328. default:
  329. }
  330. if outmsg != nil {
  331. t.Fatalf("expected only one queued message but also had message %v", msg)
  332. }
  333. // full address mismatch should put message in queue
  334. msg.To[0] = 0xff
  335. if err := ps.handlePssMsg(msg); err != nil {
  336. t.Fatal(err.Error())
  337. }
  338. tmr.Reset(time.Millisecond * 10)
  339. outmsg = nil
  340. select {
  341. case outmsg = <-ps.outbox:
  342. case <-tmr.C:
  343. }
  344. if outmsg == nil {
  345. t.Fatal("expected message in outbox on address mismatch, but empty")
  346. }
  347. outmsg = nil
  348. select {
  349. case outmsg = <-ps.outbox:
  350. default:
  351. }
  352. if outmsg != nil {
  353. t.Fatalf("expected only one queued message but also had message %v", msg)
  354. }
  355. // expired message should be dropped
  356. msg.Expire = uint32(time.Now().Add(-time.Second).Unix())
  357. if err := ps.handlePssMsg(msg); err != nil {
  358. t.Fatal(err.Error())
  359. }
  360. tmr.Reset(time.Millisecond * 10)
  361. outmsg = nil
  362. select {
  363. case outmsg = <-ps.outbox:
  364. case <-tmr.C:
  365. default:
  366. }
  367. if outmsg != nil {
  368. t.Fatalf("expected empty queue but have message %v", msg)
  369. }
  370. // invalid message should return error
  371. fckedupmsg := &struct {
  372. pssMsg *PssMsg
  373. }{
  374. pssMsg: &PssMsg{},
  375. }
  376. if err := ps.handlePssMsg(fckedupmsg); err == nil {
  377. t.Fatalf("expected error from processMsg but error nil")
  378. }
  379. // outbox full should return error
  380. msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
  381. for i := 0; i < defaultOutboxCapacity; i++ {
  382. ps.outbox <- msg
  383. }
  384. msg.Payload.Data = []byte{0x62, 0x61, 0x72}
  385. err = ps.handlePssMsg(msg)
  386. if err == nil {
  387. t.Fatal("expected error when mailbox full, but was nil")
  388. }
  389. }
  390. // set and generate pubkeys and symkeys
  391. func TestKeys(t *testing.T) {
  392. // make our key and init pss with it
  393. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  394. defer cancel()
  395. ourkeys, err := wapi.NewKeyPair(ctx)
  396. if err != nil {
  397. t.Fatalf("create 'our' key fail")
  398. }
  399. ctx, cancel2 := context.WithTimeout(context.Background(), time.Second)
  400. defer cancel2()
  401. theirkeys, err := wapi.NewKeyPair(ctx)
  402. if err != nil {
  403. t.Fatalf("create 'their' key fail")
  404. }
  405. ourprivkey, err := w.GetPrivateKey(ourkeys)
  406. if err != nil {
  407. t.Fatalf("failed to retrieve 'our' private key")
  408. }
  409. theirprivkey, err := w.GetPrivateKey(theirkeys)
  410. if err != nil {
  411. t.Fatalf("failed to retrieve 'their' private key")
  412. }
  413. ps := newTestPss(ourprivkey, nil, nil)
  414. // set up peer with mock address, mapped to mocked publicaddress and with mocked symkey
  415. addr := make(PssAddress, 32)
  416. copy(addr, network.RandomAddr().Over())
  417. outkey := network.RandomAddr().Over()
  418. topicobj := BytesToTopic([]byte("foo:42"))
  419. ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, &addr)
  420. outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, &addr, false)
  421. if err != nil {
  422. t.Fatalf("failed to set 'our' outgoing symmetric key")
  423. }
  424. // make a symmetric key that we will send to peer for encrypting messages to us
  425. inkeyid, err := ps.GenerateSymmetricKey(topicobj, &addr, true)
  426. if err != nil {
  427. t.Fatalf("failed to set 'our' incoming symmetric key")
  428. }
  429. // get the key back from whisper, check that it's still the same
  430. outkeyback, err := ps.w.GetSymKey(outkeyid)
  431. if err != nil {
  432. t.Fatalf(err.Error())
  433. }
  434. inkey, err := ps.w.GetSymKey(inkeyid)
  435. if err != nil {
  436. t.Fatalf(err.Error())
  437. }
  438. if !bytes.Equal(outkeyback, outkey) {
  439. t.Fatalf("passed outgoing symkey doesnt equal stored: %x / %x", outkey, outkeyback)
  440. }
  441. t.Logf("symout: %v", outkeyback)
  442. t.Logf("symin: %v", inkey)
  443. // check that the key is stored in the peerpool
  444. psp := ps.symKeyPool[inkeyid][topicobj]
  445. if psp.address != &addr {
  446. t.Fatalf("inkey address does not match; %p != %p", psp.address, &addr)
  447. }
  448. }
  449. func TestGetPublickeyEntries(t *testing.T) {
  450. privkey, err := crypto.GenerateKey()
  451. if err != nil {
  452. t.Fatal(err)
  453. }
  454. ps := newTestPss(privkey, nil, nil)
  455. peeraddr := network.RandomAddr().Over()
  456. topicaddr := make(map[Topic]PssAddress)
  457. topicaddr[Topic{0x13}] = peeraddr
  458. topicaddr[Topic{0x2a}] = peeraddr[:16]
  459. topicaddr[Topic{0x02, 0x9a}] = []byte{}
  460. remoteprivkey, err := crypto.GenerateKey()
  461. if err != nil {
  462. t.Fatal(err)
  463. }
  464. remotepubkeybytes := crypto.FromECDSAPub(&remoteprivkey.PublicKey)
  465. remotepubkeyhex := common.ToHex(remotepubkeybytes)
  466. pssapi := NewAPI(ps)
  467. for to, a := range topicaddr {
  468. err = pssapi.SetPeerPublicKey(remotepubkeybytes, to, a)
  469. if err != nil {
  470. t.Fatal(err)
  471. }
  472. }
  473. intopic, err := pssapi.GetPeerTopics(remotepubkeyhex)
  474. if err != nil {
  475. t.Fatal(err)
  476. }
  477. OUTER:
  478. for _, tnew := range intopic {
  479. for torig, addr := range topicaddr {
  480. if bytes.Equal(torig[:], tnew[:]) {
  481. inaddr, err := pssapi.GetPeerAddress(remotepubkeyhex, torig)
  482. if err != nil {
  483. t.Fatal(err)
  484. }
  485. if !bytes.Equal(addr, inaddr) {
  486. t.Fatalf("Address mismatch for topic %x; got %x, expected %x", torig, inaddr, addr)
  487. }
  488. delete(topicaddr, torig)
  489. continue OUTER
  490. }
  491. }
  492. t.Fatalf("received topic %x did not match any existing topics", tnew)
  493. }
  494. if len(topicaddr) != 0 {
  495. t.Fatalf("%d topics were not matched", len(topicaddr))
  496. }
  497. }
  498. type pssTestPeer struct {
  499. *protocols.Peer
  500. addr []byte
  501. }
  502. func (t *pssTestPeer) Address() []byte {
  503. return t.addr
  504. }
  505. func (t *pssTestPeer) Update(addr network.OverlayAddr) network.OverlayAddr {
  506. return addr
  507. }
  508. func (t *pssTestPeer) Off() network.OverlayAddr {
  509. return &pssTestPeer{}
  510. }
  511. // forwarding should skip peers that do not have matching pss capabilities
  512. func TestMismatch(t *testing.T) {
  513. // create privkey for forwarder node
  514. privkey, err := crypto.GenerateKey()
  515. if err != nil {
  516. t.Fatal(err)
  517. }
  518. // initialize overlay
  519. baseaddr := network.RandomAddr()
  520. kad := network.NewKademlia((baseaddr).Over(), network.NewKadParams())
  521. rw := &p2p.MsgPipeRW{}
  522. // one peer has a mismatching version of pss
  523. wrongpssaddr := network.RandomAddr()
  524. wrongpsscap := p2p.Cap{
  525. Name: pssProtocolName,
  526. Version: 0,
  527. }
  528. nid, _ := discover.HexID("0x01")
  529. wrongpsspeer := &pssTestPeer{
  530. Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(wrongpssaddr.Over()), []p2p.Cap{wrongpsscap}), rw, nil),
  531. addr: wrongpssaddr.Over(),
  532. }
  533. // one peer doesn't even have pss (boo!)
  534. nopssaddr := network.RandomAddr()
  535. nopsscap := p2p.Cap{
  536. Name: "nopss",
  537. Version: 1,
  538. }
  539. nid, _ = discover.HexID("0x02")
  540. nopsspeer := &pssTestPeer{
  541. Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(nopssaddr.Over()), []p2p.Cap{nopsscap}), rw, nil),
  542. addr: nopssaddr.Over(),
  543. }
  544. // add peers to kademlia and activate them
  545. // it's safe so don't check errors
  546. kad.Register([]network.OverlayAddr{wrongpsspeer})
  547. kad.On(wrongpsspeer)
  548. kad.Register([]network.OverlayAddr{nopsspeer})
  549. kad.On(nopsspeer)
  550. // create pss
  551. pssmsg := &PssMsg{
  552. To: []byte{},
  553. Expire: uint32(time.Now().Add(time.Second).Unix()),
  554. Payload: &whisper.Envelope{},
  555. }
  556. ps := newTestPss(privkey, kad, nil)
  557. // run the forward
  558. // it is enough that it completes; trying to send to incapable peers would create segfault
  559. ps.forward(pssmsg)
  560. }
  561. func TestSendRaw(t *testing.T) {
  562. t.Run("32", testSendRaw)
  563. t.Run("8", testSendRaw)
  564. t.Run("0", testSendRaw)
  565. }
  566. func testSendRaw(t *testing.T) {
  567. var addrsize int64
  568. var err error
  569. paramstring := strings.Split(t.Name(), "/")
  570. addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
  571. log.Info("raw send test", "addrsize", addrsize)
  572. clients, err := setupNetwork(2, true)
  573. if err != nil {
  574. t.Fatal(err)
  575. }
  576. topic := "0xdeadbeef"
  577. var loaddrhex string
  578. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  579. if err != nil {
  580. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  581. }
  582. loaddrhex = loaddrhex[:2+(addrsize*2)]
  583. var roaddrhex string
  584. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  585. if err != nil {
  586. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  587. }
  588. roaddrhex = roaddrhex[:2+(addrsize*2)]
  589. time.Sleep(time.Millisecond * 500)
  590. // at this point we've verified that symkeys are saved and match on each peer
  591. // now try sending symmetrically encrypted message, both directions
  592. lmsgC := make(chan APIMsg)
  593. lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
  594. defer lcancel()
  595. lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
  596. log.Trace("lsub", "id", lsub)
  597. defer lsub.Unsubscribe()
  598. rmsgC := make(chan APIMsg)
  599. rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
  600. defer rcancel()
  601. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
  602. log.Trace("rsub", "id", rsub)
  603. defer rsub.Unsubscribe()
  604. // send and verify delivery
  605. lmsg := []byte("plugh")
  606. err = clients[1].Call(nil, "pss_sendRaw", loaddrhex, topic, lmsg)
  607. if err != nil {
  608. t.Fatal(err)
  609. }
  610. select {
  611. case recvmsg := <-lmsgC:
  612. if !bytes.Equal(recvmsg.Msg, lmsg) {
  613. t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg)
  614. }
  615. case cerr := <-lctx.Done():
  616. t.Fatalf("test message (left) timed out: %v", cerr)
  617. }
  618. rmsg := []byte("xyzzy")
  619. err = clients[0].Call(nil, "pss_sendRaw", roaddrhex, topic, rmsg)
  620. if err != nil {
  621. t.Fatal(err)
  622. }
  623. select {
  624. case recvmsg := <-rmsgC:
  625. if !bytes.Equal(recvmsg.Msg, rmsg) {
  626. t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
  627. }
  628. case cerr := <-rctx.Done():
  629. t.Fatalf("test message (right) timed out: %v", cerr)
  630. }
  631. }
  632. // send symmetrically encrypted message between two directly connected peers
  633. func TestSendSym(t *testing.T) {
  634. t.Run("32", testSendSym)
  635. t.Run("8", testSendSym)
  636. t.Run("0", testSendSym)
  637. }
  638. func testSendSym(t *testing.T) {
  639. // address hint size
  640. var addrsize int64
  641. var err error
  642. paramstring := strings.Split(t.Name(), "/")
  643. addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
  644. log.Info("sym send test", "addrsize", addrsize)
  645. clients, err := setupNetwork(2, false)
  646. if err != nil {
  647. t.Fatal(err)
  648. }
  649. var topic string
  650. err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
  651. if err != nil {
  652. t.Fatal(err)
  653. }
  654. var loaddrhex string
  655. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  656. if err != nil {
  657. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  658. }
  659. loaddrhex = loaddrhex[:2+(addrsize*2)]
  660. var roaddrhex string
  661. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  662. if err != nil {
  663. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  664. }
  665. roaddrhex = roaddrhex[:2+(addrsize*2)]
  666. // retrieve public key from pss instance
  667. // set this public key reciprocally
  668. var lpubkeyhex string
  669. err = clients[0].Call(&lpubkeyhex, "pss_getPublicKey")
  670. if err != nil {
  671. t.Fatalf("rpc get node 1 pubkey fail: %v", err)
  672. }
  673. var rpubkeyhex string
  674. err = clients[1].Call(&rpubkeyhex, "pss_getPublicKey")
  675. if err != nil {
  676. t.Fatalf("rpc get node 2 pubkey fail: %v", err)
  677. }
  678. time.Sleep(time.Millisecond * 500)
  679. // at this point we've verified that symkeys are saved and match on each peer
  680. // now try sending symmetrically encrypted message, both directions
  681. lmsgC := make(chan APIMsg)
  682. lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
  683. defer lcancel()
  684. lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
  685. log.Trace("lsub", "id", lsub)
  686. defer lsub.Unsubscribe()
  687. rmsgC := make(chan APIMsg)
  688. rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
  689. defer rcancel()
  690. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
  691. log.Trace("rsub", "id", rsub)
  692. defer rsub.Unsubscribe()
  693. lrecvkey := network.RandomAddr().Over()
  694. rrecvkey := network.RandomAddr().Over()
  695. var lkeyids [2]string
  696. var rkeyids [2]string
  697. // manually set reciprocal symkeys
  698. err = clients[0].Call(&lkeyids, "psstest_setSymKeys", rpubkeyhex, lrecvkey, rrecvkey, defaultSymKeySendLimit, topic, roaddrhex)
  699. if err != nil {
  700. t.Fatal(err)
  701. }
  702. err = clients[1].Call(&rkeyids, "psstest_setSymKeys", lpubkeyhex, rrecvkey, lrecvkey, defaultSymKeySendLimit, topic, loaddrhex)
  703. if err != nil {
  704. t.Fatal(err)
  705. }
  706. // send and verify delivery
  707. lmsg := []byte("plugh")
  708. err = clients[1].Call(nil, "pss_sendSym", rkeyids[1], topic, hexutil.Encode(lmsg))
  709. if err != nil {
  710. t.Fatal(err)
  711. }
  712. select {
  713. case recvmsg := <-lmsgC:
  714. if !bytes.Equal(recvmsg.Msg, lmsg) {
  715. t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg)
  716. }
  717. case cerr := <-lctx.Done():
  718. t.Fatalf("test message timed out: %v", cerr)
  719. }
  720. rmsg := []byte("xyzzy")
  721. err = clients[0].Call(nil, "pss_sendSym", lkeyids[1], topic, hexutil.Encode(rmsg))
  722. if err != nil {
  723. t.Fatal(err)
  724. }
  725. select {
  726. case recvmsg := <-rmsgC:
  727. if !bytes.Equal(recvmsg.Msg, rmsg) {
  728. t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
  729. }
  730. case cerr := <-rctx.Done():
  731. t.Fatalf("test message timed out: %v", cerr)
  732. }
  733. }
  734. // send asymmetrically encrypted message between two directly connected peers
  735. func TestSendAsym(t *testing.T) {
  736. t.Run("32", testSendAsym)
  737. t.Run("8", testSendAsym)
  738. t.Run("0", testSendAsym)
  739. }
  740. func testSendAsym(t *testing.T) {
  741. // address hint size
  742. var addrsize int64
  743. var err error
  744. paramstring := strings.Split(t.Name(), "/")
  745. addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
  746. log.Info("asym send test", "addrsize", addrsize)
  747. clients, err := setupNetwork(2, false)
  748. if err != nil {
  749. t.Fatal(err)
  750. }
  751. var topic string
  752. err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
  753. if err != nil {
  754. t.Fatal(err)
  755. }
  756. time.Sleep(time.Millisecond * 250)
  757. var loaddrhex string
  758. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  759. if err != nil {
  760. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  761. }
  762. loaddrhex = loaddrhex[:2+(addrsize*2)]
  763. var roaddrhex string
  764. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  765. if err != nil {
  766. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  767. }
  768. roaddrhex = roaddrhex[:2+(addrsize*2)]
  769. // retrieve public key from pss instance
  770. // set this public key reciprocally
  771. var lpubkey string
  772. err = clients[0].Call(&lpubkey, "pss_getPublicKey")
  773. if err != nil {
  774. t.Fatalf("rpc get node 1 pubkey fail: %v", err)
  775. }
  776. var rpubkey string
  777. err = clients[1].Call(&rpubkey, "pss_getPublicKey")
  778. if err != nil {
  779. t.Fatalf("rpc get node 2 pubkey fail: %v", err)
  780. }
  781. time.Sleep(time.Millisecond * 500) // replace with hive healthy code
  782. lmsgC := make(chan APIMsg)
  783. lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
  784. defer lcancel()
  785. lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
  786. log.Trace("lsub", "id", lsub)
  787. defer lsub.Unsubscribe()
  788. rmsgC := make(chan APIMsg)
  789. rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
  790. defer rcancel()
  791. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
  792. log.Trace("rsub", "id", rsub)
  793. defer rsub.Unsubscribe()
  794. // store reciprocal public keys
  795. err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, roaddrhex)
  796. if err != nil {
  797. t.Fatal(err)
  798. }
  799. err = clients[1].Call(nil, "pss_setPeerPublicKey", lpubkey, topic, loaddrhex)
  800. if err != nil {
  801. t.Fatal(err)
  802. }
  803. // send and verify delivery
  804. rmsg := []byte("xyzzy")
  805. err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
  806. if err != nil {
  807. t.Fatal(err)
  808. }
  809. select {
  810. case recvmsg := <-rmsgC:
  811. if !bytes.Equal(recvmsg.Msg, rmsg) {
  812. t.Fatalf("node 2 received payload mismatch: expected %v, got %v", rmsg, recvmsg.Msg)
  813. }
  814. case cerr := <-rctx.Done():
  815. t.Fatalf("test message timed out: %v", cerr)
  816. }
  817. lmsg := []byte("plugh")
  818. err = clients[1].Call(nil, "pss_sendAsym", lpubkey, topic, hexutil.Encode(lmsg))
  819. if err != nil {
  820. t.Fatal(err)
  821. }
  822. select {
  823. case recvmsg := <-lmsgC:
  824. if !bytes.Equal(recvmsg.Msg, lmsg) {
  825. t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg.Msg)
  826. }
  827. case cerr := <-lctx.Done():
  828. t.Fatalf("test message timed out: %v", cerr)
  829. }
  830. }
  831. type Job struct {
  832. Msg []byte
  833. SendNode discover.NodeID
  834. RecvNode discover.NodeID
  835. }
  836. func worker(id int, jobs <-chan Job, rpcs map[discover.NodeID]*rpc.Client, pubkeys map[discover.NodeID]string, topic string) {
  837. for j := range jobs {
  838. rpcs[j.SendNode].Call(nil, "pss_sendAsym", pubkeys[j.RecvNode], topic, hexutil.Encode(j.Msg))
  839. }
  840. }
  841. func TestNetwork(t *testing.T) {
  842. t.Run("16/1000/4/sim", testNetwork)
  843. }
  844. // params in run name:
  845. // nodes/msgs/addrbytes/adaptertype
  846. // if adaptertype is exec uses execadapter, simadapter otherwise
  847. func TestNetwork2000(t *testing.T) {
  848. //enableMetrics()
  849. if !*longrunning {
  850. t.Skip("run with --longrunning flag to run extensive network tests")
  851. }
  852. t.Run("3/2000/4/sim", testNetwork)
  853. t.Run("4/2000/4/sim", testNetwork)
  854. t.Run("8/2000/4/sim", testNetwork)
  855. t.Run("16/2000/4/sim", testNetwork)
  856. }
  857. func TestNetwork5000(t *testing.T) {
  858. //enableMetrics()
  859. if !*longrunning {
  860. t.Skip("run with --longrunning flag to run extensive network tests")
  861. }
  862. t.Run("3/5000/4/sim", testNetwork)
  863. t.Run("4/5000/4/sim", testNetwork)
  864. t.Run("8/5000/4/sim", testNetwork)
  865. t.Run("16/5000/4/sim", testNetwork)
  866. }
  867. func TestNetwork10000(t *testing.T) {
  868. //enableMetrics()
  869. if !*longrunning {
  870. t.Skip("run with --longrunning flag to run extensive network tests")
  871. }
  872. t.Run("3/10000/4/sim", testNetwork)
  873. t.Run("4/10000/4/sim", testNetwork)
  874. t.Run("8/10000/4/sim", testNetwork)
  875. }
  876. func testNetwork(t *testing.T) {
  877. type msgnotifyC struct {
  878. id discover.NodeID
  879. msgIdx int
  880. }
  881. paramstring := strings.Split(t.Name(), "/")
  882. nodecount, _ := strconv.ParseInt(paramstring[1], 10, 0)
  883. msgcount, _ := strconv.ParseInt(paramstring[2], 10, 0)
  884. addrsize, _ := strconv.ParseInt(paramstring[3], 10, 0)
  885. adapter := paramstring[4]
  886. log.Info("network test", "nodecount", nodecount, "msgcount", msgcount, "addrhintsize", addrsize)
  887. nodes := make([]discover.NodeID, nodecount)
  888. bzzaddrs := make(map[discover.NodeID]string, nodecount)
  889. rpcs := make(map[discover.NodeID]*rpc.Client, nodecount)
  890. pubkeys := make(map[discover.NodeID]string, nodecount)
  891. sentmsgs := make([][]byte, msgcount)
  892. recvmsgs := make([]bool, msgcount)
  893. nodemsgcount := make(map[discover.NodeID]int, nodecount)
  894. trigger := make(chan discover.NodeID)
  895. var a adapters.NodeAdapter
  896. if adapter == "exec" {
  897. dirname, err := ioutil.TempDir(".", "")
  898. if err != nil {
  899. t.Fatal(err)
  900. }
  901. a = adapters.NewExecAdapter(dirname)
  902. } else if adapter == "tcp" {
  903. a = adapters.NewTCPAdapter(newServices(false))
  904. } else if adapter == "sim" {
  905. a = adapters.NewSimAdapter(newServices(false))
  906. }
  907. net := simulations.NewNetwork(a, &simulations.NetworkConfig{
  908. ID: "0",
  909. })
  910. defer net.Shutdown()
  911. f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodecount))
  912. if err != nil {
  913. t.Fatal(err)
  914. }
  915. jsonbyte, err := ioutil.ReadAll(f)
  916. if err != nil {
  917. t.Fatal(err)
  918. }
  919. var snap simulations.Snapshot
  920. err = json.Unmarshal(jsonbyte, &snap)
  921. if err != nil {
  922. t.Fatal(err)
  923. }
  924. err = net.Load(&snap)
  925. if err != nil {
  926. //TODO: Fix p2p simulation framework to not crash when loading 32-nodes
  927. //t.Fatal(err)
  928. }
  929. time.Sleep(1 * time.Second)
  930. triggerChecks := func(trigger chan discover.NodeID, id discover.NodeID, rpcclient *rpc.Client, topic string) error {
  931. msgC := make(chan APIMsg)
  932. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  933. defer cancel()
  934. sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic)
  935. if err != nil {
  936. t.Fatal(err)
  937. }
  938. go func() {
  939. defer sub.Unsubscribe()
  940. for {
  941. select {
  942. case recvmsg := <-msgC:
  943. idx, _ := binary.Uvarint(recvmsg.Msg)
  944. if !recvmsgs[idx] {
  945. log.Debug("msg recv", "idx", idx, "id", id)
  946. recvmsgs[idx] = true
  947. trigger <- id
  948. }
  949. case <-sub.Err():
  950. return
  951. }
  952. }
  953. }()
  954. return nil
  955. }
  956. var topic string
  957. for i, nod := range net.GetNodes() {
  958. nodes[i] = nod.ID()
  959. rpcs[nodes[i]], err = nod.Client()
  960. if err != nil {
  961. t.Fatal(err)
  962. }
  963. if topic == "" {
  964. err = rpcs[nodes[i]].Call(&topic, "pss_stringToTopic", "foo:42")
  965. if err != nil {
  966. t.Fatal(err)
  967. }
  968. }
  969. var pubkey string
  970. err = rpcs[nodes[i]].Call(&pubkey, "pss_getPublicKey")
  971. if err != nil {
  972. t.Fatal(err)
  973. }
  974. pubkeys[nod.ID()] = pubkey
  975. var addrhex string
  976. err = rpcs[nodes[i]].Call(&addrhex, "pss_baseAddr")
  977. if err != nil {
  978. t.Fatal(err)
  979. }
  980. bzzaddrs[nodes[i]] = addrhex
  981. err = triggerChecks(trigger, nodes[i], rpcs[nodes[i]], topic)
  982. if err != nil {
  983. t.Fatal(err)
  984. }
  985. }
  986. time.Sleep(1 * time.Second)
  987. // setup workers
  988. jobs := make(chan Job, 10)
  989. for w := 1; w <= 10; w++ {
  990. go worker(w, jobs, rpcs, pubkeys, topic)
  991. }
  992. time.Sleep(1 * time.Second)
  993. for i := 0; i < int(msgcount); i++ {
  994. sendnodeidx := rand.Intn(int(nodecount))
  995. recvnodeidx := rand.Intn(int(nodecount - 1))
  996. if recvnodeidx >= sendnodeidx {
  997. recvnodeidx++
  998. }
  999. nodemsgcount[nodes[recvnodeidx]]++
  1000. sentmsgs[i] = make([]byte, 8)
  1001. c := binary.PutUvarint(sentmsgs[i], uint64(i))
  1002. if c == 0 {
  1003. t.Fatal("0 byte message")
  1004. }
  1005. if err != nil {
  1006. t.Fatal(err)
  1007. }
  1008. err = rpcs[nodes[sendnodeidx]].Call(nil, "pss_setPeerPublicKey", pubkeys[nodes[recvnodeidx]], topic, bzzaddrs[nodes[recvnodeidx]])
  1009. if err != nil {
  1010. t.Fatal(err)
  1011. }
  1012. jobs <- Job{
  1013. Msg: sentmsgs[i],
  1014. SendNode: nodes[sendnodeidx],
  1015. RecvNode: nodes[recvnodeidx],
  1016. }
  1017. }
  1018. finalmsgcount := 0
  1019. ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1020. defer cancel()
  1021. outer:
  1022. for i := 0; i < int(msgcount); i++ {
  1023. select {
  1024. case id := <-trigger:
  1025. nodemsgcount[id]--
  1026. finalmsgcount++
  1027. case <-ctx.Done():
  1028. log.Warn("timeout")
  1029. break outer
  1030. }
  1031. }
  1032. for i, msg := range recvmsgs {
  1033. if !msg {
  1034. log.Debug("missing message", "idx", i)
  1035. }
  1036. }
  1037. t.Logf("%d of %d messages received", finalmsgcount, msgcount)
  1038. if finalmsgcount != int(msgcount) {
  1039. t.Fatalf("%d messages were not received", int(msgcount)-finalmsgcount)
  1040. }
  1041. }
  1042. // check that in a network of a -> b -> c -> a
  1043. // a doesn't receive a sent message twice
  1044. func TestDeduplication(t *testing.T) {
  1045. var err error
  1046. clients, err := setupNetwork(3, false)
  1047. if err != nil {
  1048. t.Fatal(err)
  1049. }
  1050. var addrsize = 32
  1051. var loaddrhex string
  1052. err = clients[0].Call(&loaddrhex, "pss_baseAddr")
  1053. if err != nil {
  1054. t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
  1055. }
  1056. loaddrhex = loaddrhex[:2+(addrsize*2)]
  1057. var roaddrhex string
  1058. err = clients[1].Call(&roaddrhex, "pss_baseAddr")
  1059. if err != nil {
  1060. t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
  1061. }
  1062. roaddrhex = roaddrhex[:2+(addrsize*2)]
  1063. var xoaddrhex string
  1064. err = clients[2].Call(&xoaddrhex, "pss_baseAddr")
  1065. if err != nil {
  1066. t.Fatalf("rpc get node 3 baseaddr fail: %v", err)
  1067. }
  1068. xoaddrhex = xoaddrhex[:2+(addrsize*2)]
  1069. log.Info("peer", "l", loaddrhex, "r", roaddrhex, "x", xoaddrhex)
  1070. var topic string
  1071. err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
  1072. if err != nil {
  1073. t.Fatal(err)
  1074. }
  1075. time.Sleep(time.Millisecond * 250)
  1076. // retrieve public key from pss instance
  1077. // set this public key reciprocally
  1078. var rpubkey string
  1079. err = clients[1].Call(&rpubkey, "pss_getPublicKey")
  1080. if err != nil {
  1081. t.Fatalf("rpc get receivenode pubkey fail: %v", err)
  1082. }
  1083. time.Sleep(time.Millisecond * 500) // replace with hive healthy code
  1084. rmsgC := make(chan APIMsg)
  1085. rctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
  1086. defer cancel()
  1087. rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
  1088. log.Trace("rsub", "id", rsub)
  1089. defer rsub.Unsubscribe()
  1090. // store public key for recipient
  1091. // zero-length address means forward to all
  1092. // we have just two peers, they will be in proxbin, and will both receive
  1093. err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, "0x")
  1094. if err != nil {
  1095. t.Fatal(err)
  1096. }
  1097. // send and verify delivery
  1098. rmsg := []byte("xyzzy")
  1099. err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
  1100. if err != nil {
  1101. t.Fatal(err)
  1102. }
  1103. var receivedok bool
  1104. OUTER:
  1105. for {
  1106. select {
  1107. case <-rmsgC:
  1108. if receivedok {
  1109. t.Fatalf("duplicate message received")
  1110. }
  1111. receivedok = true
  1112. case <-rctx.Done():
  1113. break OUTER
  1114. }
  1115. }
  1116. if !receivedok {
  1117. t.Fatalf("message did not arrive")
  1118. }
  1119. }
  1120. // symmetric send performance with varying message sizes
  1121. func BenchmarkSymkeySend(b *testing.B) {
  1122. b.Run(fmt.Sprintf("%d", 256), benchmarkSymKeySend)
  1123. b.Run(fmt.Sprintf("%d", 1024), benchmarkSymKeySend)
  1124. b.Run(fmt.Sprintf("%d", 1024*1024), benchmarkSymKeySend)
  1125. b.Run(fmt.Sprintf("%d", 1024*1024*10), benchmarkSymKeySend)
  1126. b.Run(fmt.Sprintf("%d", 1024*1024*100), benchmarkSymKeySend)
  1127. }
  1128. func benchmarkSymKeySend(b *testing.B) {
  1129. msgsizestring := strings.Split(b.Name(), "/")
  1130. if len(msgsizestring) != 2 {
  1131. b.Fatalf("benchmark called without msgsize param")
  1132. }
  1133. msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
  1134. if err != nil {
  1135. b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
  1136. }
  1137. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1138. defer cancel()
  1139. keys, err := wapi.NewKeyPair(ctx)
  1140. privkey, err := w.GetPrivateKey(keys)
  1141. ps := newTestPss(privkey, nil, nil)
  1142. msg := make([]byte, msgsize)
  1143. rand.Read(msg)
  1144. topic := BytesToTopic([]byte("foo"))
  1145. to := make(PssAddress, 32)
  1146. copy(to[:], network.RandomAddr().Over())
  1147. symkeyid, err := ps.GenerateSymmetricKey(topic, &to, true)
  1148. if err != nil {
  1149. b.Fatalf("could not generate symkey: %v", err)
  1150. }
  1151. symkey, err := ps.w.GetSymKey(symkeyid)
  1152. if err != nil {
  1153. b.Fatalf("could not retrieve symkey: %v", err)
  1154. }
  1155. ps.SetSymmetricKey(symkey, topic, &to, false)
  1156. b.ResetTimer()
  1157. for i := 0; i < b.N; i++ {
  1158. ps.SendSym(symkeyid, topic, msg)
  1159. }
  1160. }
  1161. // asymmetric send performance with varying message sizes
  1162. func BenchmarkAsymkeySend(b *testing.B) {
  1163. b.Run(fmt.Sprintf("%d", 256), benchmarkAsymKeySend)
  1164. b.Run(fmt.Sprintf("%d", 1024), benchmarkAsymKeySend)
  1165. b.Run(fmt.Sprintf("%d", 1024*1024), benchmarkAsymKeySend)
  1166. b.Run(fmt.Sprintf("%d", 1024*1024*10), benchmarkAsymKeySend)
  1167. b.Run(fmt.Sprintf("%d", 1024*1024*100), benchmarkAsymKeySend)
  1168. }
  1169. func benchmarkAsymKeySend(b *testing.B) {
  1170. msgsizestring := strings.Split(b.Name(), "/")
  1171. if len(msgsizestring) != 2 {
  1172. b.Fatalf("benchmark called without msgsize param")
  1173. }
  1174. msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
  1175. if err != nil {
  1176. b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
  1177. }
  1178. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1179. defer cancel()
  1180. keys, err := wapi.NewKeyPair(ctx)
  1181. privkey, err := w.GetPrivateKey(keys)
  1182. ps := newTestPss(privkey, nil, nil)
  1183. msg := make([]byte, msgsize)
  1184. rand.Read(msg)
  1185. topic := BytesToTopic([]byte("foo"))
  1186. to := make(PssAddress, 32)
  1187. copy(to[:], network.RandomAddr().Over())
  1188. ps.SetPeerPublicKey(&privkey.PublicKey, topic, &to)
  1189. b.ResetTimer()
  1190. for i := 0; i < b.N; i++ {
  1191. ps.SendAsym(common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey)), topic, msg)
  1192. }
  1193. }
  1194. func BenchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
  1195. for i := 100; i < 100000; i = i * 10 {
  1196. for j := 32; j < 10000; j = j * 8 {
  1197. b.Run(fmt.Sprintf("%d/%d", i, j), benchmarkSymkeyBruteforceChangeaddr)
  1198. }
  1199. //b.Run(fmt.Sprintf("%d", i), benchmarkSymkeyBruteforceChangeaddr)
  1200. }
  1201. }
  1202. // decrypt performance using symkey cache, worst case
  1203. // (decrypt key always last in cache)
  1204. func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
  1205. keycountstring := strings.Split(b.Name(), "/")
  1206. cachesize := int64(0)
  1207. var ps *Pss
  1208. if len(keycountstring) < 2 {
  1209. b.Fatalf("benchmark called without count param")
  1210. }
  1211. keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
  1212. if err != nil {
  1213. b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
  1214. }
  1215. if len(keycountstring) == 3 {
  1216. cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
  1217. if err != nil {
  1218. b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
  1219. }
  1220. }
  1221. pssmsgs := make([]*PssMsg, 0, keycount)
  1222. var keyid string
  1223. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1224. defer cancel()
  1225. keys, err := wapi.NewKeyPair(ctx)
  1226. privkey, err := w.GetPrivateKey(keys)
  1227. if cachesize > 0 {
  1228. ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
  1229. } else {
  1230. ps = newTestPss(privkey, nil, nil)
  1231. }
  1232. topic := BytesToTopic([]byte("foo"))
  1233. for i := 0; i < int(keycount); i++ {
  1234. to := make(PssAddress, 32)
  1235. copy(to[:], network.RandomAddr().Over())
  1236. keyid, err = ps.GenerateSymmetricKey(topic, &to, true)
  1237. if err != nil {
  1238. b.Fatalf("cant generate symkey #%d: %v", i, err)
  1239. }
  1240. symkey, err := ps.w.GetSymKey(keyid)
  1241. if err != nil {
  1242. b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
  1243. }
  1244. wparams := &whisper.MessageParams{
  1245. TTL: defaultWhisperTTL,
  1246. KeySym: symkey,
  1247. Topic: whisper.TopicType(topic),
  1248. WorkTime: defaultWhisperWorkTime,
  1249. PoW: defaultWhisperPoW,
  1250. Payload: []byte("xyzzy"),
  1251. Padding: []byte("1234567890abcdef"),
  1252. }
  1253. woutmsg, err := whisper.NewSentMessage(wparams)
  1254. if err != nil {
  1255. b.Fatalf("could not create whisper message: %v", err)
  1256. }
  1257. env, err := woutmsg.Wrap(wparams)
  1258. if err != nil {
  1259. b.Fatalf("could not generate whisper envelope: %v", err)
  1260. }
  1261. ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
  1262. return nil
  1263. })
  1264. pssmsgs = append(pssmsgs, &PssMsg{
  1265. To: to,
  1266. Payload: env,
  1267. })
  1268. }
  1269. b.ResetTimer()
  1270. for i := 0; i < b.N; i++ {
  1271. if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]); err != nil {
  1272. b.Fatalf("pss processing failed: %v", err)
  1273. }
  1274. }
  1275. }
  1276. func BenchmarkSymkeyBruteforceSameaddr(b *testing.B) {
  1277. for i := 100; i < 100000; i = i * 10 {
  1278. for j := 32; j < 10000; j = j * 8 {
  1279. b.Run(fmt.Sprintf("%d/%d", i, j), benchmarkSymkeyBruteforceSameaddr)
  1280. }
  1281. }
  1282. }
  1283. // decrypt performance using symkey cache, best case
  1284. // (decrypt key always first in cache)
  1285. func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
  1286. var keyid string
  1287. var ps *Pss
  1288. cachesize := int64(0)
  1289. keycountstring := strings.Split(b.Name(), "/")
  1290. if len(keycountstring) < 2 {
  1291. b.Fatalf("benchmark called without count param")
  1292. }
  1293. keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
  1294. if err != nil {
  1295. b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
  1296. }
  1297. if len(keycountstring) == 3 {
  1298. cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
  1299. if err != nil {
  1300. b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
  1301. }
  1302. }
  1303. addr := make([]PssAddress, keycount)
  1304. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1305. defer cancel()
  1306. keys, err := wapi.NewKeyPair(ctx)
  1307. privkey, err := w.GetPrivateKey(keys)
  1308. if cachesize > 0 {
  1309. ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
  1310. } else {
  1311. ps = newTestPss(privkey, nil, nil)
  1312. }
  1313. topic := BytesToTopic([]byte("foo"))
  1314. for i := 0; i < int(keycount); i++ {
  1315. copy(addr[i], network.RandomAddr().Over())
  1316. keyid, err = ps.GenerateSymmetricKey(topic, &addr[i], true)
  1317. if err != nil {
  1318. b.Fatalf("cant generate symkey #%d: %v", i, err)
  1319. }
  1320. }
  1321. symkey, err := ps.w.GetSymKey(keyid)
  1322. if err != nil {
  1323. b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
  1324. }
  1325. wparams := &whisper.MessageParams{
  1326. TTL: defaultWhisperTTL,
  1327. KeySym: symkey,
  1328. Topic: whisper.TopicType(topic),
  1329. WorkTime: defaultWhisperWorkTime,
  1330. PoW: defaultWhisperPoW,
  1331. Payload: []byte("xyzzy"),
  1332. Padding: []byte("1234567890abcdef"),
  1333. }
  1334. woutmsg, err := whisper.NewSentMessage(wparams)
  1335. if err != nil {
  1336. b.Fatalf("could not create whisper message: %v", err)
  1337. }
  1338. env, err := woutmsg.Wrap(wparams)
  1339. if err != nil {
  1340. b.Fatalf("could not generate whisper envelope: %v", err)
  1341. }
  1342. ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
  1343. return nil
  1344. })
  1345. pssmsg := &PssMsg{
  1346. To: addr[len(addr)-1][:],
  1347. Payload: env,
  1348. }
  1349. for i := 0; i < b.N; i++ {
  1350. if err := ps.process(pssmsg); err != nil {
  1351. b.Fatalf("pss processing failed: %v", err)
  1352. }
  1353. }
  1354. }
  1355. // setup simulated network with bzz/discovery and pss services.
  1356. // connects nodes in a circle
  1357. // if allowRaw is set, omission of builtin pss encryption is enabled (see PssParams)
  1358. func setupNetwork(numnodes int, allowRaw bool) (clients []*rpc.Client, err error) {
  1359. nodes := make([]*simulations.Node, numnodes)
  1360. clients = make([]*rpc.Client, numnodes)
  1361. if numnodes < 2 {
  1362. return nil, fmt.Errorf("Minimum two nodes in network")
  1363. }
  1364. adapter := adapters.NewSimAdapter(newServices(allowRaw))
  1365. net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
  1366. ID: "0",
  1367. DefaultService: "bzz",
  1368. })
  1369. for i := 0; i < numnodes; i++ {
  1370. nodeconf := adapters.RandomNodeConfig()
  1371. nodeconf.Services = []string{"bzz", pssProtocolName}
  1372. nodes[i], err = net.NewNodeWithConfig(nodeconf)
  1373. if err != nil {
  1374. return nil, fmt.Errorf("error creating node 1: %v", err)
  1375. }
  1376. err = net.Start(nodes[i].ID())
  1377. if err != nil {
  1378. return nil, fmt.Errorf("error starting node 1: %v", err)
  1379. }
  1380. if i > 0 {
  1381. err = net.Connect(nodes[i].ID(), nodes[i-1].ID())
  1382. if err != nil {
  1383. return nil, fmt.Errorf("error connecting nodes: %v", err)
  1384. }
  1385. }
  1386. clients[i], err = nodes[i].Client()
  1387. if err != nil {
  1388. return nil, fmt.Errorf("create node 1 rpc client fail: %v", err)
  1389. }
  1390. }
  1391. if numnodes > 2 {
  1392. err = net.Connect(nodes[0].ID(), nodes[len(nodes)-1].ID())
  1393. if err != nil {
  1394. return nil, fmt.Errorf("error connecting first and last nodes")
  1395. }
  1396. }
  1397. return clients, nil
  1398. }
  1399. func newServices(allowRaw bool) adapters.Services {
  1400. stateStore := state.NewInmemoryStore()
  1401. kademlias := make(map[discover.NodeID]*network.Kademlia)
  1402. kademlia := func(id discover.NodeID) *network.Kademlia {
  1403. if k, ok := kademlias[id]; ok {
  1404. return k
  1405. }
  1406. addr := network.NewAddrFromNodeID(id)
  1407. params := network.NewKadParams()
  1408. params.MinProxBinSize = 2
  1409. params.MaxBinSize = 3
  1410. params.MinBinSize = 1
  1411. params.MaxRetries = 1000
  1412. params.RetryExponent = 2
  1413. params.RetryInterval = 1000000
  1414. kademlias[id] = network.NewKademlia(addr.Over(), params)
  1415. return kademlias[id]
  1416. }
  1417. return adapters.Services{
  1418. pssProtocolName: func(ctx *adapters.ServiceContext) (node.Service, error) {
  1419. // execadapter does not exec init()
  1420. initTest()
  1421. ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
  1422. defer cancel()
  1423. keys, err := wapi.NewKeyPair(ctxlocal)
  1424. privkey, err := w.GetPrivateKey(keys)
  1425. pssp := NewPssParams().WithPrivateKey(privkey)
  1426. pssp.AllowRaw = allowRaw
  1427. pskad := kademlia(ctx.Config.ID)
  1428. ps, err := NewPss(pskad, pssp)
  1429. if err != nil {
  1430. return nil, err
  1431. }
  1432. ping := &Ping{
  1433. OutC: make(chan bool),
  1434. Pong: true,
  1435. }
  1436. p2pp := NewPingProtocol(ping)
  1437. pp, err := RegisterProtocol(ps, &PingTopic, PingProtocol, p2pp, &ProtocolParams{Asymmetric: true})
  1438. if err != nil {
  1439. return nil, err
  1440. }
  1441. if useHandshake {
  1442. SetHandshakeController(ps, NewHandshakeParams())
  1443. }
  1444. ps.Register(&PingTopic, pp.Handle)
  1445. ps.addAPI(rpc.API{
  1446. Namespace: "psstest",
  1447. Version: "0.3",
  1448. Service: NewAPITest(ps),
  1449. Public: false,
  1450. })
  1451. if err != nil {
  1452. log.Error("Couldnt register pss protocol", "err", err)
  1453. os.Exit(1)
  1454. }
  1455. pssprotocols[ctx.Config.ID.String()] = &protoCtrl{
  1456. C: ping.OutC,
  1457. protocol: pp,
  1458. run: p2pp.Run,
  1459. }
  1460. return ps, nil
  1461. },
  1462. "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
  1463. addr := network.NewAddrFromNodeID(ctx.Config.ID)
  1464. hp := network.NewHiveParams()
  1465. hp.Discovery = false
  1466. config := &network.BzzConfig{
  1467. OverlayAddr: addr.Over(),
  1468. UnderlayAddr: addr.Under(),
  1469. HiveParams: hp,
  1470. }
  1471. return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
  1472. },
  1473. }
  1474. }
  1475. func newTestPss(privkey *ecdsa.PrivateKey, overlay network.Overlay, ppextra *PssParams) *Pss {
  1476. var nid discover.NodeID
  1477. copy(nid[:], crypto.FromECDSAPub(&privkey.PublicKey))
  1478. addr := network.NewAddrFromNodeID(nid)
  1479. // set up routing if kademlia is not passed to us
  1480. if overlay == nil {
  1481. kp := network.NewKadParams()
  1482. kp.MinProxBinSize = 3
  1483. overlay = network.NewKademlia(addr.Over(), kp)
  1484. }
  1485. // create pss
  1486. pp := NewPssParams().WithPrivateKey(privkey)
  1487. if ppextra != nil {
  1488. pp.SymKeyCacheCapacity = ppextra.SymKeyCacheCapacity
  1489. }
  1490. ps, err := NewPss(overlay, pp)
  1491. if err != nil {
  1492. return nil
  1493. }
  1494. ps.Start(nil)
  1495. return ps
  1496. }
  1497. // API calls for test/development use
  1498. type APITest struct {
  1499. *Pss
  1500. }
  1501. func NewAPITest(ps *Pss) *APITest {
  1502. return &APITest{Pss: ps}
  1503. }
  1504. func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to PssAddress) ([2]string, error) {
  1505. recvsymkeyid, err := apitest.SetSymmetricKey(recvsymkey, topic, &to, true)
  1506. if err != nil {
  1507. return [2]string{}, err
  1508. }
  1509. sendsymkeyid, err := apitest.SetSymmetricKey(sendsymkey, topic, &to, false)
  1510. if err != nil {
  1511. return [2]string{}, err
  1512. }
  1513. return [2]string{recvsymkeyid, sendsymkeyid}, nil
  1514. }
  1515. func (apitest *APITest) Clean() (int, error) {
  1516. return apitest.Pss.cleanKeys(), nil
  1517. }
  1518. // enableMetrics is starting InfluxDB reporter so that we collect stats when running tests locally
  1519. func enableMetrics() {
  1520. metrics.Enabled = true
  1521. go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 1*time.Second, "http://localhost:8086", "metrics", "admin", "admin", "swarm.", map[string]string{
  1522. "host": "test",
  1523. })
  1524. }