ethereum.go 15 KB


  1. package eth
  2. import (
  3. "container/list"
  4. "encoding/json"
  5. "fmt"
  6. "math/big"
  7. "math/rand"
  8. "net"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/ethereum/go-ethereum/chain"
  16. "github.com/ethereum/go-ethereum/crypto"
  17. "github.com/ethereum/go-ethereum/ethutil"
  18. "github.com/ethereum/go-ethereum/event"
  19. "github.com/ethereum/go-ethereum/logger"
  20. "github.com/ethereum/go-ethereum/rpc"
  21. "github.com/ethereum/go-ethereum/state"
  22. "github.com/ethereum/go-ethereum/wire"
  23. )
  // seedTextFileUri is the URL of a plain-text seed server list. It is not
  // referenced anywhere else in this file.
  const seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt"

  // seedNodeAddress is the host:port that Seed always dials after the DNS
  // bootstrap lookups.
  const seedNodeAddress = "poc-7.ethdev.com:30303"
  28. var loggerger = logger.NewLogger("SERV")
  29. func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
  30. // Loop thru the peers and close them (if we had them)
  31. for e := peers.Front(); e != nil; e = e.Next() {
  32. callback(e.Value.(*Peer), e)
  33. }
  34. }
  // processReapingTimeout is the number of seconds between two passes of the
  // dead-peer reaper (it is multiplied by time.Second where it is used).
  const processReapingTimeout = 60 // TODO increase
  38. type Ethereum struct {
  39. // Channel for shutting down the ethereum
  40. shutdownChan chan bool
  41. quit chan bool
  42. // DB interface
  43. db ethutil.Database
  44. // State manager for processing new blocks and managing the over all states
  45. stateManager *chain.StateManager
  46. // The transaction pool. Transaction can be pushed on this pool
  47. // for later including in the blocks
  48. txPool *chain.TxPool
  49. // The canonical chain
  50. blockChain *chain.ChainManager
  51. // The block pool
  52. blockPool *BlockPool
  53. // Eventer
  54. eventMux event.TypeMux
  55. // Peers
  56. peers *list.List
  57. // Nonce
  58. Nonce uint64
  59. Addr net.Addr
  60. Port string
  61. blacklist [][]byte
  62. peerMut sync.Mutex
  63. // Capabilities for outgoing peers
  64. serverCaps Caps
  65. nat NAT
  66. // Specifies the desired amount of maximum peers
  67. MaxPeers int
  68. Mining bool
  69. listening bool
  70. RpcServer *rpc.JsonRpcServer
  71. keyManager *crypto.KeyManager
  72. clientIdentity wire.ClientIdentity
  73. isUpToDate bool
  74. filterMu sync.RWMutex
  75. filterId int
  76. filters map[int]*chain.Filter
  77. }
  78. func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
  79. var err error
  80. var nat NAT
  81. if usePnp {
  82. nat, err = Discover()
  83. if err != nil {
  84. loggerger.Debugln("UPnP failed", err)
  85. }
  86. }
  87. bootstrapDb(db)
  88. ethutil.Config.Db = db
  89. nonce, _ := ethutil.RandomUint64()
  90. ethereum := &Ethereum{
  91. shutdownChan: make(chan bool),
  92. quit: make(chan bool),
  93. db: db,
  94. peers: list.New(),
  95. Nonce: nonce,
  96. serverCaps: caps,
  97. nat: nat,
  98. keyManager: keyManager,
  99. clientIdentity: clientIdentity,
  100. isUpToDate: true,
  101. filters: make(map[int]*chain.Filter),
  102. }
  103. ethereum.blockPool = NewBlockPool(ethereum)
  104. ethereum.txPool = chain.NewTxPool(ethereum)
  105. ethereum.blockChain = chain.NewChainManager(ethereum)
  106. ethereum.stateManager = chain.NewStateManager(ethereum)
  107. // Start the tx pool
  108. ethereum.txPool.Start()
  109. return ethereum, nil
  110. }
  111. func (s *Ethereum) KeyManager() *crypto.KeyManager {
  112. return s.keyManager
  113. }
  114. func (s *Ethereum) ClientIdentity() wire.ClientIdentity {
  115. return s.clientIdentity
  116. }
  117. func (s *Ethereum) ChainManager() *chain.ChainManager {
  118. return s.blockChain
  119. }
  120. func (s *Ethereum) StateManager() *chain.StateManager {
  121. return s.stateManager
  122. }
  123. func (s *Ethereum) TxPool() *chain.TxPool {
  124. return s.txPool
  125. }
  126. func (s *Ethereum) BlockPool() *BlockPool {
  127. return s.blockPool
  128. }
  129. func (s *Ethereum) EventMux() *event.TypeMux {
  130. return &s.eventMux
  131. }
  132. func (self *Ethereum) Db() ethutil.Database {
  133. return self.db
  134. }
  135. func (s *Ethereum) ServerCaps() Caps {
  136. return s.serverCaps
  137. }
  138. func (s *Ethereum) IsMining() bool {
  139. return s.Mining
  140. }
  141. func (s *Ethereum) PeerCount() int {
  142. return s.peers.Len()
  143. }
  144. func (s *Ethereum) IsUpToDate() bool {
  145. upToDate := true
  146. eachPeer(s.peers, func(peer *Peer, e *list.Element) {
  147. if atomic.LoadInt32(&peer.connected) == 1 {
  148. if peer.catchingUp == true && peer.versionKnown {
  149. upToDate = false
  150. }
  151. }
  152. })
  153. return upToDate
  154. }
  155. func (s *Ethereum) PushPeer(peer *Peer) {
  156. s.peers.PushBack(peer)
  157. }
  158. func (s *Ethereum) IsListening() bool {
  159. return s.listening
  160. }
  161. func (s *Ethereum) HighestTDPeer() (td *big.Int) {
  162. td = big.NewInt(0)
  163. eachPeer(s.peers, func(p *Peer, v *list.Element) {
  164. if p.td.Cmp(td) > 0 {
  165. td = p.td
  166. }
  167. })
  168. return
  169. }
  170. func (self *Ethereum) BlacklistPeer(peer *Peer) {
  171. self.blacklist = append(self.blacklist, peer.pubkey)
  172. }
  173. func (s *Ethereum) AddPeer(conn net.Conn) {
  174. peer := NewPeer(conn, s, true)
  175. if peer != nil {
  176. if s.peers.Len() < s.MaxPeers {
  177. peer.Start()
  178. } else {
  179. loggerger.Debugf("Max connected peers reached. Not adding incoming peer.")
  180. }
  181. }
  182. }
  183. func (s *Ethereum) ProcessPeerList(addrs []string) {
  184. for _, addr := range addrs {
  185. // TODO Probably requires some sanity checks
  186. s.ConnectToPeer(addr)
  187. }
  188. }
  189. func (s *Ethereum) ConnectToPeer(addr string) error {
  190. if s.peers.Len() < s.MaxPeers {
  191. var alreadyConnected bool
  192. ahost, _, _ := net.SplitHostPort(addr)
  193. var chost string
  194. ips, err := net.LookupIP(ahost)
  195. if err != nil {
  196. return err
  197. } else {
  198. // If more then one ip is available try stripping away the ipv6 ones
  199. if len(ips) > 1 {
  200. var ipsv4 []net.IP
  201. // For now remove the ipv6 addresses
  202. for _, ip := range ips {
  203. if strings.Contains(ip.String(), "::") {
  204. continue
  205. } else {
  206. ipsv4 = append(ipsv4, ip)
  207. }
  208. }
  209. if len(ipsv4) == 0 {
  210. return fmt.Errorf("[SERV] No IPV4 addresses available for hostname")
  211. }
  212. // Pick a random ipv4 address, simulating round-robin DNS.
  213. rand.Seed(time.Now().UTC().UnixNano())
  214. i := rand.Intn(len(ipsv4))
  215. chost = ipsv4[i].String()
  216. } else {
  217. if len(ips) == 0 {
  218. return fmt.Errorf("[SERV] No IPs resolved for the given hostname")
  219. return nil
  220. }
  221. chost = ips[0].String()
  222. }
  223. }
  224. eachPeer(s.peers, func(p *Peer, v *list.Element) {
  225. if p.conn == nil {
  226. return
  227. }
  228. phost, _, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
  229. if phost == chost {
  230. alreadyConnected = true
  231. //loggerger.Debugf("Peer %s already added.\n", chost)
  232. return
  233. }
  234. })
  235. if alreadyConnected {
  236. return nil
  237. }
  238. NewOutboundPeer(addr, s, s.serverCaps)
  239. }
  240. return nil
  241. }
  242. func (s *Ethereum) OutboundPeers() []*Peer {
  243. // Create a new peer slice with at least the length of the total peers
  244. outboundPeers := make([]*Peer, s.peers.Len())
  245. length := 0
  246. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  247. if !p.inbound && p.conn != nil {
  248. outboundPeers[length] = p
  249. length++
  250. }
  251. })
  252. return outboundPeers[:length]
  253. }
  254. func (s *Ethereum) InboundPeers() []*Peer {
  255. // Create a new peer slice with at least the length of the total peers
  256. inboundPeers := make([]*Peer, s.peers.Len())
  257. length := 0
  258. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  259. if p.inbound {
  260. inboundPeers[length] = p
  261. length++
  262. }
  263. })
  264. return inboundPeers[:length]
  265. }
  266. func (s *Ethereum) InOutPeers() []*Peer {
  267. // Reap the dead peers first
  268. s.reapPeers()
  269. // Create a new peer slice with at least the length of the total peers
  270. inboundPeers := make([]*Peer, s.peers.Len())
  271. length := 0
  272. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  273. // Only return peers with an actual ip
  274. if len(p.host) > 0 {
  275. inboundPeers[length] = p
  276. length++
  277. }
  278. })
  279. return inboundPeers[:length]
  280. }
  281. func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) {
  282. msg := wire.NewMessage(msgType, data)
  283. s.BroadcastMsg(msg)
  284. }
  285. func (s *Ethereum) BroadcastMsg(msg *wire.Msg) {
  286. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  287. p.QueueMessage(msg)
  288. })
  289. }
  290. func (s *Ethereum) Peers() *list.List {
  291. return s.peers
  292. }
  293. func (s *Ethereum) reapPeers() {
  294. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  295. if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
  296. s.removePeerElement(e)
  297. }
  298. })
  299. }
  300. func (s *Ethereum) removePeerElement(e *list.Element) {
  301. s.peerMut.Lock()
  302. defer s.peerMut.Unlock()
  303. s.peers.Remove(e)
  304. s.eventMux.Post(PeerListEvent{s.peers})
  305. }
  306. func (s *Ethereum) RemovePeer(p *Peer) {
  307. eachPeer(s.peers, func(peer *Peer, e *list.Element) {
  308. if peer == p {
  309. s.removePeerElement(e)
  310. }
  311. })
  312. }
  313. func (s *Ethereum) reapDeadPeerHandler() {
  314. reapTimer := time.NewTicker(processReapingTimeout * time.Second)
  315. for {
  316. select {
  317. case <-reapTimer.C:
  318. s.reapPeers()
  319. }
  320. }
  321. }
  322. // Start the ethereum
  323. func (s *Ethereum) Start(seed bool) {
  324. s.blockPool.Start()
  325. s.stateManager.Start()
  326. // Bind to addr and port
  327. ln, err := net.Listen("tcp", ":"+s.Port)
  328. if err != nil {
  329. loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port)
  330. s.listening = false
  331. } else {
  332. s.listening = true
  333. // Starting accepting connections
  334. loggerger.Infoln("Ready and accepting connections")
  335. // Start the peer handler
  336. go s.peerHandler(ln)
  337. }
  338. if s.nat != nil {
  339. go s.upnpUpdateThread()
  340. }
  341. // Start the reaping processes
  342. go s.reapDeadPeerHandler()
  343. go s.update()
  344. go s.filterLoop()
  345. if seed {
  346. s.Seed()
  347. }
  348. loggerger.Infoln("Server started")
  349. }
  350. func (s *Ethereum) Seed() {
  351. // Sorry Py person. I must blacklist. you perform badly
  352. s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031"))
  353. ips := PastPeers()
  354. if len(ips) > 0 {
  355. for _, ip := range ips {
  356. loggerger.Infoln("Connecting to previous peer ", ip)
  357. s.ConnectToPeer(ip)
  358. }
  359. } else {
  360. loggerger.Debugln("Retrieving seed nodes")
  361. // Eth-Go Bootstrapping
  362. ips, er := net.LookupIP("seed.bysh.me")
  363. if er == nil {
  364. peers := []string{}
  365. for _, ip := range ips {
  366. node := fmt.Sprintf("%s:%d", ip.String(), 30303)
  367. loggerger.Debugln("Found DNS Go Peer:", node)
  368. peers = append(peers, node)
  369. }
  370. s.ProcessPeerList(peers)
  371. }
  372. // Official DNS Bootstrapping
  373. _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org")
  374. if err == nil {
  375. peers := []string{}
  376. // Iterate SRV nodes
  377. for _, n := range nodes {
  378. target := n.Target
  379. port := strconv.Itoa(int(n.Port))
  380. // Resolve target to ip (Go returns list, so may resolve to multiple ips?)
  381. addr, err := net.LookupHost(target)
  382. if err == nil {
  383. for _, a := range addr {
  384. // Build string out of SRV port and Resolved IP
  385. peer := net.JoinHostPort(a, port)
  386. loggerger.Debugln("Found DNS Bootstrap Peer:", peer)
  387. peers = append(peers, peer)
  388. }
  389. } else {
  390. loggerger.Debugln("Couldn't resolve :", target)
  391. }
  392. }
  393. // Connect to Peer list
  394. s.ProcessPeerList(peers)
  395. }
  396. // XXX tmp
  397. s.ConnectToPeer(seedNodeAddress)
  398. }
  399. }
  400. func (s *Ethereum) peerHandler(listener net.Listener) {
  401. for {
  402. conn, err := listener.Accept()
  403. if err != nil {
  404. loggerger.Debugln(err)
  405. continue
  406. }
  407. go s.AddPeer(conn)
  408. }
  409. }
  410. func (s *Ethereum) Stop() {
  411. // Stop eventMux first, it will close all subscriptions.
  412. s.eventMux.Stop()
  413. // Close the database
  414. defer s.db.Close()
  415. var ips []string
  416. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  417. ips = append(ips, p.conn.RemoteAddr().String())
  418. })
  419. if len(ips) > 0 {
  420. d, _ := json.MarshalIndent(ips, "", " ")
  421. ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d)
  422. }
  423. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  424. p.Stop()
  425. })
  426. close(s.quit)
  427. if s.RpcServer != nil {
  428. s.RpcServer.Stop()
  429. }
  430. s.txPool.Stop()
  431. s.stateManager.Stop()
  432. s.blockPool.Stop()
  433. loggerger.Infoln("Server stopped")
  434. close(s.shutdownChan)
  435. }
  436. // This function will wait for a shutdown and resumes main thread execution
  437. func (s *Ethereum) WaitForShutdown() {
  438. <-s.shutdownChan
  439. }
  440. func (s *Ethereum) upnpUpdateThread() {
  441. // Go off immediately to prevent code duplication, thereafter we renew
  442. // lease every 15 minutes.
  443. timer := time.NewTimer(5 * time.Minute)
  444. lport, _ := strconv.ParseInt(s.Port, 10, 16)
  445. first := true
  446. out:
  447. for {
  448. select {
  449. case <-timer.C:
  450. var err error
  451. _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
  452. if err != nil {
  453. loggerger.Debugln("can't add UPnP port mapping:", err)
  454. break out
  455. }
  456. if first && err == nil {
  457. _, err = s.nat.GetExternalAddress()
  458. if err != nil {
  459. loggerger.Debugln("UPnP can't get external address:", err)
  460. continue out
  461. }
  462. first = false
  463. }
  464. timer.Reset(time.Minute * 15)
  465. case <-s.quit:
  466. break out
  467. }
  468. }
  469. timer.Stop()
  470. if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
  471. loggerger.Debugln("unable to remove UPnP port mapping:", err)
  472. } else {
  473. loggerger.Debugln("succesfully disestablished UPnP port mapping")
  474. }
  475. }
  476. func (self *Ethereum) update() {
  477. upToDateTimer := time.NewTicker(1 * time.Second)
  478. out:
  479. for {
  480. select {
  481. case <-upToDateTimer.C:
  482. if self.IsUpToDate() && !self.isUpToDate {
  483. self.eventMux.Post(ChainSyncEvent{false})
  484. self.isUpToDate = true
  485. } else if !self.IsUpToDate() && self.isUpToDate {
  486. self.eventMux.Post(ChainSyncEvent{true})
  487. self.isUpToDate = false
  488. }
  489. case <-self.quit:
  490. break out
  491. }
  492. }
  493. }
  494. // InstallFilter adds filter for blockchain events.
  495. // The filter's callbacks will run for matching blocks and messages.
  496. // The filter should not be modified after it has been installed.
  497. func (self *Ethereum) InstallFilter(filter *chain.Filter) (id int) {
  498. self.filterMu.Lock()
  499. id = self.filterId
  500. self.filters[id] = filter
  501. self.filterId++
  502. self.filterMu.Unlock()
  503. return id
  504. }
  505. func (self *Ethereum) UninstallFilter(id int) {
  506. self.filterMu.Lock()
  507. delete(self.filters, id)
  508. self.filterMu.Unlock()
  509. }
  510. // GetFilter retrieves a filter installed using InstallFilter.
  511. // The filter may not be modified.
  512. func (self *Ethereum) GetFilter(id int) *chain.Filter {
  513. self.filterMu.RLock()
  514. defer self.filterMu.RUnlock()
  515. return self.filters[id]
  516. }
  517. func (self *Ethereum) filterLoop() {
  518. // Subscribe to events
  519. events := self.eventMux.Subscribe(chain.NewBlockEvent{}, state.Messages(nil))
  520. for event := range events.Chan() {
  521. switch event := event.(type) {
  522. case chain.NewBlockEvent:
  523. self.filterMu.RLock()
  524. for _, filter := range self.filters {
  525. if filter.BlockCallback != nil {
  526. filter.BlockCallback(event.Block)
  527. }
  528. }
  529. self.filterMu.RUnlock()
  530. case state.Messages:
  531. self.filterMu.RLock()
  532. for _, filter := range self.filters {
  533. if filter.MessageCallback != nil {
  534. msgs := filter.FilterMessages(event)
  535. if len(msgs) > 0 {
  536. filter.MessageCallback(msgs)
  537. }
  538. }
  539. }
  540. self.filterMu.RUnlock()
  541. }
  542. }
  543. }
  544. func bootstrapDb(db ethutil.Database) {
  545. d, _ := db.Get([]byte("ProtocolVersion"))
  546. protov := ethutil.NewValue(d).Uint()
  547. if protov == 0 {
  548. db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
  549. }
  550. }
  551. func PastPeers() []string {
  552. var ips []string
  553. data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"))
  554. json.Unmarshal([]byte(data), &ips)
  555. return ips
  556. }