Bläddra i källkod

event package

skyfffire 2 år sedan
förälder
incheckning
14c837c5f7
3 ändrade filer med 764 tillägg och 0 borttagningar
  1. 217 0
      event/event.go
  2. 248 0
      event/feed.go
  3. 299 0
      event/subscription.go

+ 217 - 0
event/event.go

@@ -0,0 +1,217 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package event deals with subscriptions to real-time events.
+package event
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"sync"
+	"time"
+)
+
+// TypeMuxEvent is a time-tagged notification pushed to subscribers.
+type TypeMuxEvent struct {
+	Time time.Time
+	Data interface{}
+}
+
+// A TypeMux dispatches events to registered receivers. Receivers can be
+// registered to handle events of certain type. Any operation
+// called after mux is stopped will return ErrMuxClosed.
+//
+// The zero value is ready to use.
+//
+// Deprecated: use Feed
+type TypeMux struct {
+	mutex   sync.RWMutex
+	subm    map[reflect.Type][]*TypeMuxSubscription
+	stopped bool
+}
+
+// ErrMuxClosed is returned when Posting on a closed TypeMux.
+var ErrMuxClosed = errors.New("event: mux closed")
+
+// Subscribe creates a subscription for events of the given types. The
+// subscription's channel is closed when it is unsubscribed
+// or the mux is closed.
+func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
+	sub := newsub(mux)
+	mux.mutex.Lock()
+	defer mux.mutex.Unlock()
+	if mux.stopped {
+		// set the status to closed so that calling Unsubscribe after this
+		// call will short circuit.
+		sub.closed = true
+		close(sub.postC)
+	} else {
+		if mux.subm == nil {
+			mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
+		}
+		for _, t := range types {
+			rtyp := reflect.TypeOf(t)
+			oldsubs := mux.subm[rtyp]
+			if find(oldsubs, sub) != -1 {
+				panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
+			}
+			subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
+			copy(subs, oldsubs)
+			subs[len(oldsubs)] = sub
+			mux.subm[rtyp] = subs
+		}
+	}
+	return sub
+}
+
+// Post sends an event to all receivers registered for the given type.
+// It returns ErrMuxClosed if the mux has been stopped.
+func (mux *TypeMux) Post(ev interface{}) error {
+	event := &TypeMuxEvent{
+		Time: time.Now(),
+		Data: ev,
+	}
+	rtyp := reflect.TypeOf(ev)
+	mux.mutex.RLock()
+	if mux.stopped {
+		mux.mutex.RUnlock()
+		return ErrMuxClosed
+	}
+	subs := mux.subm[rtyp]
+	mux.mutex.RUnlock()
+	for _, sub := range subs {
+		sub.deliver(event)
+	}
+	return nil
+}
+
+// Stop closes a mux. The mux can no longer be used.
+// Future Post calls will fail with ErrMuxClosed.
+// Stop blocks until all current deliveries have finished.
+func (mux *TypeMux) Stop() {
+	mux.mutex.Lock()
+	defer mux.mutex.Unlock()
+	for _, subs := range mux.subm {
+		for _, sub := range subs {
+			sub.closewait()
+		}
+	}
+	mux.subm = nil
+	mux.stopped = true
+}
+
+func (mux *TypeMux) del(s *TypeMuxSubscription) {
+	mux.mutex.Lock()
+	defer mux.mutex.Unlock()
+	for typ, subs := range mux.subm {
+		if pos := find(subs, s); pos >= 0 {
+			if len(subs) == 1 {
+				delete(mux.subm, typ)
+			} else {
+				mux.subm[typ] = posdelete(subs, pos)
+			}
+		}
+	}
+}
+
+func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int {
+	for i, v := range slice {
+		if v == item {
+			return i
+		}
+	}
+	return -1
+}
+
+func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
+	news := make([]*TypeMuxSubscription, len(slice)-1)
+	copy(news[:pos], slice[:pos])
+	copy(news[pos:], slice[pos+1:])
+	return news
+}
+
+// TypeMuxSubscription is a subscription established through TypeMux.
+type TypeMuxSubscription struct {
+	mux     *TypeMux
+	created time.Time
+	closeMu sync.Mutex
+	closing chan struct{}
+	closed  bool
+
+	// these two are the same channel. they are stored separately so
+	// postC can be set to nil without affecting the return value of
+	// Chan.
+	postMu sync.RWMutex
+	readC  <-chan *TypeMuxEvent
+	postC  chan<- *TypeMuxEvent
+}
+
+func newsub(mux *TypeMux) *TypeMuxSubscription {
+	c := make(chan *TypeMuxEvent)
+	return &TypeMuxSubscription{
+		mux:     mux,
+		created: time.Now(),
+		readC:   c,
+		postC:   c,
+		closing: make(chan struct{}),
+	}
+}
+
+func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent {
+	return s.readC
+}
+
+func (s *TypeMuxSubscription) Unsubscribe() {
+	s.mux.del(s)
+	s.closewait()
+}
+
+func (s *TypeMuxSubscription) Closed() bool {
+	s.closeMu.Lock()
+	defer s.closeMu.Unlock()
+	return s.closed
+}
+
+func (s *TypeMuxSubscription) closewait() {
+	s.closeMu.Lock()
+	defer s.closeMu.Unlock()
+	if s.closed {
+		return
+	}
+	close(s.closing)
+	s.closed = true
+
+	s.postMu.Lock()
+	defer s.postMu.Unlock()
+	close(s.postC)
+	s.postC = nil
+}
+
+func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
+	// Short circuit delivery if stale event
+	if s.created.After(event.Time) {
+		return
+	}
+	// Otherwise deliver the event
+	s.postMu.RLock()
+	defer s.postMu.RUnlock()
+
+	select {
+	case s.postC <- event:
+	case <-s.closing:
+	}
+}

+ 248 - 0
event/feed.go

@@ -0,0 +1,248 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"errors"
+	"reflect"
+	"sync"
+)
+
+var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")
+
+// Feed implements one-to-many subscriptions where the carrier of events is a channel.
+// Values sent to a Feed are delivered to all subscribed channels simultaneously.
+//
+// Feeds can only be used with a single type. The type is determined by the first Send or
+// Subscribe operation. Subsequent calls to these methods panic if the type does not
+// match.
+//
+// The zero value is ready to use.
+type Feed struct {
+	once      sync.Once        // ensures that init only runs once
+	sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases.
+	removeSub chan interface{} // interrupts Send
+	sendCases caseList         // the active set of select cases used by Send
+
+	// The inbox holds newly subscribed channels until they are added to sendCases.
+	mu    sync.Mutex
+	inbox caseList
+	etype reflect.Type
+}
+
+// This is the index of the first actual subscription channel in sendCases.
+// sendCases[0] is a SelectRecv case for the removeSub channel.
+const firstSubSendCase = 1
+
+type feedTypeError struct {
+	got, want reflect.Type
+	op        string
+}
+
+func (e feedTypeError) Error() string {
+	return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String()
+}
+
+func (f *Feed) init() {
+	f.removeSub = make(chan interface{})
+	f.sendLock = make(chan struct{}, 1)
+	f.sendLock <- struct{}{}
+	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
+}
+
+// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
+// until the subscription is canceled. All channels added must have the same element type.
+//
+// The channel should have ample buffer space to avoid blocking other subscribers.
+// Slow subscribers are not dropped.
+func (f *Feed) Subscribe(channel interface{}) Subscription {
+	f.once.Do(f.init)
+
+	chanval := reflect.ValueOf(channel)
+	chantyp := chanval.Type()
+	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
+		panic(errBadChannel)
+	}
+	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
+
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	if !f.typecheck(chantyp.Elem()) {
+		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
+	}
+	// Add the select case to the inbox.
+	// The next Send will add it to f.sendCases.
+	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
+	f.inbox = append(f.inbox, cas)
+	return sub
+}
+
+// note: callers must hold f.mu
+func (f *Feed) typecheck(typ reflect.Type) bool {
+	if f.etype == nil {
+		f.etype = typ
+		return true
+	}
+	return f.etype == typ
+}
+
+func (f *Feed) remove(sub *feedSub) {
+	// Delete from inbox first, which covers channels
+	// that have not been added to f.sendCases yet.
+	ch := sub.channel.Interface()
+	f.mu.Lock()
+	index := f.inbox.find(ch)
+	if index != -1 {
+		f.inbox = f.inbox.delete(index)
+		f.mu.Unlock()
+		return
+	}
+	f.mu.Unlock()
+
+	select {
+	case f.removeSub <- ch:
+		// Send will remove the channel from f.sendCases.
+	case <-f.sendLock:
+		// No Send is in progress, delete the channel now that we have the send lock.
+		f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
+		f.sendLock <- struct{}{}
+	}
+}
+
+// Send delivers to all subscribed channels simultaneously.
+// It returns the number of subscribers that the value was sent to.
+func (f *Feed) Send(value interface{}) (nsent int) {
+	rvalue := reflect.ValueOf(value)
+
+	f.once.Do(f.init)
+	<-f.sendLock
+
+	// Add new cases from the inbox after taking the send lock.
+	f.mu.Lock()
+	f.sendCases = append(f.sendCases, f.inbox...)
+	f.inbox = nil
+
+	if !f.typecheck(rvalue.Type()) {
+		f.sendLock <- struct{}{}
+		f.mu.Unlock()
+		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
+	}
+	f.mu.Unlock()
+
+	// Set the sent value on all channels.
+	for i := firstSubSendCase; i < len(f.sendCases); i++ {
+		f.sendCases[i].Send = rvalue
+	}
+
+	// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
+	// of sendCases. When a send succeeds, the corresponding case moves to the end of
+	// 'cases' and it shrinks by one element.
+	cases := f.sendCases
+	for {
+		// Fast path: try sending without blocking before adding to the select set.
+		// This should usually succeed if subscribers are fast enough and have free
+		// buffer space.
+		for i := firstSubSendCase; i < len(cases); i++ {
+			if cases[i].Chan.TrySend(rvalue) {
+				nsent++
+				cases = cases.deactivate(i)
+				i--
+			}
+		}
+		if len(cases) == firstSubSendCase {
+			break
+		}
+		// Select on all the receivers, waiting for them to unblock.
+		chosen, recv, _ := reflect.Select(cases)
+		if chosen == 0 /* <-f.removeSub */ {
+			index := f.sendCases.find(recv.Interface())
+			f.sendCases = f.sendCases.delete(index)
+			if index >= 0 && index < len(cases) {
+				// Shrink 'cases' too because the removed case was still active.
+				cases = f.sendCases[:len(cases)-1]
+			}
+		} else {
+			cases = cases.deactivate(chosen)
+			nsent++
+		}
+	}
+
+	// Forget about the sent value and hand off the send lock.
+	for i := firstSubSendCase; i < len(f.sendCases); i++ {
+		f.sendCases[i].Send = reflect.Value{}
+	}
+	f.sendLock <- struct{}{}
+	return nsent
+}
+
+type feedSub struct {
+	feed    *Feed
+	channel reflect.Value
+	errOnce sync.Once
+	err     chan error
+}
+
+func (sub *feedSub) Unsubscribe() {
+	sub.errOnce.Do(func() {
+		sub.feed.remove(sub)
+		close(sub.err)
+	})
+}
+
+func (sub *feedSub) Err() <-chan error {
+	return sub.err
+}
+
+type caseList []reflect.SelectCase
+
+// find returns the index of a case containing the given channel.
+func (cs caseList) find(channel interface{}) int {
+	for i, cas := range cs {
+		if cas.Chan.Interface() == channel {
+			return i
+		}
+	}
+	return -1
+}
+
+// delete removes the given case from cs.
+func (cs caseList) delete(index int) caseList {
+	return append(cs[:index], cs[index+1:]...)
+}
+
+// deactivate moves the case at index into the non-accessible portion of the cs slice.
+func (cs caseList) deactivate(index int) caseList {
+	last := len(cs) - 1
+	cs[index], cs[last] = cs[last], cs[index]
+	return cs[:last]
+}
+
+// func (cs caseList) String() string {
+//     s := "["
+//     for i, cas := range cs {
+//             if i != 0 {
+//                     s += ", "
+//             }
+//             switch cas.Dir {
+//             case reflect.SelectSend:
+//                     s += fmt.Sprintf("%v<-", cas.Chan.Interface())
+//             case reflect.SelectRecv:
+//                     s += fmt.Sprintf("<-%v", cas.Chan.Interface())
+//             }
+//     }
+//     return s + "]"
+// }

+ 299 - 0
event/subscription.go

@@ -0,0 +1,299 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/gopool"
+	"github.com/ethereum/go-ethereum/common/mclock"
+)
+
+// Subscription represents a stream of events. The carrier of the events is typically a
+// channel, but isn't part of the interface.
+//
+// Subscriptions can fail while established. Failures are reported through an error
+// channel. It receives a value if there is an issue with the subscription (e.g. the
+// network connection delivering the events has been closed). Only one value will ever be
+// sent.
+//
+// The error channel is closed when the subscription ends successfully (i.e. when the
+// source of events is closed). It is also closed when Unsubscribe is called.
+//
+// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
+// cases to ensure that resources related to the subscription are released. It can be
+// called any number of times.
+type Subscription interface {
+	Err() <-chan error // returns the error channel
+	Unsubscribe()      // cancels sending of events, closing the error channel
+}
+
+// NewSubscription runs a producer function as a subscription in a new goroutine. The
+// channel given to the producer is closed when Unsubscribe is called. If fn returns an
+// error, it is sent on the subscription's error channel.
+func NewSubscription(producer func(<-chan struct{}) error) Subscription {
+	s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
+	gopool.Submit(func() {
+		defer close(s.err)
+		err := producer(s.unsub)
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		if !s.unsubscribed {
+			if err != nil {
+				s.err <- err
+			}
+			s.unsubscribed = true
+		}
+	})
+	return s
+}
+
+type funcSub struct {
+	unsub        chan struct{}
+	err          chan error
+	mu           sync.Mutex
+	unsubscribed bool
+}
+
+func (s *funcSub) Unsubscribe() {
+	s.mu.Lock()
+	if s.unsubscribed {
+		s.mu.Unlock()
+		return
+	}
+	s.unsubscribed = true
+	close(s.unsub)
+	s.mu.Unlock()
+	// Wait for producer shutdown.
+	<-s.err
+}
+
+func (s *funcSub) Err() <-chan error {
+	return s.err
+}
+
+// Resubscribe calls fn repeatedly to keep a subscription established. When the
+// subscription is established, Resubscribe waits for it to fail and calls fn again. This
+// process repeats until Unsubscribe is called or the active subscription ends
+// successfully.
+//
+// Resubscribe applies backoff between calls to fn. The time between calls is adapted
+// based on the error rate, but will never exceed backoffMax.
+func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
+	return ResubscribeErr(backoffMax, func(ctx context.Context, _ error) (Subscription, error) {
+		return fn(ctx)
+	})
+}
+
+// A ResubscribeFunc attempts to establish a subscription.
+type ResubscribeFunc func(context.Context) (Subscription, error)
+
+// ResubscribeErr calls fn repeatedly to keep a subscription established. When the
+// subscription is established, ResubscribeErr waits for it to fail and calls fn again. This
+// process repeats until Unsubscribe is called or the active subscription ends
+// successfully.
+//
+// The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr,
+// the error of the failing subscription is available to the callback for logging
+// purposes.
+//
+// ResubscribeErr applies backoff between calls to fn. The time between calls is adapted
+// based on the error rate, but will never exceed backoffMax.
+func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription {
+	s := &resubscribeSub{
+		waitTime:   backoffMax / 10,
+		backoffMax: backoffMax,
+		fn:         fn,
+		err:        make(chan error),
+		unsub:      make(chan struct{}),
+	}
+	go s.loop()
+	return s
+}
+
+// A ResubscribeErrFunc attempts to establish a subscription.
+// For every call but the first, the second argument to this function is
+// the error that occurred with the previous subscription.
+type ResubscribeErrFunc func(context.Context, error) (Subscription, error)
+
+type resubscribeSub struct {
+	fn                   ResubscribeErrFunc
+	err                  chan error
+	unsub                chan struct{}
+	unsubOnce            sync.Once
+	lastTry              mclock.AbsTime
+	lastSubErr           error
+	waitTime, backoffMax time.Duration
+}
+
+func (s *resubscribeSub) Unsubscribe() {
+	s.unsubOnce.Do(func() {
+		s.unsub <- struct{}{}
+		<-s.err
+	})
+}
+
+func (s *resubscribeSub) Err() <-chan error {
+	return s.err
+}
+
+func (s *resubscribeSub) loop() {
+	defer close(s.err)
+	var done bool
+	for !done {
+		sub := s.subscribe()
+		if sub == nil {
+			break
+		}
+		done = s.waitForError(sub)
+		sub.Unsubscribe()
+	}
+}
+
+func (s *resubscribeSub) subscribe() Subscription {
+	subscribed := make(chan error)
+	var sub Subscription
+	for {
+		s.lastTry = mclock.Now()
+		ctx, cancel := context.WithCancel(context.Background())
+		gopool.Submit(func() {
+			rsub, err := s.fn(ctx, s.lastSubErr)
+			sub = rsub
+			subscribed <- err
+		})
+		select {
+		case err := <-subscribed:
+			cancel()
+			if err == nil {
+				if sub == nil {
+					panic("event: ResubscribeFunc returned nil subscription and no error")
+				}
+				return sub
+			}
+			// Subscribing failed, wait before launching the next try.
+			if s.backoffWait() {
+				return nil // unsubscribed during wait
+			}
+		case <-s.unsub:
+			cancel()
+			<-subscribed // avoid leaking the s.fn goroutine.
+			return nil
+		}
+	}
+}
+
+func (s *resubscribeSub) waitForError(sub Subscription) bool {
+	defer sub.Unsubscribe()
+	select {
+	case err := <-sub.Err():
+		s.lastSubErr = err
+		return err == nil
+	case <-s.unsub:
+		return true
+	}
+}
+
+func (s *resubscribeSub) backoffWait() bool {
+	if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
+		s.waitTime = s.backoffMax / 10
+	} else {
+		s.waitTime *= 2
+		if s.waitTime > s.backoffMax {
+			s.waitTime = s.backoffMax
+		}
+	}
+
+	t := time.NewTimer(s.waitTime)
+	defer t.Stop()
+	select {
+	case <-t.C:
+		return false
+	case <-s.unsub:
+		return true
+	}
+}
+
+// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
+//
+// For code that handle more than one subscription, a scope can be used to conveniently
+// unsubscribe all of them with a single call. The example demonstrates a typical use in a
+// larger program.
+//
+// The zero value is ready to use.
+type SubscriptionScope struct {
+	mu     sync.Mutex
+	subs   map[*scopeSub]struct{}
+	closed bool
+}
+
+type scopeSub struct {
+	sc *SubscriptionScope
+	s  Subscription
+}
+
+// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
+// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
+// scope.
+func (sc *SubscriptionScope) Track(s Subscription) Subscription {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	if sc.closed {
+		return nil
+	}
+	if sc.subs == nil {
+		sc.subs = make(map[*scopeSub]struct{})
+	}
+	ss := &scopeSub{sc, s}
+	sc.subs[ss] = struct{}{}
+	return ss
+}
+
+// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
+// the tracked set. Calls to Track after Close return nil.
+func (sc *SubscriptionScope) Close() {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	if sc.closed {
+		return
+	}
+	sc.closed = true
+	for s := range sc.subs {
+		s.s.Unsubscribe()
+	}
+	sc.subs = nil
+}
+
+// Count returns the number of tracked subscriptions.
+// It is meant to be used for debugging.
+func (sc *SubscriptionScope) Count() int {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	return len(sc.subs)
+}
+
+func (s *scopeSub) Unsubscribe() {
+	s.s.Unsubscribe()
+	s.sc.mu.Lock()
+	defer s.sc.mu.Unlock()
+	delete(s.sc.subs, s)
+}
+
+func (s *scopeSub) Err() <-chan error {
+	return s.s.Err()
+}