| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package rpc
- import (
- "errors"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "golang.org/x/net/context"
- )
// Sentinel errors produced by the notification/subscription machinery.
var (
	// ErrNotificationsUnsupported is returned when the connection doesn't
	// support notifications.
	ErrNotificationsUnsupported = errors.New("notifications not supported")

	// ErrNotificationNotFound is returned when no notification (subscription)
	// is registered under the given id.
	ErrNotificationNotFound = errors.New("notification not found")

	// errNotifierStopped is returned when the notifier is stopped
	// (e.g. the codec is closed).
	errNotifierStopped = errors.New("unable to send notification")

	// errNotificationQueueFull is returned when there are too many
	// notifications waiting in the queue.
	errNotificationQueueFull = errors.New("too many pending notifications")
)
// unsubSignal is a sentinel value signalling that a subscription is
// unsubscribed. It is pushed through the internal queue so that notifications
// buffered before it are flushed first.
//
// The pointee is deliberately NOT a zero-size type: Go does not guarantee that
// pointers to distinct zero-size variables differ, so a plain new(struct{})
// could compare equal to an unrelated *struct{} handed in as event data. The
// blank byte field gives the sentinel its own address and its own type.
var unsubSignal = new(struct{ _ byte })
// UnsubscribeCallback is the signature of the function invoked when a
// subscription ends. Its single argument is the id of that subscription.
type UnsubscribeCallback func(id string)
- // notification is a helper object that holds event data for a subscription
- type notification struct {
- sub *bufferedSubscription // subscription id
- data interface{} // event data
- }
- // A Notifier type describes the interface for objects that can send create subscriptions
- type Notifier interface {
- // Create a new subscription. The given callback is called when this subscription
- // is cancelled (e.g. client send an unsubscribe, connection closed).
- NewSubscription(UnsubscribeCallback) (Subscription, error)
- // Cancel subscription
- Unsubscribe(id string) error
- }
- type notifierKey struct{}
- // NotifierFromContext returns the Notifier value stored in ctx, if any.
- func NotifierFromContext(ctx context.Context) (Notifier, bool) {
- n, ok := ctx.Value(notifierKey{}).(Notifier)
- return n, ok
- }
// Subscription is implemented by objects that can deliver events to a
// subscriber.
type Subscription interface {
	// Notify informs the client of an event.
	Notify(data interface{}) error

	// ID returns the unique identifier of the subscription.
	ID() string

	// Cancel ends the subscription.
	Cancel() error
}
- // bufferedSubscription is a subscription that uses a bufferedNotifier to send
- // notifications to subscribers.
- type bufferedSubscription struct {
- id string
- unsubOnce sync.Once // call unsub method once
- unsub UnsubscribeCallback // called on Unsubscribed
- notifier *bufferedNotifier // forward notifications to
- pending chan interface{} // closed when active
- flushed chan interface{} // closed when all buffered notifications are send
- lastNotification time.Time // last time a notification was send
- }
- // ID returns the subscription identifier that the client uses to refer to this instance.
- func (s *bufferedSubscription) ID() string {
- return s.id
- }
- // Cancel informs the notifier that this subscription is cancelled by the API
- func (s *bufferedSubscription) Cancel() error {
- return s.notifier.Unsubscribe(s.id)
- }
- // Notify the subscriber of a particular event.
- func (s *bufferedSubscription) Notify(data interface{}) error {
- return s.notifier.send(s.id, data)
- }
- // bufferedNotifier is a notifier that queues notifications in an internal queue and
- // send them as fast as possible to the client from this queue. It will stop if the
- // queue grows past a given size.
- type bufferedNotifier struct {
- codec ServerCodec // underlying connection
- mu sync.Mutex // guard internal state
- subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
- queueSize int // max number of items in queue
- queue chan *notification // notification queue
- stopped bool // indication if this notifier is ordered to stop
- }
- // newBufferedNotifier returns a notifier that queues notifications in an internal queue
- // from which notifications are send as fast as possible to the client. If the queue size
- // limit is reached (client is unable to keep up) it will stop and closes the codec.
- func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier {
- notifier := &bufferedNotifier{
- codec: codec,
- subscriptions: make(map[string]*bufferedSubscription),
- queue: make(chan *notification, size),
- queueSize: size,
- }
- go notifier.run()
- return notifier
- }
- // NewSubscription creates a new subscription that forwards events to this instance internal
- // queue. The given callback is called when the subscription is unsubscribed/cancelled.
- func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) {
- id, err := newSubscriptionID()
- if err != nil {
- return nil, err
- }
- n.mu.Lock()
- defer n.mu.Unlock()
- if n.stopped {
- return nil, errNotifierStopped
- }
- sub := &bufferedSubscription{
- id: id,
- unsub: callback,
- notifier: n,
- pending: make(chan interface{}),
- flushed: make(chan interface{}),
- lastNotification: time.Now(),
- }
- n.subscriptions[id] = sub
- return sub, nil
- }
- // Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
- func (n *bufferedNotifier) Unsubscribe(subid string) error {
- n.mu.Lock()
- sub, found := n.subscriptions[subid]
- n.mu.Unlock()
- if found {
- // send the unsubscribe signal, this will cause the notifier not to accept new events
- // for this subscription and will close the flushed channel after the last (buffered)
- // notification was send to the client.
- if err := n.send(subid, unsubSignal); err != nil {
- return err
- }
- // wait for confirmation that all (buffered) events are send for this subscription.
- // this ensures that the unsubscribe method response is not send before all buffered
- // events for this subscription are send.
- <-sub.flushed
- return nil
- }
- return ErrNotificationNotFound
- }
- // Send enques the given data for the subscription with public ID on the internal queue. t returns
- // an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
- // will remove the subscription with the given id from the subscription collection.
- func (n *bufferedNotifier) send(id string, data interface{}) error {
- n.mu.Lock()
- defer n.mu.Unlock()
- if n.stopped {
- return errNotifierStopped
- }
- var (
- subscription *bufferedSubscription
- found bool
- )
- // check if subscription is associated with this connection, it might be cancelled
- // (subscribe/connection closed)
- if subscription, found = n.subscriptions[id]; !found {
- glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id)
- return ErrNotificationNotFound
- }
- // received the unsubscribe signal. Add it to the queue to make sure any pending notifications
- // for this subscription are send. When the run loop receives this singal it will signal that
- // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
- // send to the user. Remove the subscriptions to make sure new notifications are not accepted.
- if data == unsubSignal {
- delete(n.subscriptions, id)
- if subscription.unsub != nil {
- subscription.unsubOnce.Do(func() { subscription.unsub(id) })
- }
- }
- subscription.lastNotification = time.Now()
- if len(n.queue) >= n.queueSize {
- glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection")
- n.codec.Close()
- return errNotificationQueueFull
- }
- n.queue <- ¬ification{subscription, data}
- return nil
- }
- // run reads notifications from the internal queue and sends them to the client. In case of an
- // error, or when the codec is closed it will cancel all active subscriptions and returns.
- func (n *bufferedNotifier) run() {
- defer func() {
- n.mu.Lock()
- defer n.mu.Unlock()
- n.stopped = true
- close(n.queue)
- // on exit call unsubscribe callback
- for id, sub := range n.subscriptions {
- if sub.unsub != nil {
- sub.unsubOnce.Do(func() { sub.unsub(id) })
- }
- close(sub.flushed)
- delete(n.subscriptions, id)
- }
- }()
- for {
- select {
- case notification := <-n.queue:
- // It can happen that an event is raised before the RPC server was able to send the sub
- // id to the client. Therefore subscriptions are marked as pending until the sub id was
- // send. The RPC server will activate the subscription by closing the pending chan.
- <-notification.sub.pending
- if notification.data == unsubSignal {
- // unsubSignal is the last accepted message for this subscription. Raise the signal
- // that all buffered notifications are sent by closing the flushed channel. This
- // indicates that the response for the unsubscribe can be send to the client.
- close(notification.sub.flushed)
- } else {
- msg := n.codec.CreateNotification(notification.sub.id, notification.data)
- if err := n.codec.Write(msg); err != nil {
- n.codec.Close()
- // unable to send notification to client, unsubscribe all subscriptions
- glog.V(logger.Warn).Infof("unable to send notification - %v\n", err)
- return
- }
- }
- case <-n.codec.Closed(): // connection was closed
- glog.V(logger.Debug).Infoln("codec closed, stop subscriptions")
- return
- }
- }
- }
- // Marks the subscription as active. This will causes the notifications for this subscription to be
- // forwarded to the client.
- func (n *bufferedNotifier) activate(subid string) {
- n.mu.Lock()
- defer n.mu.Unlock()
- if sub, found := n.subscriptions[subid]; found {
- close(sub.pending)
- }
- }
|