peer.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. package eth
  2. import (
  3. "bytes"
  4. "container/list"
  5. "fmt"
  6. "github.com/ethereum/eth-go/ethchain"
  7. "github.com/ethereum/eth-go/ethlog"
  8. "github.com/ethereum/eth-go/ethutil"
  9. "github.com/ethereum/eth-go/ethwire"
  10. "net"
  11. "strconv"
  12. "strings"
  13. "sync/atomic"
  14. "time"
  15. )
  // peerlogger is the logger used throughout this file; it is created with the
  // "PEER" tag.
  var peerlogger = ethlog.NewLogger("PEER")

  const (
  	// The size of the output buffer for writing messages; used as the
  	// capacity of every Peer's outputQueue channel.
  	outputBufferSize = 50
  	// Current protocol version; sent in our handshake and required to match
  	// the version announced by the remote peer.
  	ProtocolVersion = 20
  	// Interval at which HandleOutbound sends a ping to the peer.
  	pingPongTimer = 2 * time.Second
  )
  // DiscReason is the reason code carried in a disconnect message.
  type DiscReason byte

  const (
  	// Values are given explicitly instead of by iota because these values are
  	// defined by the wire protocol spec; it is easier for humans to ensure
  	// correctness when values are explicit.
  	DiscReRequested  = 0x00
  	DiscReTcpSysErr  = 0x01
  	DiscBadProto     = 0x02
  	DiscBadPeer      = 0x03
  	DiscTooManyPeers = 0x04
  	DiscConnDup      = 0x05
  	DiscGenesisErr   = 0x06
  	DiscProtoErr     = 0x07
  )

  // discReasonToString maps a reason code (used as the index) to its
  // human-readable description.
  var discReasonToString = []string{
  	"Disconnect requested",
  	"Disconnect TCP sys error",
  	"Disconnect bad protocol",
  	"Disconnect useless peer",
  	"Disconnect too many peers",
  	"Disconnect already connected",
  	"Disconnect wrong genesis block",
  	"Disconnect incompatible network",
  }

  // String returns the description of the disconnect reason, or "Unknown" for
  // any code that has no entry in discReasonToString.
  func (d DiscReason) String() string {
  	// The code arrives from the remote peer, so it must be range-checked.
  	// Valid indexes are 0..len-1, hence >= rather than >.
  	if int(d) >= len(discReasonToString) {
  		return "Unknown"
  	}
  	return discReasonToString[d]
  }
  // Caps is a bit set of peer capabilities.
  type Caps byte

  // Capability flags. Every flag occupies its own bit so that flags can be
  // combined with a bitwise OR.
  const (
  	CapPeerDiscTy = 1 << iota
  	CapTxTy
  	CapChainTy

  	CapDefault = CapChainTy | CapTxTy | CapPeerDiscTy
  )

  // capsToString holds the display name of each individual capability flag.
  var capsToString = map[Caps]string{
  	CapPeerDiscTy: "Peer discovery",
  	CapTxTy:       "Transaction relaying",
  	CapChainTy:    "Block chain relaying",
  }

  // IsCap reports whether c has at least one bit in common with cap.
  func (c Caps) IsCap(cap Caps) bool {
  	return c&cap != 0
  }

  // String returns the names of all capabilities set in c, joined by " | ".
  // The order is fixed: peer discovery, block chain relaying, transaction
  // relaying. An empty set yields the empty string.
  func (c Caps) String() string {
  	displayOrder := [...]Caps{CapPeerDiscTy, CapChainTy, CapTxTy}

  	var names []string
  	for _, flag := range displayOrder {
  		if c.IsCap(flag) {
  			names = append(names, capsToString[flag])
  		}
  	}
  	return strings.Join(names, " | ")
  }
  // Peer holds the state of a single connection to a remote node.
  type Peer struct {
  	// Ethereum interface
  	ethereum *Ethereum
  	// Net connection
  	conn net.Conn
  	// Output queue which is used to communicate and handle messages
  	outputQueue chan *ethwire.Msg
  	// Quit channel; closed by Stop to end the HandleOutbound loop
  	quit chan bool
  	// Determines whether it's an inbound or outbound peer
  	inbound bool
  	// Flags for checking the peer's connectivity state. Both are accessed
  	// through sync/atomic: connected is 1 once conn is set, disconnect
  	// becomes non-zero once Stop has been called.
  	connected  int32
  	disconnect int32
  	// Time at which HandleOutbound last took a message from the output queue
  	lastSend time.Time
  	// Indicates whether a valid handshake has been received from the peer.
  	// This flag is used by writeMessage to check if messages are allowed
  	// to be sent or not. Until it is set, every message except the
  	// handshake itself is dropped.
  	versionKnown bool
  	// Unix time (seconds) of the last received pong message
  	lastPong int64
  	// Gate for acting on a MsgPeersTy message, meant to prevent
  	// processing unsolicited peer lists.
  	// NOTE(review): HandleInbound sets this when a MsgGetPeersTy is
  	// *received* from the peer, not when one is sent to it -- confirm intent.
  	requestedPeerList bool
  	host              []byte
  	port              uint16
  	caps              Caps
  	// This peer's public key
  	pubkey []byte
  	// Indicates whether a catch-up request to this peer is outstanding
  	catchingUp bool
  	// Set when the peer answered MsgNotInChainTy; cleared once a block
  	// sharing a previous hash with our chain is found
  	diverted bool
  	// Number of blocks handed to GetChain by FindCommonParentBlock
  	blocksRequested int
  	version         string
  	// We use this to give some kind of pingtime to a node, not very accurate, could be improved.
  	pingTime           time.Duration
  	pingStartTime      time.Time
  	lastRequestedBlock *ethchain.Block
  }
  124. func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
  125. pubkey := ethutil.GetKeyRing().Get(0).PublicKey[1:]
  126. return &Peer{
  127. outputQueue: make(chan *ethwire.Msg, outputBufferSize),
  128. quit: make(chan bool),
  129. ethereum: ethereum,
  130. conn: conn,
  131. inbound: inbound,
  132. disconnect: 0,
  133. connected: 1,
  134. port: 30303,
  135. pubkey: pubkey,
  136. blocksRequested: 10,
  137. caps: ethereum.ServerCaps(),
  138. version: ethutil.Config.ClientString,
  139. }
  140. }
  141. func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
  142. p := &Peer{
  143. outputQueue: make(chan *ethwire.Msg, outputBufferSize),
  144. quit: make(chan bool),
  145. ethereum: ethereum,
  146. inbound: false,
  147. connected: 0,
  148. disconnect: 0,
  149. caps: caps,
  150. version: ethutil.Config.ClientString,
  151. }
  152. // Set up the connection in another goroutine so we don't block the main thread
  153. go func() {
  154. conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
  155. if err != nil {
  156. peerlogger.Debugln("Connection to peer failed", err)
  157. p.Stop()
  158. return
  159. }
  160. p.conn = conn
  161. // Atomically set the connection state
  162. atomic.StoreInt32(&p.connected, 1)
  163. atomic.StoreInt32(&p.disconnect, 0)
  164. p.Start()
  165. }()
  166. return p
  167. }
  // Getters

  // PingTime returns the last measured ping duration formatted as a string.
  func (p *Peer) PingTime() string {
  	return p.pingTime.String()
  }

  // Inbound reports whether this is an inbound peer.
  func (p *Peer) Inbound() bool {
  	return p.inbound
  }

  // LastSend returns the value of the lastSend field.
  func (p *Peer) LastSend() time.Time {
  	return p.lastSend
  }

  // LastPong returns the value of the lastPong field (a Unix timestamp in
  // seconds, zero if no pong was received yet).
  func (p *Peer) LastPong() int64 {
  	return p.lastPong
  }

  // Host returns the peer's packed host bytes. The slice is not copied.
  func (p *Peer) Host() []byte {
  	return p.host
  }

  // Port returns the peer's port.
  func (p *Peer) Port() uint16 {
  	return p.port
  }

  // Version returns the peer's client version string.
  func (p *Peer) Version() string {
  	return p.version
  }

  // Connected returns a pointer to the connected flag.
  // NOTE(review): presumably callers are expected to read it via sync/atomic,
  // as this file does -- confirm against callers.
  func (p *Peer) Connected() *int32 {
  	return &p.connected
  }

  // Setters

  // SetVersion replaces the peer's client version string.
  func (p *Peer) SetVersion(version string) {
  	p.version = version
  }
  197. // Outputs any RLP encoded data to the peer
  198. func (p *Peer) QueueMessage(msg *ethwire.Msg) {
  199. if atomic.LoadInt32(&p.connected) != 1 {
  200. return
  201. }
  202. p.outputQueue <- msg
  203. }
  204. func (p *Peer) writeMessage(msg *ethwire.Msg) {
  205. // Ignore the write if we're not connected
  206. if atomic.LoadInt32(&p.connected) != 1 {
  207. return
  208. }
  209. if !p.versionKnown {
  210. switch msg.Type {
  211. case ethwire.MsgHandshakeTy: // Ok
  212. default: // Anything but ack is allowed
  213. return
  214. }
  215. }
  216. peerlogger.DebugDetailln("<=", msg.Type, msg.Data)
  217. err := ethwire.WriteMessage(p.conn, msg)
  218. if err != nil {
  219. peerlogger.Debugln(" Can't send message:", err)
  220. // Stop the client if there was an error writing to it
  221. p.Stop()
  222. return
  223. }
  224. }
  225. // Outbound message handler. Outbound messages are handled here
  226. func (p *Peer) HandleOutbound() {
  227. // The ping timer. Makes sure that every 2 minutes a ping is send to the peer
  228. pingTimer := time.NewTicker(pingPongTimer)
  229. serviceTimer := time.NewTicker(5 * time.Minute)
  230. out:
  231. for {
  232. select {
  233. // Main message queue. All outbound messages are processed through here
  234. case msg := <-p.outputQueue:
  235. p.writeMessage(msg)
  236. p.lastSend = time.Now()
  237. // Ping timer
  238. case <-pingTimer.C:
  239. timeSince := time.Since(time.Unix(p.lastPong, 0))
  240. if !p.pingStartTime.IsZero() && p.lastPong != 0 && timeSince > (pingPongTimer+30*time.Second) {
  241. peerlogger.Infof("Peer did not respond to latest pong fast enough, it took %s, disconnecting.\n", timeSince)
  242. p.Stop()
  243. return
  244. }
  245. p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, ""))
  246. p.pingStartTime = time.Now()
  247. // Service timer takes care of peer broadcasting, transaction
  248. // posting or block posting
  249. case <-serviceTimer.C:
  250. if p.caps&CapPeerDiscTy > 0 {
  251. msg := p.peersMessage()
  252. p.ethereum.BroadcastMsg(msg)
  253. }
  254. case <-p.quit:
  255. // Break out of the for loop if a quit message is posted
  256. break out
  257. }
  258. }
  259. clean:
  260. // This loop is for draining the output queue and anybody waiting for us
  261. for {
  262. select {
  263. case <-p.outputQueue:
  264. // TODO
  265. default:
  266. break clean
  267. }
  268. }
  269. }
  270. // Inbound handler. Inbound messages are received here and passed to the appropriate methods
  271. func (p *Peer) HandleInbound() {
  272. for atomic.LoadInt32(&p.disconnect) == 0 {
  273. // HMM?
  274. time.Sleep(500 * time.Millisecond)
  275. // Wait for a message from the peer
  276. msgs, err := ethwire.ReadMessages(p.conn)
  277. if err != nil {
  278. peerlogger.Debugln(err)
  279. }
  280. for _, msg := range msgs {
  281. peerlogger.DebugDetailln("=>", msg.Type, msg.Data)
  282. switch msg.Type {
  283. case ethwire.MsgHandshakeTy:
  284. // Version message
  285. p.handleHandshake(msg)
  286. if p.caps.IsCap(CapPeerDiscTy) {
  287. p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
  288. }
  289. case ethwire.MsgDiscTy:
  290. p.Stop()
  291. peerlogger.Infoln("Disconnect peer:", DiscReason(msg.Data.Get(0).Uint()))
  292. case ethwire.MsgPingTy:
  293. // Respond back with pong
  294. p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, ""))
  295. case ethwire.MsgPongTy:
  296. // If we received a pong back from a peer we set the
  297. // last pong so the peer handler knows this peer is still
  298. // active.
  299. p.lastPong = time.Now().Unix()
  300. p.pingTime = time.Now().Sub(p.pingStartTime)
  301. case ethwire.MsgBlockTy:
  302. // Get all blocks and process them
  303. var block, lastBlock *ethchain.Block
  304. var err error
  305. // Make sure we are actually receiving anything
  306. if msg.Data.Len()-1 > 1 && p.diverted {
  307. // We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find
  308. // common ground to start syncing from
  309. lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1))
  310. if p.lastRequestedBlock != nil && bytes.Compare(lastBlock.Hash(), p.lastRequestedBlock.Hash()) == 0 {
  311. p.catchingUp = false
  312. continue
  313. }
  314. p.lastRequestedBlock = lastBlock
  315. peerlogger.Infof("Last block: %x. Checking if we have it locally.\n", lastBlock.Hash())
  316. for i := msg.Data.Len() - 1; i >= 0; i-- {
  317. block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
  318. // Do we have this block on our chain? If so we can continue
  319. if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) {
  320. // We don't have this block, but we do have a block with the same prevHash, diversion time!
  321. if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) {
  322. p.diverted = false
  323. if !p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) {
  324. p.SyncWithPeerToLastKnown()
  325. }
  326. break
  327. }
  328. }
  329. }
  330. if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) {
  331. // If we can't find a common ancenstor we need to request more blocks.
  332. // FIXME: At one point this won't scale anymore since we are not asking for an offset
  333. // we just keep increasing the amount of blocks.
  334. p.blocksRequested = p.blocksRequested * 2
  335. peerlogger.Infof("No common ancestor found, requesting %d more blocks.\n", p.blocksRequested)
  336. p.catchingUp = false
  337. p.FindCommonParentBlock()
  338. break
  339. }
  340. }
  341. for i := msg.Data.Len() - 1; i >= 0; i-- {
  342. block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
  343. //p.ethereum.StateManager().PrepareDefault(block)
  344. //state := p.ethereum.StateManager().CurrentState()
  345. err = p.ethereum.StateManager().Process(block, false)
  346. if err != nil {
  347. if ethutil.Config.Debug {
  348. peerlogger.Infof("Block %x failed\n", block.Hash())
  349. peerlogger.Infof("%v\n", err)
  350. peerlogger.Debugln(block)
  351. }
  352. break
  353. } else {
  354. lastBlock = block
  355. }
  356. }
  357. if msg.Data.Len() == 0 {
  358. // Set catching up to false if
  359. // the peer has nothing left to give
  360. p.catchingUp = false
  361. }
  362. if err != nil {
  363. // If the parent is unknown try to catch up with this peer
  364. if ethchain.IsParentErr(err) {
  365. peerlogger.Infoln("Attempting to catch. Parent known")
  366. p.catchingUp = false
  367. p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
  368. } else if ethchain.IsValidationErr(err) {
  369. fmt.Println("Err:", err)
  370. p.catchingUp = false
  371. }
  372. } else {
  373. // If we're catching up, try to catch up further.
  374. if p.catchingUp && msg.Data.Len() > 1 {
  375. if lastBlock != nil {
  376. blockInfo := lastBlock.BlockInfo()
  377. peerlogger.DebugDetailf("Synced chain to #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
  378. }
  379. p.catchingUp = false
  380. hash := p.ethereum.BlockChain().CurrentBlock.Hash()
  381. p.CatchupWithPeer(hash)
  382. }
  383. }
  384. case ethwire.MsgTxTy:
  385. // If the message was a transaction queue the transaction
  386. // in the TxPool where it will undergo validation and
  387. // processing when a new block is found
  388. for i := 0; i < msg.Data.Len(); i++ {
  389. tx := ethchain.NewTransactionFromValue(msg.Data.Get(i))
  390. p.ethereum.TxPool().QueueTransaction(tx)
  391. }
  392. case ethwire.MsgGetPeersTy:
  393. // Flag this peer as a 'requested of new peers' this to
  394. // prevent malicious peers being forced.
  395. p.requestedPeerList = true
  396. // Peer asked for list of connected peers
  397. p.pushPeers()
  398. case ethwire.MsgPeersTy:
  399. // Received a list of peers (probably because MsgGetPeersTy was send)
  400. // Only act on message if we actually requested for a peers list
  401. if p.requestedPeerList {
  402. data := msg.Data
  403. // Create new list of possible peers for the ethereum to process
  404. peers := make([]string, data.Len())
  405. // Parse each possible peer
  406. for i := 0; i < data.Len(); i++ {
  407. value := data.Get(i)
  408. peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint())
  409. }
  410. // Connect to the list of peers
  411. p.ethereum.ProcessPeerList(peers)
  412. // Mark unrequested again
  413. p.requestedPeerList = false
  414. }
  415. case ethwire.MsgGetChainTy:
  416. var parent *ethchain.Block
  417. // Length minus one since the very last element in the array is a count
  418. l := msg.Data.Len() - 1
  419. // Ignore empty get chains
  420. if l == 0 {
  421. break
  422. }
  423. // Amount of parents in the canonical chain
  424. //amountOfBlocks := msg.Data.Get(l).AsUint()
  425. amountOfBlocks := uint64(100)
  426. // Check each SHA block hash from the message and determine whether
  427. // the SHA is in the database
  428. for i := 0; i < l; i++ {
  429. if data := msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) {
  430. parent = p.ethereum.BlockChain().GetBlock(data)
  431. break
  432. }
  433. }
  434. // If a parent is found send back a reply
  435. if parent != nil {
  436. peerlogger.DebugDetailf("Found canonical block, returning chain from: %x ", parent.Hash())
  437. chain := p.ethereum.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks)
  438. if len(chain) > 0 {
  439. //peerlogger.Debugf("Returning %d blocks: %x ", len(chain), parent.Hash())
  440. p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain))
  441. } else {
  442. p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, []interface{}{}))
  443. }
  444. } else {
  445. //peerlogger.Debugf("Could not find a similar block")
  446. // If no blocks are found we send back a reply with msg not in chain
  447. // and the last hash from get chain
  448. if l > 0 {
  449. lastHash := msg.Data.Get(l - 1)
  450. //log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw())
  451. p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()}))
  452. }
  453. }
  454. case ethwire.MsgNotInChainTy:
  455. peerlogger.DebugDetailf("Not in chain: %x\n", msg.Data.Get(0).Bytes())
  456. if p.diverted == true {
  457. // If were already looking for a common parent and we get here again we need to go deeper
  458. p.blocksRequested = p.blocksRequested * 2
  459. }
  460. p.diverted = true
  461. p.catchingUp = false
  462. p.FindCommonParentBlock()
  463. case ethwire.MsgGetTxsTy:
  464. // Get the current transactions of the pool
  465. txs := p.ethereum.TxPool().CurrentTransactions()
  466. // Get the RlpData values from the txs
  467. txsInterface := make([]interface{}, len(txs))
  468. for i, tx := range txs {
  469. txsInterface[i] = tx.RlpData()
  470. }
  471. // Broadcast it back to the peer
  472. p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
  473. // Unofficial but fun nonetheless
  474. case ethwire.MsgTalkTy:
  475. peerlogger.Infoln("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.Str())
  476. }
  477. }
  478. }
  479. p.Stop()
  480. }
  481. func (p *Peer) Start() {
  482. peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String())
  483. servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
  484. if p.inbound {
  485. p.host, p.port = packAddr(peerHost, peerPort)
  486. } else {
  487. p.host, p.port = packAddr(servHost, servPort)
  488. }
  489. err := p.pushHandshake()
  490. if err != nil {
  491. peerlogger.Debugln("Peer can't send outbound version ack", err)
  492. p.Stop()
  493. return
  494. }
  495. go p.HandleOutbound()
  496. // Run the inbound handler in a new goroutine
  497. go p.HandleInbound()
  498. // Wait a few seconds for startup and then ask for an initial ping
  499. time.Sleep(2 * time.Second)
  500. p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, ""))
  501. p.pingStartTime = time.Now()
  502. }
  503. func (p *Peer) Stop() {
  504. if atomic.AddInt32(&p.disconnect, 1) != 1 {
  505. return
  506. }
  507. close(p.quit)
  508. if atomic.LoadInt32(&p.connected) != 0 {
  509. p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, ""))
  510. p.conn.Close()
  511. }
  512. // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here
  513. p.ethereum.RemovePeer(p)
  514. }
  515. func (p *Peer) pushHandshake() error {
  516. keyRing := ethutil.GetKeyRing().Get(0)
  517. if keyRing != nil {
  518. pubkey := keyRing.PublicKey
  519. msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
  520. uint32(ProtocolVersion), uint32(0), []byte(p.version), byte(p.caps), p.port, pubkey[1:],
  521. })
  522. p.QueueMessage(msg)
  523. }
  524. return nil
  525. }
  526. func (p *Peer) peersMessage() *ethwire.Msg {
  527. outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
  528. // Serialise each peer
  529. for i, peer := range p.ethereum.InOutPeers() {
  530. // Don't return localhost as valid peer
  531. if !net.ParseIP(peer.conn.RemoteAddr().String()).IsLoopback() {
  532. outPeers[i] = peer.RlpData()
  533. }
  534. }
  535. // Return the message to the peer with the known list of connected clients
  536. return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
  537. }
  538. // Pushes the list of outbound peers to the client when requested
  539. func (p *Peer) pushPeers() {
  540. p.QueueMessage(p.peersMessage())
  541. }
  542. func (p *Peer) handleHandshake(msg *ethwire.Msg) {
  543. c := msg.Data
  544. // Set pubkey
  545. p.pubkey = c.Get(5).Bytes()
  546. if p.pubkey == nil {
  547. peerlogger.Warnln("Pubkey required, not supplied in handshake.")
  548. p.Stop()
  549. return
  550. }
  551. usedPub := 0
  552. // This peer is already added to the peerlist so we expect to find a double pubkey at least once
  553. eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {
  554. if bytes.Compare(p.pubkey, peer.pubkey) == 0 {
  555. usedPub++
  556. }
  557. })
  558. if usedPub > 0 {
  559. peerlogger.Debugf("Pubkey %x found more then once. Already connected to client.", p.pubkey)
  560. p.Stop()
  561. return
  562. }
  563. if c.Get(0).Uint() != ProtocolVersion {
  564. peerlogger.Debugf("Invalid peer version. Require protocol: %d. Received: %d\n", ProtocolVersion, c.Get(0).Uint())
  565. p.Stop()
  566. return
  567. }
  568. // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID, CAPS, PORT, PUBKEY]
  569. p.versionKnown = true
  570. // If this is an inbound connection send an ack back
  571. if p.inbound {
  572. p.port = uint16(c.Get(4).Uint())
  573. // Self connect detection
  574. keyPair := ethutil.GetKeyRing().Get(0)
  575. if bytes.Compare(keyPair.PublicKey, p.pubkey) == 0 {
  576. p.Stop()
  577. return
  578. }
  579. }
  580. // Set the peer's caps
  581. p.caps = Caps(c.Get(3).Byte())
  582. // Get a reference to the peers version
  583. versionString := c.Get(2).Str()
  584. if len(versionString) > 0 {
  585. p.SetVersion(c.Get(2).Str())
  586. }
  587. p.ethereum.PushPeer(p)
  588. p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
  589. ethlogger.Infof("Added peer (%s) %d / %d\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers)
  590. // Catch up with the connected peer
  591. if !p.ethereum.IsUpToDate() {
  592. peerlogger.Debugln("Already syncing up with a peer; sleeping")
  593. time.Sleep(10 * time.Second)
  594. }
  595. p.SyncWithPeerToLastKnown()
  596. peerlogger.Debugln(p)
  597. }
  598. func (p *Peer) String() string {
  599. var strBoundType string
  600. if p.inbound {
  601. strBoundType = "inbound"
  602. } else {
  603. strBoundType = "outbound"
  604. }
  605. var strConnectType string
  606. if atomic.LoadInt32(&p.disconnect) == 0 {
  607. strConnectType = "connected"
  608. } else {
  609. strConnectType = "disconnected"
  610. }
  611. return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version, p.caps)
  612. }
  613. func (p *Peer) SyncWithPeerToLastKnown() {
  614. p.catchingUp = false
  615. p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
  616. }
  617. func (p *Peer) FindCommonParentBlock() {
  618. if p.catchingUp {
  619. return
  620. }
  621. p.catchingUp = true
  622. if p.blocksRequested == 0 {
  623. p.blocksRequested = 20
  624. }
  625. blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
  626. var hashes []interface{}
  627. for _, block := range blocks {
  628. hashes = append(hashes, block.Hash())
  629. }
  630. msgInfo := append(hashes, uint64(len(hashes)))
  631. peerlogger.DebugDetailf("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String())
  632. msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
  633. p.QueueMessage(msg)
  634. }
  635. func (p *Peer) CatchupWithPeer(blockHash []byte) {
  636. if !p.catchingUp {
  637. // Make sure nobody else is catching up when you want to do this
  638. p.catchingUp = true
  639. msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(50)})
  640. p.QueueMessage(msg)
  641. peerlogger.DebugDetailf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr())
  642. msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
  643. p.QueueMessage(msg)
  644. }
  645. }
  646. func (p *Peer) RlpData() []interface{} {
  647. return []interface{}{p.host, p.port, p.pubkey}
  648. }
  // packAddr converts a dotted IPv4 address and a decimal port string into a
  // four byte host and a numeric port.
  //
  // An address with fewer than four dot-separated parts (an IPv6 address, a
  // host name, the empty string) yields the all-zero host instead of
  // panicking. As before, an octet or port that does not parse counts as 0 and
  // only the first four parts of the address are used.
  func packAddr(address, port string) ([]byte, uint16) {
  	host := make([]byte, 4)

  	if octets := strings.Split(address, "."); len(octets) >= len(host) {
  		for i := range host {
  			// A parse error leaves n at 0, matching the previous behaviour.
  			n, _ := strconv.Atoi(octets[i])
  			host[i] = byte(n)
  		}
  	}

  	// A parse error leaves prt at 0, matching the previous behaviour.
  	prt, _ := strconv.Atoi(port)

  	return host, uint16(prt)
  }
  659. func unpackAddr(value *ethutil.Value, p uint64) string {
  660. byts := value.Bytes()
  661. a := strconv.Itoa(int(byts[0]))
  662. b := strconv.Itoa(int(byts[1]))
  663. c := strconv.Itoa(int(byts[2]))
  664. d := strconv.Itoa(int(byts[3]))
  665. host := strings.Join([]string{a, b, c, d}, ".")
  666. port := strconv.Itoa(int(p))
  667. return net.JoinHostPort(host, port)
  668. }