Forráskód Böngészése

initial commit for new backend (eth.Ethereum)

zelig 11 éve
szülő
commit
02017ed0e0
1 módosított fájl, 357 hozzáadás és 0 törlés
  1. 357 0
      eth/backend.go

+ 357 - 0
eth/backend.go

@@ -0,0 +1,357 @@
+package eth
+
+import (
+	"encoding/json"
+	"net"
+	"path"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/ethutil"
+	"github.com/ethereum/go-ethereum/event"
+	ethlogger "github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/ethereum/go-ethereum/state"
+)
+
+const (
+	seedNodeAddress = "poc-7.ethdev.com:30300"
+)
+
+var logger = ethlogger.NewLogger("SERV")
+
+type Ethereum struct {
+	// Channel for shutting down the ethereum
+	shutdownChan chan bool
+	quit         chan bool
+
+	// DB interface
+	db ethutil.Database
+	// State manager for processing new blocks and managing the over all states
+	blockManager *core.BlockManager
+
+	// The transaction pool. Transaction can be pushed on this pool
+	// for later including in the blocks
+	txPool *core.TxPool
+	// The canonical chain
+	chainManager *core.ChainManager
+	// The block pool
+	blockPool *BlockPool
+	// Event
+	eventMux *event.TypeMux
+
+	// Nonce
+	Nonce uint64
+
+	ListenAddr string
+
+	blacklist p2p.Blacklist
+	server    *p2p.Server
+	txSub     event.Subscription
+	blockSub  event.Subscription
+
+	// Capabilities for outgoing peers
+	// serverCaps Caps
+	peersFile string
+
+	Mining bool
+
+	RpcServer *rpc.JsonRpcServer
+
+	keyManager *crypto.KeyManager
+
+	clientIdentity p2p.ClientIdentity
+
+	synclock  sync.Mutex
+	syncGroup sync.WaitGroup
+
+	filterMu sync.RWMutex
+	filterId int
+	filters  map[int]*core.Filter
+}
+
+func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.KeyManager, nat p2p.NAT, port string, maxPeers int) (*Ethereum, error) {
+
+	saveProtocolVersion(db)
+	ethutil.Config.Db = db
+
+	// FIXME:
+	blacklist := p2p.NewBlacklist()
+	// Sorry Py person. I must blacklist. you perform badly
+	blacklist.Put(ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031"))
+
+	peersFile := path.Join(ethutil.Config.ExecPath, "known_peers.json")
+
+	nonce, _ := ethutil.RandomUint64()
+
+	listenAddr := ":" + port
+
+	eth := &Ethereum{
+		shutdownChan: make(chan bool),
+		quit:         make(chan bool),
+		db:           db,
+		Nonce:        nonce,
+		// serverCaps:     caps,
+		peersFile:      peersFile,
+		ListenAddr:     listenAddr,
+		keyManager:     keyManager,
+		clientIdentity: identity,
+		blacklist:      blacklist,
+		eventMux:       &event.TypeMux{},
+		filters:        make(map[int]*core.Filter),
+	}
+
+	eth.txPool = core.NewTxPool(eth)
+	eth.chainManager = core.NewChainManager(eth.EventMux())
+	eth.blockManager = core.NewBlockManager(eth)
+	eth.chainManager.SetProcessor(eth.blockManager)
+
+	hasBlock := eth.chainManager.HasBlock
+	insertChain := eth.chainManager.InsertChain
+	// pow := ezp.New()
+	// verifyPoW := pow.Verify
+	verifyPoW := func(*types.Block) bool { return true }
+	eth.blockPool = NewBlockPool(hasBlock, insertChain, verifyPoW)
+
+	// Start the tx pool
+	eth.txPool.Start()
+
+	ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
+	protocols := []p2p.Protocol{ethProto}
+
+	server := &p2p.Server{
+		Identity:   identity,
+		MaxPeers:   maxPeers,
+		Protocols:  protocols,
+		ListenAddr: listenAddr,
+		Blacklist:  blacklist,
+		NAT:        nat,
+	}
+
+	eth.server = server
+
+	return eth, nil
+}
+
+func (s *Ethereum) KeyManager() *crypto.KeyManager {
+	return s.keyManager
+}
+
+func (s *Ethereum) ClientIdentity() p2p.ClientIdentity {
+	return s.clientIdentity
+}
+
+func (s *Ethereum) ChainManager() *core.ChainManager {
+	return s.chainManager
+}
+
+func (s *Ethereum) BlockManager() *core.BlockManager {
+	return s.blockManager
+}
+
+func (s *Ethereum) TxPool() *core.TxPool {
+	return s.txPool
+}
+
+func (s *Ethereum) BlockPool() *BlockPool {
+	return s.blockPool
+}
+
+func (s *Ethereum) EventMux() *event.TypeMux {
+	return s.eventMux
+}
+func (self *Ethereum) Db() ethutil.Database {
+	return self.db
+}
+
+func (s *Ethereum) IsMining() bool {
+	return s.Mining
+}
+
+func (s *Ethereum) IsListening() bool {
+	if s.ListenAddr == "" {
+		return false
+	} else {
+		return true
+	}
+}
+
+func (s *Ethereum) PeerCount() int {
+	return s.server.PeerCount()
+}
+
+func (s *Ethereum) Peers() []*p2p.Peer {
+	return s.server.Peers()
+}
+
+// Start the ethereum
+func (s *Ethereum) Start(seed bool) error {
+	err := s.server.Start()
+	if err != nil {
+		return err
+	}
+	s.blockPool.Start()
+	s.blockManager.Start()
+
+	go s.filterLoop()
+
+	// broadcast transactions
+	s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
+	go s.txBroadcastLoop()
+
+	// broadcast mined blocks
+	s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
+	go s.blockBroadcastLoop()
+
+	// TODO: read peers here
+	if seed {
+		logger.Infof("Connect to seed node %v", seedNodeAddress)
+		if err := s.SuggestPeer(seedNodeAddress); err != nil {
+			return err
+		}
+	}
+
+	logger.Infoln("Server started")
+	return nil
+}
+
+func (self *Ethereum) SuggestPeer(addr string) error {
+	netaddr, err := net.ResolveTCPAddr("tcp", addr)
+	if err != nil {
+		logger.Errorf("couldn't resolve %s:", addr, err)
+		return err
+	}
+
+	self.server.SuggestPeer(netaddr.IP, netaddr.Port, nil)
+	return nil
+}
+
+func (s *Ethereum) Stop() {
+	// Close the database
+	defer s.db.Close()
+
+	//
+	// WritePeers(s.peersFile, s.server.PeerAddresses())
+	close(s.quit)
+
+	s.txSub.Unsubscribe()    // quits txBroadcastLoop
+	s.blockSub.Unsubscribe() // quits blockBroadcastLoop
+
+	if s.RpcServer != nil {
+		s.RpcServer.Stop()
+	}
+	s.txPool.Stop()
+	s.blockManager.Stop()
+	s.eventMux.Stop()
+	s.blockPool.Stop()
+
+	logger.Infoln("Server stopped")
+	close(s.shutdownChan)
+}
+
+// This function will wait for a shutdown and resumes main thread execution
+func (s *Ethereum) WaitForShutdown() {
+	<-s.shutdownChan
+}
+
+func WritePeers(path string, addresses []string) {
+	if len(addresses) > 0 {
+		data, _ := json.MarshalIndent(addresses, "", "    ")
+		ethutil.WriteFile(path, data)
+	}
+}
+
+func ReadPeers(path string) (ips []string, err error) {
+	var data string
+	data, err = ethutil.ReadAllFile(path)
+	if err != nil {
+		json.Unmarshal([]byte(data), &ips)
+	}
+	return
+}
+
+// now tx broadcasting is taken out of txPool
+// handled here via subscription, efficiency?
+func (self *Ethereum) txBroadcastLoop() {
+	// automatically stops if unsubscribe
+	for obj := range self.txSub.Chan() {
+		event := obj.(core.TxPreEvent)
+		self.server.Broadcast("eth", TxMsg, []interface{}{event.Tx.RlpData()})
+	}
+}
+
+func (self *Ethereum) blockBroadcastLoop() {
+	// automatically stops if unsubscribe
+	for obj := range self.txSub.Chan() {
+		event := obj.(core.NewMinedBlockEvent)
+		self.server.Broadcast("eth", NewBlockMsg, event.Block.Value().Val)
+	}
+}
+
+func saveProtocolVersion(db ethutil.Database) {
+	d, _ := db.Get([]byte("ProtocolVersion"))
+	protocolVersion := ethutil.NewValue(d).Uint()
+
+	if protocolVersion == 0 {
+		db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
+	}
+}
+
+// InstallFilter adds filter for blockchain events.
+// The filter's callbacks will run for matching blocks and messages.
+// The filter should not be modified after it has been installed.
+func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
+	self.filterMu.Lock()
+	id = self.filterId
+	self.filters[id] = filter
+	self.filterId++
+	self.filterMu.Unlock()
+	return id
+}
+
+func (self *Ethereum) UninstallFilter(id int) {
+	self.filterMu.Lock()
+	delete(self.filters, id)
+	self.filterMu.Unlock()
+}
+
+// GetFilter retrieves a filter installed using InstallFilter.
+// The filter may not be modified.
+func (self *Ethereum) GetFilter(id int) *core.Filter {
+	self.filterMu.RLock()
+	defer self.filterMu.RUnlock()
+	return self.filters[id]
+}
+
+func (self *Ethereum) filterLoop() {
+	// Subscribe to events
+	events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
+	for event := range events.Chan() {
+		switch event.(type) {
+		case core.NewBlockEvent:
+			self.filterMu.RLock()
+			for _, filter := range self.filters {
+				if filter.BlockCallback != nil {
+					e := event.(core.NewBlockEvent)
+					filter.BlockCallback(e.Block)
+				}
+			}
+			self.filterMu.RUnlock()
+		case state.Messages:
+			self.filterMu.RLock()
+			for _, filter := range self.filters {
+				if filter.MessageCallback != nil {
+					e := event.(state.Messages)
+					msgs := filter.FilterMessages(e)
+					if len(msgs) > 0 {
+						filter.MessageCallback(msgs)
+					}
+				}
+			}
+			self.filterMu.RUnlock()
+		}
+	}
+}