backend.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package eth
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "github.com/ethereum/go-ethereum/core"
  7. "github.com/ethereum/go-ethereum/crypto"
  8. "github.com/ethereum/go-ethereum/ethdb"
  9. "github.com/ethereum/go-ethereum/ethutil"
  10. "github.com/ethereum/go-ethereum/event"
  11. ethlogger "github.com/ethereum/go-ethereum/logger"
  12. "github.com/ethereum/go-ethereum/p2p"
  13. "github.com/ethereum/go-ethereum/pow/ezp"
  14. "github.com/ethereum/go-ethereum/rpc"
  15. "github.com/ethereum/go-ethereum/whisper"
  16. )
  17. const (
  18. seedNodeAddress = "poc-7.ethdev.com:30300"
  19. )
  20. type Config struct {
  21. Name string
  22. Version string
  23. Identifier string
  24. KeyStore string
  25. DataDir string
  26. LogFile string
  27. LogLevel int
  28. KeyRing string
  29. MaxPeers int
  30. Port string
  31. NATType string
  32. PMPGateway string
  33. Shh bool
  34. Dial bool
  35. KeyManager *crypto.KeyManager
  36. }
  37. var logger = ethlogger.NewLogger("SERV")
  38. type Ethereum struct {
  39. // Channel for shutting down the ethereum
  40. shutdownChan chan bool
  41. quit chan bool
  42. // DB interface
  43. db ethutil.Database
  44. blacklist p2p.Blacklist
  45. //*** SERVICES ***
  46. // State manager for processing new blocks and managing the over all states
  47. blockProcessor *core.BlockProcessor
  48. txPool *core.TxPool
  49. chainManager *core.ChainManager
  50. blockPool *BlockPool
  51. whisper *whisper.Whisper
  52. net *p2p.Server
  53. eventMux *event.TypeMux
  54. txSub event.Subscription
  55. blockSub event.Subscription
  56. RpcServer *rpc.JsonRpcServer
  57. keyManager *crypto.KeyManager
  58. clientIdentity p2p.ClientIdentity
  59. logger ethlogger.LogSystem
  60. synclock sync.Mutex
  61. syncGroup sync.WaitGroup
  62. Mining bool
  63. }
  64. func New(config *Config) (*Ethereum, error) {
  65. // Boostrap database
  66. logger := ethlogger.New(config.DataDir, config.LogFile, config.LogLevel)
  67. db, err := ethdb.NewLDBDatabase("database")
  68. if err != nil {
  69. return nil, err
  70. }
  71. // Perform database sanity checks
  72. d, _ := db.Get([]byte("ProtocolVersion"))
  73. protov := ethutil.NewValue(d).Uint()
  74. if protov != ProtocolVersion && protov != 0 {
  75. return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, ethutil.Config.ExecPath+"/database")
  76. }
  77. // Create new keymanager
  78. var keyManager *crypto.KeyManager
  79. switch config.KeyStore {
  80. case "db":
  81. keyManager = crypto.NewDBKeyManager(db)
  82. case "file":
  83. keyManager = crypto.NewFileKeyManager(config.DataDir)
  84. default:
  85. return nil, fmt.Errorf("unknown keystore type: %s", config.KeyStore)
  86. }
  87. // Initialise the keyring
  88. keyManager.Init(config.KeyRing, 0, false)
  89. // Create a new client id for this instance. This will help identifying the node on the network
  90. clientId := p2p.NewSimpleClientIdentity(config.Name, config.Version, config.Identifier, keyManager.PublicKey())
  91. saveProtocolVersion(db)
  92. ethutil.Config.Db = db
  93. eth := &Ethereum{
  94. shutdownChan: make(chan bool),
  95. quit: make(chan bool),
  96. db: db,
  97. keyManager: keyManager,
  98. clientIdentity: clientId,
  99. blacklist: p2p.NewBlacklist(),
  100. eventMux: &event.TypeMux{},
  101. logger: logger,
  102. }
  103. eth.chainManager = core.NewChainManager(eth.EventMux())
  104. eth.txPool = core.NewTxPool(eth.EventMux())
  105. eth.blockProcessor = core.NewBlockProcessor(eth.txPool, eth.chainManager, eth.EventMux())
  106. eth.chainManager.SetProcessor(eth.blockProcessor)
  107. eth.whisper = whisper.New()
  108. hasBlock := eth.chainManager.HasBlock
  109. insertChain := eth.chainManager.InsertChain
  110. eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify)
  111. ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
  112. protocols := []p2p.Protocol{ethProto}
  113. if config.Shh {
  114. eth.whisper = whisper.New()
  115. protocols = append(protocols, eth.whisper.Protocol())
  116. }
  117. nat, err := p2p.ParseNAT(config.NATType, config.PMPGateway)
  118. if err != nil {
  119. return nil, err
  120. }
  121. eth.net = &p2p.Server{
  122. Identity: clientId,
  123. MaxPeers: config.MaxPeers,
  124. Protocols: protocols,
  125. Blacklist: eth.blacklist,
  126. NAT: nat,
  127. NoDial: !config.Dial,
  128. }
  129. if len(config.Port) > 0 {
  130. eth.net.ListenAddr = ":" + config.Port
  131. }
  132. return eth, nil
  133. }
  134. func (s *Ethereum) KeyManager() *crypto.KeyManager {
  135. return s.keyManager
  136. }
  137. func (s *Ethereum) Logger() ethlogger.LogSystem {
  138. return s.logger
  139. }
  140. func (s *Ethereum) ClientIdentity() p2p.ClientIdentity {
  141. return s.clientIdentity
  142. }
  143. func (s *Ethereum) ChainManager() *core.ChainManager {
  144. return s.chainManager
  145. }
  146. func (s *Ethereum) BlockProcessor() *core.BlockProcessor {
  147. return s.blockProcessor
  148. }
  149. func (s *Ethereum) TxPool() *core.TxPool {
  150. return s.txPool
  151. }
  152. func (s *Ethereum) BlockPool() *BlockPool {
  153. return s.blockPool
  154. }
  155. func (s *Ethereum) Whisper() *whisper.Whisper {
  156. return s.whisper
  157. }
  158. func (s *Ethereum) EventMux() *event.TypeMux {
  159. return s.eventMux
  160. }
  161. func (self *Ethereum) Db() ethutil.Database {
  162. return self.db
  163. }
  164. func (s *Ethereum) IsMining() bool {
  165. return s.Mining
  166. }
  167. func (s *Ethereum) IsListening() bool {
  168. // XXX TODO
  169. return false
  170. }
  171. func (s *Ethereum) PeerCount() int {
  172. return s.net.PeerCount()
  173. }
  174. func (s *Ethereum) Peers() []*p2p.Peer {
  175. return s.net.Peers()
  176. }
  177. func (s *Ethereum) MaxPeers() int {
  178. return s.net.MaxPeers
  179. }
  180. // Start the ethereum
  181. func (s *Ethereum) Start(seed bool) error {
  182. err := s.net.Start()
  183. if err != nil {
  184. return err
  185. }
  186. // Start services
  187. s.txPool.Start()
  188. s.blockPool.Start()
  189. if s.whisper != nil {
  190. s.whisper.Start()
  191. }
  192. // broadcast transactions
  193. s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
  194. go s.txBroadcastLoop()
  195. // broadcast mined blocks
  196. s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
  197. go s.blockBroadcastLoop()
  198. // TODO: read peers here
  199. if seed {
  200. logger.Infof("Connect to seed node %v", seedNodeAddress)
  201. if err := s.SuggestPeer(seedNodeAddress); err != nil {
  202. return err
  203. }
  204. }
  205. logger.Infoln("Server started")
  206. return nil
  207. }
  208. func (self *Ethereum) SuggestPeer(addr string) error {
  209. netaddr, err := net.ResolveTCPAddr("tcp", addr)
  210. if err != nil {
  211. logger.Errorf("couldn't resolve %s:", addr, err)
  212. return err
  213. }
  214. self.net.SuggestPeer(netaddr.IP, netaddr.Port, nil)
  215. return nil
  216. }
  217. func (s *Ethereum) Stop() {
  218. // Close the database
  219. defer s.db.Close()
  220. close(s.quit)
  221. s.txSub.Unsubscribe() // quits txBroadcastLoop
  222. s.blockSub.Unsubscribe() // quits blockBroadcastLoop
  223. if s.RpcServer != nil {
  224. s.RpcServer.Stop()
  225. }
  226. s.txPool.Stop()
  227. s.eventMux.Stop()
  228. s.blockPool.Stop()
  229. if s.whisper != nil {
  230. s.whisper.Stop()
  231. }
  232. logger.Infoln("Server stopped")
  233. close(s.shutdownChan)
  234. }
  235. // This function will wait for a shutdown and resumes main thread execution
  236. func (s *Ethereum) WaitForShutdown() {
  237. <-s.shutdownChan
  238. }
  239. // now tx broadcasting is taken out of txPool
  240. // handled here via subscription, efficiency?
  241. func (self *Ethereum) txBroadcastLoop() {
  242. // automatically stops if unsubscribe
  243. for obj := range self.txSub.Chan() {
  244. event := obj.(core.TxPreEvent)
  245. self.net.Broadcast("eth", TxMsg, event.Tx.RlpData())
  246. }
  247. }
  248. func (self *Ethereum) blockBroadcastLoop() {
  249. // automatically stops if unsubscribe
  250. for obj := range self.blockSub.Chan() {
  251. switch ev := obj.(type) {
  252. case core.NewMinedBlockEvent:
  253. self.net.Broadcast("eth", NewBlockMsg, ev.Block.RlpData(), ev.Block.Td)
  254. }
  255. }
  256. }
  257. func saveProtocolVersion(db ethutil.Database) {
  258. d, _ := db.Get([]byte("ProtocolVersion"))
  259. protocolVersion := ethutil.NewValue(d).Uint()
  260. if protocolVersion == 0 {
  261. db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
  262. }
  263. }