ethereum.go 11 KB


  1. package eth
  2. import (
  3. "container/list"
  4. "fmt"
  5. "github.com/ethereum/eth-go/ethchain"
  6. "github.com/ethereum/eth-go/ethdb"
  7. "github.com/ethereum/eth-go/ethrpc"
  8. "github.com/ethereum/eth-go/ethutil"
  9. "github.com/ethereum/eth-go/ethwire"
  10. "github.com/ethereum/eth-go/ethlog"
  11. "io/ioutil"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. )
// ethlogger is the package-level logger used throughout this file; it is
// created with the tag "SERV".
var ethlogger = ethlog.NewLogger("SERV")
  22. func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
  23. // Loop thru the peers and close them (if we had them)
  24. for e := peers.Front(); e != nil; e = e.Next() {
  25. if peer, ok := e.Value.(*Peer); ok {
  26. callback(peer, e)
  27. }
  28. }
  29. }
  30. const (
  31. processReapingTimeout = 60 // TODO increase
  32. )
// Ethereum is the server object of this package. It owns the database
// handle, the chain/state/transaction components, the list of peers and the
// listening configuration, and exposes them through the accessor methods
// below.
type Ethereum struct {
	// Channel for shutting down the ethereum. It is closed at the end of
	// Stop and awaited by WaitForShutdown.
	shutdownChan chan bool
	// quit is closed by Stop to signal background goroutines to finish.
	quit chan bool
	// DB interface
	//db *ethdb.LDBDatabase
	db ethutil.Database
	// State manager for processing new blocks and managing the over all states
	stateManager *ethchain.StateManager
	// The transaction pool. Transaction can be pushed on this pool
	// for later including in the blocks
	txPool *ethchain.TxPool
	// The canonical chain
	blockChain *ethchain.BlockChain
	// Peers (NYI). The list elements hold *Peer values.
	peers *list.List
	// Nonce is filled from ethutil.RandomUint64 when the instance is created.
	Nonce uint64
	Addr  net.Addr
	// Port is the TCP port (as a decimal string) Start listens on.
	Port string
	// peerMut is held while an element is removed from peers.
	peerMut sync.Mutex
	// Capabilities for outgoing peers
	serverCaps Caps
	// nat is the result of UPnP discovery; it stays nil when discovery was
	// not requested.
	nat NAT
	// Specifies the desired amount of maximum peers
	MaxPeers int
	// Mining is the flag reported by IsMining.
	Mining bool
	// listening records whether Start managed to bind the listening socket.
	listening bool
	reactor   *ethutil.ReactorEngine
	// RpcServer, when non-nil, is stopped as part of Stop.
	RpcServer *ethrpc.JsonRpcServer
}
  64. func New(caps Caps, usePnp bool) (*Ethereum, error) {
  65. db, err := ethdb.NewLDBDatabase("database")
  66. //db, err := ethdb.NewMemDatabase()
  67. if err != nil {
  68. return nil, err
  69. }
  70. var nat NAT
  71. if usePnp {
  72. nat, err = Discover()
  73. if err != nil {
  74. ethlogger.Debugln("UPnP failed", err)
  75. }
  76. }
  77. ethutil.Config.Db = db
  78. nonce, _ := ethutil.RandomUint64()
  79. ethereum := &Ethereum{
  80. shutdownChan: make(chan bool),
  81. quit: make(chan bool),
  82. db: db,
  83. peers: list.New(),
  84. Nonce: nonce,
  85. serverCaps: caps,
  86. nat: nat,
  87. }
  88. ethereum.reactor = ethutil.NewReactorEngine()
  89. ethereum.txPool = ethchain.NewTxPool(ethereum)
  90. ethereum.blockChain = ethchain.NewBlockChain(ethereum)
  91. ethereum.stateManager = ethchain.NewStateManager(ethereum)
  92. // Start the tx pool
  93. ethereum.txPool.Start()
  94. return ethereum, nil
  95. }
// Reactor returns the reactor engine created for this instance.
func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
	return s.reactor
}
// BlockChain returns the block chain owned by this instance.
func (s *Ethereum) BlockChain() *ethchain.BlockChain {
	return s.blockChain
}
// StateManager returns the state manager owned by this instance.
func (s *Ethereum) StateManager() *ethchain.StateManager {
	return s.stateManager
}
// TxPool returns the transaction pool owned by this instance.
func (s *Ethereum) TxPool() *ethchain.TxPool {
	return s.txPool
}
// ServerCaps returns the capabilities this instance was created with; the
// same value is handed to outbound peers.
func (s *Ethereum) ServerCaps() Caps {
	return s.serverCaps
}
// IsMining reports the current value of the Mining flag.
func (s *Ethereum) IsMining() bool {
	return s.Mining
}
// PeerCount returns the number of elements currently in the peer list.
// The list is read without taking peerMut.
func (s *Ethereum) PeerCount() int {
	return s.peers.Len()
}
  117. func (s *Ethereum) IsUpToDate() bool {
  118. upToDate := true
  119. eachPeer(s.peers, func(peer *Peer, e *list.Element) {
  120. if atomic.LoadInt32(&peer.connected) == 1 {
  121. if peer.catchingUp == true {
  122. upToDate = false
  123. }
  124. }
  125. })
  126. return upToDate
  127. }
// PushPeer appends peer to the end of the peer list.
// NOTE(review): the list is modified here without taking peerMut, while
// removePeerElement does take it — confirm whether callers serialize access.
func (s *Ethereum) PushPeer(peer *Peer) {
	s.peers.PushBack(peer)
}
// IsListening reports whether Start succeeded in binding the listening
// socket.
func (s *Ethereum) IsListening() bool {
	return s.listening
}
  134. func (s *Ethereum) AddPeer(conn net.Conn) {
  135. peer := NewPeer(conn, s, true)
  136. if peer != nil {
  137. if s.peers.Len() < s.MaxPeers {
  138. peer.Start()
  139. } else {
  140. ethlogger.Debugf("Max connected peers reached. Not adding incoming peer.")
  141. }
  142. }
  143. }
  144. func (s *Ethereum) ProcessPeerList(addrs []string) {
  145. for _, addr := range addrs {
  146. // TODO Probably requires some sanity checks
  147. s.ConnectToPeer(addr)
  148. }
  149. }
  150. func (s *Ethereum) ConnectToPeer(addr string) error {
  151. if s.peers.Len() < s.MaxPeers {
  152. var alreadyConnected bool
  153. ahost, _, _ := net.SplitHostPort(addr)
  154. var chost string
  155. ips, err := net.LookupIP(ahost)
  156. if err != nil {
  157. return err
  158. } else {
  159. // If more then one ip is available try stripping away the ipv6 ones
  160. if len(ips) > 1 {
  161. var ipsv4 []net.IP
  162. // For now remove the ipv6 addresses
  163. for _, ip := range ips {
  164. if strings.Contains(ip.String(), "::") {
  165. continue
  166. } else {
  167. ipsv4 = append(ipsv4, ip)
  168. }
  169. }
  170. if len(ipsv4) == 0 {
  171. return fmt.Errorf("[SERV] No IPV4 addresses available for hostname")
  172. }
  173. // Pick a random ipv4 address, simulating round-robin DNS.
  174. rand.Seed(time.Now().UTC().UnixNano())
  175. i := rand.Intn(len(ipsv4))
  176. chost = ipsv4[i].String()
  177. } else {
  178. if len(ips) == 0 {
  179. return fmt.Errorf("[SERV] No IPs resolved for the given hostname")
  180. return nil
  181. }
  182. chost = ips[0].String()
  183. }
  184. }
  185. eachPeer(s.peers, func(p *Peer, v *list.Element) {
  186. if p.conn == nil {
  187. return
  188. }
  189. phost, _, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
  190. if phost == chost {
  191. alreadyConnected = true
  192. //ethlogger.Debugf("Peer %s already added.\n", chost)
  193. return
  194. }
  195. })
  196. if alreadyConnected {
  197. return nil
  198. }
  199. NewOutboundPeer(addr, s, s.serverCaps)
  200. }
  201. return nil
  202. }
  203. func (s *Ethereum) OutboundPeers() []*Peer {
  204. // Create a new peer slice with at least the length of the total peers
  205. outboundPeers := make([]*Peer, s.peers.Len())
  206. length := 0
  207. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  208. if !p.inbound && p.conn != nil {
  209. outboundPeers[length] = p
  210. length++
  211. }
  212. })
  213. return outboundPeers[:length]
  214. }
  215. func (s *Ethereum) InboundPeers() []*Peer {
  216. // Create a new peer slice with at least the length of the total peers
  217. inboundPeers := make([]*Peer, s.peers.Len())
  218. length := 0
  219. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  220. if p.inbound {
  221. inboundPeers[length] = p
  222. length++
  223. }
  224. })
  225. return inboundPeers[:length]
  226. }
  227. func (s *Ethereum) InOutPeers() []*Peer {
  228. // Reap the dead peers first
  229. s.reapPeers()
  230. // Create a new peer slice with at least the length of the total peers
  231. inboundPeers := make([]*Peer, s.peers.Len())
  232. length := 0
  233. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  234. // Only return peers with an actual ip
  235. if len(p.host) > 0 {
  236. inboundPeers[length] = p
  237. length++
  238. }
  239. })
  240. return inboundPeers[:length]
  241. }
  242. func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) {
  243. msg := ethwire.NewMessage(msgType, data)
  244. s.BroadcastMsg(msg)
  245. }
  246. func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) {
  247. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  248. p.QueueMessage(msg)
  249. })
  250. }
// Peers returns the underlying peer list itself, not a copy; changes made
// through the returned list are visible to this instance.
func (s *Ethereum) Peers() *list.List {
	return s.peers
}
  254. func (s *Ethereum) reapPeers() {
  255. eachPeer(s.peers, func(p *Peer, e *list.Element) {
  256. if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
  257. s.removePeerElement(e)
  258. }
  259. })
  260. }
// removePeerElement removes e from the peer list while holding peerMut and
// then posts the list on the reactor under the "peerList" event. Because the
// unlock is deferred, the post happens with the mutex still held.
func (s *Ethereum) removePeerElement(e *list.Element) {
	s.peerMut.Lock()
	defer s.peerMut.Unlock()
	s.peers.Remove(e)
	s.reactor.Post("peerList", s.peers)
}
  267. func (s *Ethereum) RemovePeer(p *Peer) {
  268. eachPeer(s.peers, func(peer *Peer, e *list.Element) {
  269. if peer == p {
  270. s.removePeerElement(e)
  271. }
  272. })
  273. }
  274. func (s *Ethereum) ReapDeadPeerHandler() {
  275. reapTimer := time.NewTicker(processReapingTimeout * time.Second)
  276. for {
  277. select {
  278. case <-reapTimer.C:
  279. s.reapPeers()
  280. }
  281. }
  282. }
  283. // Start the ethereum
  284. func (s *Ethereum) Start(seed bool) {
  285. // Bind to addr and port
  286. ln, err := net.Listen("tcp", ":"+s.Port)
  287. if err != nil {
  288. ethlogger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port)
  289. s.listening = false
  290. } else {
  291. s.listening = true
  292. // Starting accepting connections
  293. ethlogger.Infoln("Ready and accepting connections")
  294. // Start the peer handler
  295. go s.peerHandler(ln)
  296. }
  297. if s.nat != nil {
  298. go s.upnpUpdateThread()
  299. }
  300. // Start the reaping processes
  301. go s.ReapDeadPeerHandler()
  302. if seed {
  303. s.Seed()
  304. }
  305. ethlogger.Infoln("Server started")
  306. }
  307. func (s *Ethereum) Seed() {
  308. ethlogger.Debugln("Retrieving seed nodes")
  309. // Eth-Go Bootstrapping
  310. ips, er := net.LookupIP("seed.bysh.me")
  311. if er == nil {
  312. peers := []string{}
  313. for _, ip := range ips {
  314. node := fmt.Sprintf("%s:%d", ip.String(), 30303)
  315. ethlogger.Debugln("Found DNS Go Peer:", node)
  316. peers = append(peers, node)
  317. }
  318. s.ProcessPeerList(peers)
  319. }
  320. // Official DNS Bootstrapping
  321. _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org")
  322. if err == nil {
  323. peers := []string{}
  324. // Iterate SRV nodes
  325. for _, n := range nodes {
  326. target := n.Target
  327. port := strconv.Itoa(int(n.Port))
  328. // Resolve target to ip (Go returns list, so may resolve to multiple ips?)
  329. addr, err := net.LookupHost(target)
  330. if err == nil {
  331. for _, a := range addr {
  332. // Build string out of SRV port and Resolved IP
  333. peer := net.JoinHostPort(a, port)
  334. ethlogger.Debugln("Found DNS Bootstrap Peer:", peer)
  335. peers = append(peers, peer)
  336. }
  337. } else {
  338. ethlogger.Debugln("Couldn't resolve :", target)
  339. }
  340. }
  341. // Connect to Peer list
  342. s.ProcessPeerList(peers)
  343. } else {
  344. // Fallback to servers.poc3.txt
  345. resp, err := http.Get("http://www.ethereum.org/servers.poc3.txt")
  346. if err != nil {
  347. ethlogger.Warnln("Fetching seed failed:", err)
  348. return
  349. }
  350. defer resp.Body.Close()
  351. body, err := ioutil.ReadAll(resp.Body)
  352. if err != nil {
  353. ethlogger.Warnln("Reading seed failed:", err)
  354. return
  355. }
  356. s.ConnectToPeer(string(body))
  357. }
  358. }
  359. func (s *Ethereum) peerHandler(listener net.Listener) {
  360. for {
  361. conn, err := listener.Accept()
  362. if err != nil {
  363. ethlogger.Debugln(err)
  364. continue
  365. }
  366. go s.AddPeer(conn)
  367. }
  368. }
// Stop shuts the server down: every peer is stopped, the quit channel is
// closed, the RPC server (if any), the transaction pool and the state
// manager are stopped, and finally shutdownChan is closed, which releases
// WaitForShutdown. The database is closed last, when Stop returns.
//
// Stop must be called at most once: it closes quit and shutdownChan, and
// closing an already closed channel panics.
func (s *Ethereum) Stop() {
	// Close the database (deferred, so it runs after everything below)
	defer s.db.Close()
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		p.Stop()
	})
	// Signal the background goroutines that select on quit.
	close(s.quit)
	if s.RpcServer != nil {
		s.RpcServer.Stop()
	}
	s.txPool.Stop()
	s.stateManager.Stop()
	ethlogger.Infoln("Server stopped")
	// Release anyone blocked in WaitForShutdown.
	close(s.shutdownChan)
}
// WaitForShutdown blocks the calling goroutine until shutdownChan is closed,
// which happens at the end of Stop.
func (s *Ethereum) WaitForShutdown() {
	<-s.shutdownChan
}
  388. func (s *Ethereum) upnpUpdateThread() {
  389. // Go off immediately to prevent code duplication, thereafter we renew
  390. // lease every 15 minutes.
  391. timer := time.NewTimer(5 * time.Minute)
  392. lport, _ := strconv.ParseInt(s.Port, 10, 16)
  393. first := true
  394. out:
  395. for {
  396. select {
  397. case <-timer.C:
  398. var err error
  399. _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
  400. if err != nil {
  401. ethlogger.Debugln("can't add UPnP port mapping:", err)
  402. break out
  403. }
  404. if first && err == nil {
  405. _, err = s.nat.GetExternalAddress()
  406. if err != nil {
  407. ethlogger.Debugln("UPnP can't get external address:", err)
  408. continue out
  409. }
  410. first = false
  411. }
  412. timer.Reset(time.Minute * 15)
  413. case <-s.quit:
  414. break out
  415. }
  416. }
  417. timer.Stop()
  418. if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
  419. ethlogger.Debugln("unable to remove UPnP port mapping:", err)
  420. } else {
  421. ethlogger.Debugln("succesfully disestablished UPnP port mapping")
  422. }
  423. }