Explorar o código

Merge pull request #3605 from fjl/event-feed

event: add new Subscription type and related utilities
Péter Szilágyi %!s(int64=8) %!d(string=hai) anos
pai
achega
7734ead520

+ 1 - 1
core/tx_pool.go

@@ -89,7 +89,7 @@ type TxPool struct {
 	gasLimit     func() *big.Int // The current gas limit function callback
 	minGasPrice  *big.Int
 	eventMux     *event.TypeMux
-	events       event.Subscription
+	events       *event.TypeMuxSubscription
 	localTx      *txSet
 	signer       types.Signer
 	mu           sync.RWMutex

+ 2 - 2
eth/filters/filter_system.go

@@ -74,7 +74,7 @@ type subscription struct {
 // subscription which match the subscription criteria.
 type EventSystem struct {
 	mux       *event.TypeMux
-	sub       event.Subscription
+	sub       *event.TypeMuxSubscription
 	backend   Backend
 	lightMode bool
 	lastHead  *types.Header
@@ -277,7 +277,7 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
 type filterIndex map[Type]map[rpc.ID]*subscription
 
 // broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
+func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
 	if ev == nil {
 		return
 	}

+ 2 - 2
eth/handler.go

@@ -78,8 +78,8 @@ type ProtocolManager struct {
 	SubProtocols []p2p.Protocol
 
 	eventMux      *event.TypeMux
-	txSub         event.Subscription
-	minedBlockSub event.Subscription
+	txSub         *event.TypeMuxSubscription
+	minedBlockSub *event.TypeMuxSubscription
 
 	// channels for fetcher, syncer, txsyncLoop
 	newPeerCh   chan *peer

+ 25 - 35
event/event.go

@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-// Package event implements an event multiplexer.
+// Package event deals with subscriptions to real-time events.
 package event
 
 import (
@@ -25,33 +25,22 @@ import (
 	"time"
 )
 
-// Event is a time-tagged notification pushed to subscribers.
-type Event struct {
+// TypeMuxEvent is a time-tagged notification pushed to subscribers.
+type TypeMuxEvent struct {
 	Time time.Time
 	Data interface{}
 }
 
-// Subscription is implemented by event subscriptions.
-type Subscription interface {
-	// Chan returns a channel that carries events.
-	// Implementations should return the same channel
-	// for any subsequent calls to Chan.
-	Chan() <-chan *Event
-
-	// Unsubscribe stops delivery of events to a subscription.
-	// The event channel is closed.
-	// Unsubscribe can be called more than once.
-	Unsubscribe()
-}
-
 // A TypeMux dispatches events to registered receivers. Receivers can be
 // registered to handle events of certain type. Any operation
 // called after mux is stopped will return ErrMuxClosed.
 //
 // The zero value is ready to use.
+//
+// Deprecated: use Feed
 type TypeMux struct {
 	mutex   sync.RWMutex
-	subm    map[reflect.Type][]*muxsub
+	subm    map[reflect.Type][]*TypeMuxSubscription
 	stopped bool
 }
 
@@ -61,7 +50,7 @@ var ErrMuxClosed = errors.New("event: mux closed")
 // Subscribe creates a subscription for events of the given types. The
 // subscription's channel is closed when it is unsubscribed
 // or the mux is closed.
-func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
+func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
 	sub := newsub(mux)
 	mux.mutex.Lock()
 	defer mux.mutex.Unlock()
@@ -72,7 +61,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
 		close(sub.postC)
 	} else {
 		if mux.subm == nil {
-			mux.subm = make(map[reflect.Type][]*muxsub)
+			mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
 		}
 		for _, t := range types {
 			rtyp := reflect.TypeOf(t)
@@ -80,7 +69,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
 			if find(oldsubs, sub) != -1 {
 				panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
 			}
-			subs := make([]*muxsub, len(oldsubs)+1)
+			subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
 			copy(subs, oldsubs)
 			subs[len(oldsubs)] = sub
 			mux.subm[rtyp] = subs
@@ -92,7 +81,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
 // Post sends an event to all receivers registered for the given type.
 // It returns ErrMuxClosed if the mux has been stopped.
 func (mux *TypeMux) Post(ev interface{}) error {
-	event := &Event{
+	event := &TypeMuxEvent{
 		Time: time.Now(),
 		Data: ev,
 	}
@@ -125,7 +114,7 @@ func (mux *TypeMux) Stop() {
 	mux.mutex.Unlock()
 }
 
-func (mux *TypeMux) del(s *muxsub) {
+func (mux *TypeMux) del(s *TypeMuxSubscription) {
 	mux.mutex.Lock()
 	for typ, subs := range mux.subm {
 		if pos := find(subs, s); pos >= 0 {
@@ -139,7 +128,7 @@ func (mux *TypeMux) del(s *muxsub) {
 	s.mux.mutex.Unlock()
 }
 
-func find(slice []*muxsub, item *muxsub) int {
+func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int {
 	for i, v := range slice {
 		if v == item {
 			return i
@@ -148,14 +137,15 @@ func find(slice []*muxsub, item *muxsub) int {
 	return -1
 }
 
-func posdelete(slice []*muxsub, pos int) []*muxsub {
-	news := make([]*muxsub, len(slice)-1)
+func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
+	news := make([]*TypeMuxSubscription, len(slice)-1)
 	copy(news[:pos], slice[:pos])
 	copy(news[pos:], slice[pos+1:])
 	return news
 }
 
-type muxsub struct {
+// TypeMuxSubscription is a subscription established through TypeMux.
+type TypeMuxSubscription struct {
 	mux     *TypeMux
 	created time.Time
 	closeMu sync.Mutex
@@ -166,13 +156,13 @@ type muxsub struct {
 	// postC can be set to nil without affecting the return value of
 	// Chan.
 	postMu sync.RWMutex
-	readC  <-chan *Event
-	postC  chan<- *Event
+	readC  <-chan *TypeMuxEvent
+	postC  chan<- *TypeMuxEvent
 }
 
-func newsub(mux *TypeMux) *muxsub {
-	c := make(chan *Event)
-	return &muxsub{
+func newsub(mux *TypeMux) *TypeMuxSubscription {
+	c := make(chan *TypeMuxEvent)
+	return &TypeMuxSubscription{
 		mux:     mux,
 		created: time.Now(),
 		readC:   c,
@@ -181,16 +171,16 @@ func newsub(mux *TypeMux) *muxsub {
 	}
 }
 
-func (s *muxsub) Chan() <-chan *Event {
+func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent {
 	return s.readC
 }
 
-func (s *muxsub) Unsubscribe() {
+func (s *TypeMuxSubscription) Unsubscribe() {
 	s.mux.del(s)
 	s.closewait()
 }
 
-func (s *muxsub) closewait() {
+func (s *TypeMuxSubscription) closewait() {
 	s.closeMu.Lock()
 	defer s.closeMu.Unlock()
 	if s.closed {
@@ -205,7 +195,7 @@ func (s *muxsub) closewait() {
 	s.postMu.Unlock()
 }
 
-func (s *muxsub) deliver(event *Event) {
+func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
 	// Short circuit delivery if stale event
 	if s.created.After(event.Time) {
 		return

+ 24 - 6
event/event_test.go

@@ -149,16 +149,34 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) {
 	}()
 }
 
-func BenchmarkPost3(b *testing.B) {
-	var mux = new(TypeMux)
-	defer mux.Stop()
-	emptySubscriber(mux, testEvent(0))
-	emptySubscriber(mux, testEvent(0))
-	emptySubscriber(mux, testEvent(0))
+func BenchmarkPost1000(b *testing.B) {
+	var (
+		mux              = new(TypeMux)
+		subscribed, done sync.WaitGroup
+		nsubs            = 1000
+	)
+	subscribed.Add(nsubs)
+	done.Add(nsubs)
+	for i := 0; i < nsubs; i++ {
+		go func() {
+			s := mux.Subscribe(testEvent(0))
+			subscribed.Done()
+			for range s.Chan() {
+			}
+			done.Done()
+		}()
+	}
+	subscribed.Wait()
 
+	// The actual benchmark.
+	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		mux.Post(testEvent(0))
 	}
+
+	b.StopTimer()
+	mux.Stop()
+	done.Wait()
 }
 
 func BenchmarkPostConcurrent(b *testing.B) {

+ 73 - 0
event/example_feed_test.go

@@ -0,0 +1,73 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event_test
+
+import (
+	"fmt"
+
+	"github.com/ethereum/go-ethereum/event"
+)
+
+func ExampleFeed_acknowledgedEvents() {
+	// This example shows how the return value of Send can be used for request/reply
+	// interaction between event consumers and producers.
+	var feed event.Feed
+	type ackedEvent struct {
+		i   int
+		ack chan<- struct{}
+	}
+
+	// Consumers wait for events on the feed and acknowledge processing.
+	done := make(chan struct{})
+	defer close(done)
+	for i := 0; i < 3; i++ {
+		ch := make(chan ackedEvent, 100)
+		sub := feed.Subscribe(ch)
+		go func() {
+			defer sub.Unsubscribe()
+			for {
+				select {
+				case ev := <-ch:
+					fmt.Println(ev.i) // "process" the event
+					ev.ack <- struct{}{}
+				case <-done:
+					return
+				}
+			}
+		}()
+	}
+
+	// The producer sends values of type ackedEvent with increasing values of i.
+	// It waits for all consumers to acknowledge before sending the next event.
+	for i := 0; i < 3; i++ {
+		acksignal := make(chan struct{})
+		n := feed.Send(ackedEvent{i, acksignal})
+		for ack := 0; ack < n; ack++ {
+			<-acksignal
+		}
+	}
+	// Output:
+	// 0
+	// 0
+	// 0
+	// 1
+	// 1
+	// 1
+	// 2
+	// 2
+	// 2
+}

+ 128 - 0
event/example_scope_test.go

@@ -0,0 +1,128 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event_test
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/event"
+)
+
+// This example demonstrates how SubscriptionScope can be used to control the lifetime of
+// subscriptions.
+//
+// Our example program consists of two servers, each of which performs a calculation when
+// requested. The servers also allow subscribing to results of all computations.
+type divServer struct{ results event.Feed }
+type mulServer struct{ results event.Feed }
+
+func (s *divServer) do(a, b int) int {
+	r := a / b
+	s.results.Send(r)
+	return r
+}
+
+func (s *mulServer) do(a, b int) int {
+	r := a * b
+	s.results.Send(r)
+	return r
+}
+
+// The servers are contained in an App. The app controls the servers and exposes them
+// through its API.
+type App struct {
+	divServer
+	mulServer
+	scope event.SubscriptionScope
+}
+
+func (s *App) Calc(op byte, a, b int) int {
+	switch op {
+	case '/':
+		return s.divServer.do(a, b)
+	case '*':
+		return s.mulServer.do(a, b)
+	default:
+		panic("invalid op")
+	}
+}
+
+// The app's SubscribeResults method starts sending calculation results to the given
+// channel. Subscriptions created through this method are tied to the lifetime of the App
+// because they are registered in the scope.
+func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
+	switch op {
+	case '/':
+		return s.scope.Track(s.divServer.results.Subscribe(ch))
+	case '*':
+		return s.scope.Track(s.mulServer.results.Subscribe(ch))
+	default:
+		panic("invalid op")
+	}
+}
+
+// Stop stops the App, closing all subscriptions created through SubscribeResults.
+func (s *App) Stop() {
+	s.scope.Close()
+}
+
+func ExampleSubscriptionScope() {
+	// Create the app.
+	var (
+		app  App
+		wg   sync.WaitGroup
+		divs = make(chan int)
+		muls = make(chan int)
+	)
+
+	// Run a subscriber in the background.
+	divsub := app.SubscribeResults('/', divs)
+	mulsub := app.SubscribeResults('*', muls)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer fmt.Println("subscriber exited")
+		defer divsub.Unsubscribe()
+		defer mulsub.Unsubscribe()
+		for {
+			select {
+			case result := <-divs:
+				fmt.Println("division happened:", result)
+			case result := <-muls:
+				fmt.Println("multiplication happened:", result)
+			case <-divsub.Err():
+				return
+			case <-mulsub.Err():
+				return
+			}
+		}
+	}()
+
+	// Interact with the app.
+	app.Calc('/', 22, 11)
+	app.Calc('*', 3, 4)
+
+	// Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
+	app.Stop()
+	wg.Wait()
+
+	// Output:
+	// division happened: 2
+	// multiplication happened: 12
+	// subscriber exited
+}

+ 56 - 0
event/example_subscription_test.go

@@ -0,0 +1,56 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event_test
+
+import (
+	"fmt"
+
+	"github.com/ethereum/go-ethereum/event"
+)
+
+func ExampleNewSubscription() {
+	// Create a subscription that sends 10 integers on ch.
+	ch := make(chan int)
+	sub := event.NewSubscription(func(quit <-chan struct{}) error {
+		for i := 0; i < 10; i++ {
+			select {
+			case ch <- i:
+			case <-quit:
+				fmt.Println("unsubscribed")
+				return nil
+			}
+		}
+		return nil
+	})
+
+	// This is the consumer. It reads 5 integers, then aborts the subscription.
+	// Note that Unsubscribe waits until the producer has shut down.
+	for i := range ch {
+		fmt.Println(i)
+		if i == 4 {
+			sub.Unsubscribe()
+			break
+		}
+	}
+	// Output:
+	// 0
+	// 1
+	// 2
+	// 3
+	// 4
+	// unsubscribed
+}

+ 249 - 0
event/feed.go

@@ -0,0 +1,249 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"errors"
+	"reflect"
+	"sync"
+)
+
+var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")
+
+// Feed implements one-to-many subscriptions where the carrier of events is a channel.
+// Values sent to a Feed are delivered to all subscribed channels simultaneously.
+//
+// Feeds can only be used with a single type. The type is determined by the first Send or
+// Subscribe operation. Subsequent calls to these methods panic if the type does not
+// match.
+//
+// The zero value is ready to use.
+type Feed struct {
+	// sendLock has a one-element buffer and is empty when held.
+	// It protects sendCases.
+	sendLock  chan struct{}
+	removeSub chan interface{} // interrupts Send
+	sendCases caseList         // the active set of select cases used by Send
+
+	// The inbox holds newly subscribed channels until they are added to sendCases.
+	mu     sync.Mutex
+	inbox  caseList
+	etype  reflect.Type
+	closed bool
+}
+
+// This is the index of the first actual subscription channel in sendCases.
+// sendCases[0] is a SelectRecv case for the removeSub channel.
+const firstSubSendCase = 1
+
+type feedTypeError struct {
+	got, want reflect.Type
+	op        string
+}
+
+func (e feedTypeError) Error() string {
+	return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String()
+}
+
+func (f *Feed) init() {
+	if f.sendLock != nil {
+		return
+	}
+	f.removeSub = make(chan interface{})
+	f.sendLock = make(chan struct{}, 1)
+	f.sendLock <- struct{}{}
+	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
+}
+
+// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
+// until the subscription is canceled. All channels added must have the same element type.
+//
+// The channel should have ample buffer space to avoid blocking other subscribers.
+// Slow subscribers are not dropped.
+func (f *Feed) Subscribe(channel interface{}) Subscription {
+	chanval := reflect.ValueOf(channel)
+	chantyp := chanval.Type()
+	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
+		panic(errBadChannel)
+	}
+	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
+
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.init()
+	if !f.typecheck(chantyp.Elem()) {
+		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
+	}
+	// Add the select case to the inbox.
+	// The next Send will add it to f.sendCases.
+	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
+	f.inbox = append(f.inbox, cas)
+	return sub
+}
+
+// note: callers must hold f.mu
+func (f *Feed) typecheck(typ reflect.Type) bool {
+	if f.etype == nil {
+		f.etype = typ
+		return true
+	}
+	return f.etype == typ
+}
+
+func (f *Feed) remove(sub *feedSub) {
+	// Delete from inbox first, which covers channels
+	// that have not been added to f.sendCases yet.
+	ch := sub.channel.Interface()
+	f.mu.Lock()
+	index := f.inbox.find(ch)
+	if index != -1 {
+		f.inbox = f.inbox.delete(index)
+		f.mu.Unlock()
+		return
+	}
+	f.mu.Unlock()
+
+	select {
+	case f.removeSub <- ch:
+		// Send will remove the channel from f.sendCases.
+	case <-f.sendLock:
+		// No Send is in progress, delete the channel now that we have the send lock.
+		f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
+		f.sendLock <- struct{}{}
+	}
+}
+
+// Send delivers to all subscribed channels simultaneously.
+// It returns the number of subscribers that the value was sent to.
+func (f *Feed) Send(value interface{}) (nsent int) {
+	f.mu.Lock()
+	f.init()
+	f.mu.Unlock()
+
+	<-f.sendLock
+
+	// Add new cases from the inbox after taking the send lock.
+	f.mu.Lock()
+	f.sendCases = append(f.sendCases, f.inbox...)
+	f.inbox = nil
+	f.mu.Unlock()
+
+	// Set the sent value on all channels.
+	rvalue := reflect.ValueOf(value)
+	if !f.typecheck(rvalue.Type()) {
+		f.sendLock <- struct{}{}
+		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
+	}
+	for i := firstSubSendCase; i < len(f.sendCases); i++ {
+		f.sendCases[i].Send = rvalue
+	}
+
+	// Send until all channels except removeSub have been chosen.
+	cases := f.sendCases
+	for {
+		// Fast path: try sending without blocking before adding to the select set.
+		// This should usually succeed if subscribers are fast enough and have free
+		// buffer space.
+		for i := firstSubSendCase; i < len(cases); i++ {
+			if cases[i].Chan.TrySend(rvalue) {
+				nsent++
+				cases = cases.deactivate(i)
+				i--
+			}
+		}
+		if len(cases) == firstSubSendCase {
+			break
+		}
+		// Select on all the receivers, waiting for them to unblock.
+		chosen, recv, _ := reflect.Select(cases)
+		if chosen == 0 /* <-f.removeSub */ {
+			index := f.sendCases.find(recv.Interface())
+			f.sendCases = f.sendCases.delete(index)
+			if index >= 0 && index < len(cases) {
+				cases = f.sendCases[:len(cases)-1]
+			}
+		} else {
+			cases = cases.deactivate(chosen)
+			nsent++
+		}
+	}
+
+	// Forget about the sent value and hand off the send lock.
+	for i := firstSubSendCase; i < len(f.sendCases); i++ {
+		f.sendCases[i].Send = reflect.Value{}
+	}
+	f.sendLock <- struct{}{}
+	return nsent
+}
+
+type feedSub struct {
+	feed    *Feed
+	channel reflect.Value
+	errOnce sync.Once
+	err     chan error
+}
+
+func (sub *feedSub) Unsubscribe() {
+	sub.errOnce.Do(func() {
+		sub.feed.remove(sub)
+		close(sub.err)
+	})
+}
+
+func (sub *feedSub) Err() <-chan error {
+	return sub.err
+}
+
+type caseList []reflect.SelectCase
+
+// find returns the index of a case containing the given channel.
+func (cs caseList) find(channel interface{}) int {
+	for i, cas := range cs {
+		if cas.Chan.Interface() == channel {
+			return i
+		}
+	}
+	return -1
+}
+
+// delete removes the given case from cs.
+func (cs caseList) delete(index int) caseList {
+	return append(cs[:index], cs[index+1:]...)
+}
+
+// deactivate moves the case at index into the non-accessible portion of the cs slice.
+func (cs caseList) deactivate(index int) caseList {
+	last := len(cs) - 1
+	cs[index], cs[last] = cs[last], cs[index]
+	return cs[:last]
+}
+
+// func (cs caseList) String() string {
+//     s := "["
+//     for i, cas := range cs {
+//             if i != 0 {
+//                     s += ", "
+//             }
+//             switch cas.Dir {
+//             case reflect.SelectSend:
+//                     s += fmt.Sprintf("%v<-", cas.Chan.Interface())
+//             case reflect.SelectRecv:
+//                     s += fmt.Sprintf("<-%v", cas.Chan.Interface())
+//             }
+//     }
+//     return s + "]"
+// }

+ 294 - 0
event/feed_test.go

@@ -0,0 +1,294 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestFeedPanics(t *testing.T) {
+	{
+		var f Feed
+		f.Send(int(2))
+		want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+		if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		ch := make(chan int)
+		f.Subscribe(ch)
+		want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+		if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		f.Send(int(2))
+		want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
+		if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
+			t.Error(err)
+		}
+	}
+}
+
+func checkPanic(want error, fn func()) (err error) {
+	defer func() {
+		panic := recover()
+		if panic == nil {
+			err = fmt.Errorf("didn't panic")
+		} else if !reflect.DeepEqual(panic, want) {
+			err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
+		}
+	}()
+	fn()
+	return nil
+}
+
+func TestFeed(t *testing.T) {
+	var feed Feed
+	var done, subscribed sync.WaitGroup
+	subscriber := func(i int) {
+		defer done.Done()
+
+		subchan := make(chan int)
+		sub := feed.Subscribe(subchan)
+		timeout := time.NewTimer(2 * time.Second)
+		subscribed.Done()
+
+		select {
+		case v := <-subchan:
+			if v != 1 {
+				t.Errorf("%d: received value %d, want 1", i, v)
+			}
+		case <-timeout.C:
+			t.Errorf("%d: receive timeout", i)
+		}
+
+		sub.Unsubscribe()
+		select {
+		case _, ok := <-sub.Err():
+			if ok {
+				t.Errorf("%d: error channel not closed after unsubscribe", i)
+			}
+		case <-timeout.C:
+			t.Errorf("%d: unsubscribe timeout", i)
+		}
+	}
+
+	const n = 1000
+	done.Add(n)
+	subscribed.Add(n)
+	for i := 0; i < n; i++ {
+		go subscriber(i)
+	}
+	subscribed.Wait()
+	if nsent := feed.Send(1); nsent != n {
+		t.Errorf("first send delivered %d times, want %d", nsent, n)
+	}
+	if nsent := feed.Send(2); nsent != 0 {
+		t.Errorf("second send delivered %d times, want 0", nsent)
+	}
+	done.Wait()
+}
+
+func TestFeedSubscribeSameChannel(t *testing.T) {
+	var (
+		feed Feed
+		done sync.WaitGroup
+		ch   = make(chan int)
+		sub1 = feed.Subscribe(ch)
+		sub2 = feed.Subscribe(ch)
+		_    = feed.Subscribe(ch)
+	)
+	expectSends := func(value, n int) {
+		if nsent := feed.Send(value); nsent != n {
+			t.Errorf("send delivered %d times, want %d", nsent, n)
+		}
+		done.Done()
+	}
+	expectRecv := func(wantValue, n int) {
+		for i := 0; i < n; i++ {
+			if v := <-ch; v != wantValue {
+				t.Errorf("received %d, want %d", v, wantValue)
+			}
+		}
+	}
+
+	done.Add(1)
+	go expectSends(1, 3)
+	expectRecv(1, 3)
+	done.Wait()
+
+	sub1.Unsubscribe()
+
+	done.Add(1)
+	go expectSends(2, 2)
+	expectRecv(2, 2)
+	done.Wait()
+
+	sub2.Unsubscribe()
+
+	done.Add(1)
+	go expectSends(3, 1)
+	expectRecv(3, 1)
+	done.Wait()
+}
+
+func TestFeedSubscribeBlockedPost(t *testing.T) {
+	var (
+		feed   Feed
+		nsends = 2000
+		ch1    = make(chan int)
+		ch2    = make(chan int)
+		wg     sync.WaitGroup
+	)
+	defer wg.Wait()
+
+	feed.Subscribe(ch1)
+	wg.Add(nsends)
+	for i := 0; i < nsends; i++ {
+		go func() {
+			feed.Send(99)
+			wg.Done()
+		}()
+	}
+
+	sub2 := feed.Subscribe(ch2)
+	defer sub2.Unsubscribe()
+
+	// We're done when ch1 has received N times.
+	// The number of receives on ch2 depends on scheduling.
+	for i := 0; i < nsends; {
+		select {
+		case <-ch1:
+			i++
+		case <-ch2:
+		}
+	}
+}
+
+func TestFeedUnsubscribeBlockedPost(t *testing.T) {
+	var (
+		feed   Feed
+		nsends = 200
+		chans  = make([]chan int, 2000)
+		subs   = make([]Subscription, len(chans))
+		bchan  = make(chan int)
+		bsub   = feed.Subscribe(bchan)
+		wg     sync.WaitGroup
+	)
+	for i := range chans {
+		chans[i] = make(chan int, nsends)
+	}
+
+	// Queue up some Sends. None of these can make progress while bchan isn't read.
+	wg.Add(nsends)
+	for i := 0; i < nsends; i++ {
+		go func() {
+			feed.Send(99)
+			wg.Done()
+		}()
+	}
+	// Subscribe the other channels.
+	for i, ch := range chans {
+		subs[i] = feed.Subscribe(ch)
+	}
+	// Unsubscribe them again.
+	for _, sub := range subs {
+		sub.Unsubscribe()
+	}
+	// Unblock the Sends.
+	bsub.Unsubscribe()
+	wg.Wait()
+}
+
+func TestFeedUnsubscribeFromInbox(t *testing.T) {
+	var (
+		feed Feed
+		ch1  = make(chan int)
+		ch2  = make(chan int)
+		sub1 = feed.Subscribe(ch1)
+		sub2 = feed.Subscribe(ch1)
+		sub3 = feed.Subscribe(ch2)
+	)
+	if len(feed.inbox) != 3 {
+		t.Errorf("inbox length != 3 after subscribe")
+	}
+	if len(feed.sendCases) != 1 {
+		t.Errorf("sendCases is non-empty after unsubscribe")
+	}
+
+	sub1.Unsubscribe()
+	sub2.Unsubscribe()
+	sub3.Unsubscribe()
+	if len(feed.inbox) != 0 {
+		t.Errorf("inbox is non-empty after unsubscribe")
+	}
+	if len(feed.sendCases) != 1 {
+		t.Errorf("sendCases is non-empty after unsubscribe")
+	}
+}
+
+func BenchmarkFeedSend1000(b *testing.B) {
+	var (
+		done  sync.WaitGroup
+		feed  Feed
+		nsubs = 1000
+	)
+	subscriber := func(ch <-chan int) {
+		for i := 0; i < b.N; i++ {
+			<-ch
+		}
+		done.Done()
+	}
+	done.Add(nsubs)
+	for i := 0; i < nsubs; i++ {
+		ch := make(chan int, 200)
+		feed.Subscribe(ch)
+		go subscriber(ch)
+	}
+
+	// The actual benchmark.
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if feed.Send(i) != nsubs {
+			panic("wrong number of sends")
+		}
+	}
+
+	b.StopTimer()
+	done.Wait()
+}

+ 275 - 0
event/subscription.go

@@ -0,0 +1,275 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/mclock"
+	"golang.org/x/net/context"
+)
+
+// Subscription represents a stream of events. The carrier of the events is typically a
+// channel, but isn't part of the interface.
+//
+// Subscriptions can fail while established. Failures are reported through an error
+// channel. It receives a value if there is an issue with the subscription (e.g. the
+// network connection delivering the events has been closed). Only one value will ever be
+// sent.
+//
+// The error channel is closed when the subscription ends successfully (i.e. when the
+// source of events is closed). It is also closed when Unsubscribe is called.
+//
+// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
+// cases to ensure that resources related to the subscription are released. It can be
+// called any number of times.
+type Subscription interface {
+	Err() <-chan error // returns the error channel
+	Unsubscribe()      // cancels sending of events, closing the error channel
+}
+
+// NewSubscription runs a producer function as a subscription in a new goroutine. The
+// channel given to the producer is closed when Unsubscribe is called. If fn returns an
+// error, it is sent on the subscription's error channel.
+func NewSubscription(producer func(<-chan struct{}) error) Subscription {
+	s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
+	go func() {
+		defer close(s.err)
+		err := producer(s.unsub)
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		if !s.unsubscribed {
+			if err != nil {
+				s.err <- err
+			}
+			s.unsubscribed = true
+		}
+	}()
+	return s
+}
+
+type funcSub struct {
+	unsub        chan struct{}
+	err          chan error
+	mu           sync.Mutex
+	unsubscribed bool
+}
+
+func (s *funcSub) Unsubscribe() {
+	s.mu.Lock()
+	if s.unsubscribed {
+		s.mu.Unlock()
+		return
+	}
+	s.unsubscribed = true
+	close(s.unsub)
+	s.mu.Unlock()
+	// Wait for producer shutdown.
+	<-s.err
+}
+
+func (s *funcSub) Err() <-chan error {
+	return s.err
+}
+
+// Resubscribe calls fn repeatedly to keep a subscription established. When the
+// subscription is established, Resubscribe waits for it to fail and calls fn again. This
+// process repeats until Unsubscribe is called or the active subscription ends
+// successfully.
+//
+// Resubscribe applies backoff between calls to fn. The time between calls is adapted
+// based on the error rate, but will never exceed backoffMax.
+func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
+	s := &resubscribeSub{
+		waitTime:   backoffMax / 10,
+		backoffMax: backoffMax,
+		fn:         fn,
+		err:        make(chan error),
+		unsub:      make(chan struct{}),
+	}
+	go s.loop()
+	return s
+}
+
+// A ResubscribeFunc attempts to establish a subscription.
+type ResubscribeFunc func(context.Context) (Subscription, error)
+
+type resubscribeSub struct {
+	fn                   ResubscribeFunc
+	err                  chan error
+	unsub                chan struct{}
+	unsubOnce            sync.Once
+	lastTry              mclock.AbsTime
+	waitTime, backoffMax time.Duration
+}
+
+func (s *resubscribeSub) Unsubscribe() {
+	s.unsubOnce.Do(func() {
+		s.unsub <- struct{}{}
+		<-s.err
+	})
+}
+
+func (s *resubscribeSub) Err() <-chan error {
+	return s.err
+}
+
+func (s *resubscribeSub) loop() {
+	defer close(s.err)
+	var done bool
+	for !done {
+		sub := s.subscribe()
+		if sub == nil {
+			break
+		}
+		done = s.waitForError(sub)
+		sub.Unsubscribe()
+	}
+}
+
+func (s *resubscribeSub) subscribe() Subscription {
+	subscribed := make(chan error)
+	var sub Subscription
+retry:
+	for {
+		s.lastTry = mclock.Now()
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			rsub, err := s.fn(ctx)
+			sub = rsub
+			subscribed <- err
+		}()
+		select {
+		case err := <-subscribed:
+			cancel()
+			if err != nil {
+				// Subscribing failed, wait before launching the next try.
+				if s.backoffWait() {
+					return nil
+				}
+				continue retry
+			}
+			if sub == nil {
+				panic("event: ResubscribeFunc returned nil subscription and no error")
+			}
+			return sub
+		case <-s.unsub:
+			cancel()
+			return nil
+		}
+	}
+}
+
+func (s *resubscribeSub) waitForError(sub Subscription) bool {
+	defer sub.Unsubscribe()
+	select {
+	case err := <-sub.Err():
+		return err == nil
+	case <-s.unsub:
+		return true
+	}
+}
+
+func (s *resubscribeSub) backoffWait() bool {
+	if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
+		s.waitTime = s.backoffMax / 10
+	} else {
+		s.waitTime *= 2
+		if s.waitTime > s.backoffMax {
+			s.waitTime = s.backoffMax
+		}
+	}
+
+	t := time.NewTimer(s.waitTime)
+	defer t.Stop()
+	select {
+	case <-t.C:
+		return false
+	case <-s.unsub:
+		return true
+	}
+}
+
+// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
+//
+// For code that handle more than one subscription, a scope can be used to conveniently
+// unsubscribe all of them with a single call. The example demonstrates a typical use in a
+// larger program.
+//
+// The zero value is ready to use.
+type SubscriptionScope struct {
+	mu     sync.Mutex
+	subs   map[*scopeSub]struct{}
+	closed bool
+}
+
+type scopeSub struct {
+	sc *SubscriptionScope
+	s  Subscription
+}
+
+// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
+// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
+// scope.
+func (sc *SubscriptionScope) Track(s Subscription) Subscription {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	if sc.closed {
+		return nil
+	}
+	if sc.subs == nil {
+		sc.subs = make(map[*scopeSub]struct{})
+	}
+	ss := &scopeSub{sc, s}
+	sc.subs[ss] = struct{}{}
+	return ss
+}
+
+// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
+// the tracked set. Calls to Track after Close return nil.
+func (sc *SubscriptionScope) Close() {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	if sc.closed {
+		return
+	}
+	sc.closed = true
+	for s := range sc.subs {
+		s.s.Unsubscribe()
+	}
+	sc.subs = nil
+}
+
+// Count returns the number of tracked subscriptions.
+// It is meant to be used for debugging.
+func (sc *SubscriptionScope) Count() int {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	return len(sc.subs)
+}
+
+func (s *scopeSub) Unsubscribe() {
+	s.s.Unsubscribe()
+	s.sc.mu.Lock()
+	defer s.sc.mu.Unlock()
+	delete(s.sc.subs, s)
+}
+
+func (s *scopeSub) Err() <-chan error {
+	return s.s.Err()
+}

+ 121 - 0
event/subscription_test.go

@@ -0,0 +1,121 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+var errInts = errors.New("error in subscribeInts")
+
+func subscribeInts(max, fail int, c chan<- int) Subscription {
+	return NewSubscription(func(quit <-chan struct{}) error {
+		for i := 0; i < max; i++ {
+			if i >= fail {
+				return errInts
+			}
+			select {
+			case c <- i:
+			case <-quit:
+				return nil
+			}
+		}
+		return nil
+	})
+}
+
+func TestNewSubscriptionError(t *testing.T) {
+	t.Parallel()
+
+	channel := make(chan int)
+	sub := subscribeInts(10, 2, channel)
+loop:
+	for want := 0; want < 10; want++ {
+		select {
+		case got := <-channel:
+			if got != want {
+				t.Fatalf("wrong int %d, want %d", got, want)
+			}
+		case err := <-sub.Err():
+			if err != errInts {
+				t.Fatalf("wrong error: got %q, want %q", err, errInts)
+			}
+			if want != 2 {
+				t.Fatalf("got errInts at int %d, should be received at 2", want)
+			}
+			break loop
+		}
+	}
+	sub.Unsubscribe()
+
+	err, ok := <-sub.Err()
+	if err != nil {
+		t.Fatal("got non-nil error after Unsubscribe")
+	}
+	if ok {
+		t.Fatal("channel still open after Unsubscribe")
+	}
+}
+
+func TestResubscribe(t *testing.T) {
+	t.Parallel()
+
+	var i int
+	nfails := 6
+	sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) {
+		// fmt.Printf("call #%d @ %v\n", i, time.Now())
+		i++
+		if i == 2 {
+			// Delay the second failure a bit to reset the resubscribe interval.
+			time.Sleep(200 * time.Millisecond)
+		}
+		if i < nfails {
+			return nil, errors.New("oops")
+		}
+		sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil })
+		return sub, nil
+	})
+
+	<-sub.Err()
+	if i != nfails {
+		t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
+	}
+}
+
+func TestResubscribeAbort(t *testing.T) {
+	t.Parallel()
+
+	done := make(chan error)
+	sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
+		select {
+		case <-ctx.Done():
+			done <- nil
+		case <-time.After(2 * time.Second):
+			done <- errors.New("context given to resubscribe function not canceled within 2s")
+		}
+		return nil, nil
+	})
+
+	sub.Unsubscribe()
+	if err := <-done; err != nil {
+		t.Fatal(err)
+	}
+}

+ 1 - 1
light/txpool.go

@@ -47,7 +47,7 @@ type TxPool struct {
 	signer   types.Signer
 	quit     chan bool
 	eventMux *event.TypeMux
-	events   event.Subscription
+	events   *event.TypeMuxSubscription
 	mu       sync.RWMutex
 	chain    *LightChain
 	odr      OdrBackend

+ 1 - 1
miner/worker.go

@@ -90,7 +90,7 @@ type worker struct {
 
 	// update loop
 	mux    *event.TypeMux
-	events event.Subscription
+	events *event.TypeMuxSubscription
 	wg     sync.WaitGroup
 
 	agents map[Agent]struct{}

+ 4 - 1
rpc/client.go

@@ -682,7 +682,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription
 // resubscription when the client connection is closed unexpectedly.
 //
 // The error channel receives a value when the subscription has ended due
-// to an error. The received error is ErrClientQuit if Close has been called
+// to an error. The received error is nil if Close has been called
 // on the underlying client and no other error has occurred.
 //
 // The error channel is closed when Unsubscribe is called on the subscription.
@@ -707,6 +707,9 @@ func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool)
 			sub.requestUnsubscribe()
 		}
 		if err != nil {
+			if err == ErrClientQuit {
+				err = nil // Adhere to subscription semantics.
+			}
 			sub.err <- err
 		}
 	})