| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package rpc
- import (
- "container/list"
- "context"
- crand "crypto/rand"
- "encoding/binary"
- "encoding/hex"
- "encoding/json"
- "errors"
- "math/rand"
- "reflect"
- "strings"
- "sync"
- "time"
- )
// Sentinel errors of the subscription layer.
var (
	// ErrNotificationsUnsupported is returned when the connection doesn't
	// support notifications.
	ErrNotificationsUnsupported = errors.New("notifications not supported")

	// ErrSubscriptionNotFound is returned when no subscription is found for
	// the given ID.
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
- var globalGen = randomIDGenerator()
- // ID defines a pseudo random number that is used to identify RPC subscriptions.
- type ID string
- // NewID returns a new, random ID.
- func NewID() ID {
- return globalGen()
- }
- // randomIDGenerator returns a function generates a random IDs.
- func randomIDGenerator() func() ID {
- var buf = make([]byte, 8)
- var seed int64
- if _, err := crand.Read(buf); err == nil {
- seed = int64(binary.BigEndian.Uint64(buf))
- } else {
- seed = int64(time.Now().Nanosecond())
- }
- var (
- mu sync.Mutex
- rng = rand.New(rand.NewSource(seed))
- )
- return func() ID {
- mu.Lock()
- defer mu.Unlock()
- id := make([]byte, 16)
- rng.Read(id)
- return encodeID(id)
- }
- }
- func encodeID(b []byte) ID {
- id := hex.EncodeToString(b)
- id = strings.TrimLeft(id, "0")
- if id == "" {
- id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
- }
- return ID("0x" + id)
- }
- type notifierKey struct{}
- // NotifierFromContext returns the Notifier value stored in ctx, if any.
- func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
- n, ok := ctx.Value(notifierKey{}).(*Notifier)
- return n, ok
- }
- // Notifier is tied to a RPC connection that supports subscriptions.
- // Server callbacks use the notifier to send notifications.
- type Notifier struct {
- h *handler
- namespace string
- mu sync.Mutex
- sub *Subscription
- buffer []json.RawMessage
- callReturned bool
- activated bool
- }
- // CreateSubscription returns a new subscription that is coupled to the
- // RPC connection. By default subscriptions are inactive and notifications
- // are dropped until the subscription is marked as active. This is done
- // by the RPC server after the subscription ID is send to the client.
- func (n *Notifier) CreateSubscription() *Subscription {
- n.mu.Lock()
- defer n.mu.Unlock()
- if n.sub != nil {
- panic("can't create multiple subscriptions with Notifier")
- } else if n.callReturned {
- panic("can't create subscription after subscribe call has returned")
- }
- n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
- return n.sub
- }
- // Notify sends a notification to the client with the given data as payload.
- // If an error occurs the RPC connection is closed and the error is returned.
- func (n *Notifier) Notify(id ID, data interface{}) error {
- enc, err := json.Marshal(data)
- if err != nil {
- return err
- }
- n.mu.Lock()
- defer n.mu.Unlock()
- if n.sub == nil {
- panic("can't Notify before subscription is created")
- } else if n.sub.ID != id {
- panic("Notify with wrong ID")
- }
- if n.activated {
- return n.send(n.sub, enc)
- }
- n.buffer = append(n.buffer, enc)
- return nil
- }
- // Closed returns a channel that is closed when the RPC connection is closed.
- // Deprecated: use subscription error channel
- func (n *Notifier) Closed() <-chan interface{} {
- return n.h.conn.closed()
- }
- // takeSubscription returns the subscription (if one has been created). No subscription can
- // be created after this call.
- func (n *Notifier) takeSubscription() *Subscription {
- n.mu.Lock()
- defer n.mu.Unlock()
- n.callReturned = true
- return n.sub
- }
- // activate is called after the subscription ID was sent to client. Notifications are
- // buffered before activation. This prevents notifications being sent to the client before
- // the subscription ID is sent to the client.
- func (n *Notifier) activate() error {
- n.mu.Lock()
- defer n.mu.Unlock()
- for _, data := range n.buffer {
- if err := n.send(n.sub, data); err != nil {
- return err
- }
- }
- n.activated = true
- return nil
- }
- func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
- params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
- ctx := context.Background()
- return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
- Version: vsn,
- Method: n.namespace + notificationMethodSuffix,
- Params: params,
- })
- }
- // A Subscription is created by a notifier and tied to that notifier. The client can use
- // this subscription to wait for an unsubscribe request for the client, see Err().
- type Subscription struct {
- ID ID
- namespace string
- err chan error // closed on unsubscribe
- }
- // Err returns a channel that is closed when the client send an unsubscribe request.
- func (s *Subscription) Err() <-chan error {
- return s.err
- }
- // MarshalJSON marshals a subscription as its ID.
- func (s *Subscription) MarshalJSON() ([]byte, error) {
- return json.Marshal(s.ID)
- }
- // ClientSubscription is a subscription established through the Client's Subscribe or
- // EthSubscribe methods.
- type ClientSubscription struct {
- client *Client
- etype reflect.Type
- channel reflect.Value
- namespace string
- subid string
- // The in channel receives notification values from client dispatcher.
- in chan json.RawMessage
- // The error channel receives the error from the forwarding loop.
- // It is closed by Unsubscribe.
- err chan error
- errOnce sync.Once
- // Closing of the subscription is requested by sending on 'quit'. This is handled by
- // the forwarding loop, which closes 'forwardDone' when it has stopped sending to
- // sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side.
- quit chan error
- forwardDone chan struct{}
- unsubDone chan struct{}
- }
// errUnsubscribed is the sentinel value that Unsubscribe sends on sub.quit.
// The forwarding loop recognizes it and asks for a server-side unsubscribe.
var errUnsubscribed = errors.New("unsubscribed")
- func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
- sub := &ClientSubscription{
- client: c,
- namespace: namespace,
- etype: channel.Type().Elem(),
- channel: channel,
- in: make(chan json.RawMessage),
- quit: make(chan error),
- forwardDone: make(chan struct{}),
- unsubDone: make(chan struct{}),
- err: make(chan error, 1),
- }
- return sub
- }
- // Err returns the subscription error channel. The intended use of Err is to schedule
- // resubscription when the client connection is closed unexpectedly.
- //
- // The error channel receives a value when the subscription has ended due to an error. The
- // received error is nil if Close has been called on the underlying client and no other
- // error has occurred.
- //
- // The error channel is closed when Unsubscribe is called on the subscription.
- func (sub *ClientSubscription) Err() <-chan error {
- return sub.err
- }
- // Unsubscribe unsubscribes the notification and closes the error channel.
- // It can safely be called more than once.
- func (sub *ClientSubscription) Unsubscribe() {
- sub.errOnce.Do(func() {
- select {
- case sub.quit <- errUnsubscribed:
- <-sub.unsubDone
- case <-sub.unsubDone:
- }
- close(sub.err)
- })
- }
- // deliver is called by the client's message dispatcher to send a notification value.
- func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
- select {
- case sub.in <- result:
- return true
- case <-sub.forwardDone:
- return false
- }
- }
- // close is called by the client's message dispatcher when the connection is closed.
- func (sub *ClientSubscription) close(err error) {
- select {
- case sub.quit <- err:
- case <-sub.forwardDone:
- }
- }
- // run is the forwarding loop of the subscription. It runs in its own goroutine and
- // is launched by the client's handler after the subscription has been created.
- func (sub *ClientSubscription) run() {
- defer close(sub.unsubDone)
- unsubscribe, err := sub.forward()
- // The client's dispatch loop won't be able to execute the unsubscribe call if it is
- // blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them.
- close(sub.forwardDone)
- // Call the unsubscribe method on the server.
- if unsubscribe {
- sub.requestUnsubscribe()
- }
- // Send the error.
- if err != nil {
- if err == ErrClientQuit {
- // ErrClientQuit gets here when Client.Close is called. This is reported as a
- // nil error because it's not an error, but we can't close sub.err here.
- err = nil
- }
- sub.err <- err
- }
- }
- // forward is the forwarding loop. It takes in RPC notifications and sends them
- // on the subscription channel.
- func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
- cases := []reflect.SelectCase{
- {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
- {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
- {Dir: reflect.SelectSend, Chan: sub.channel},
- }
- buffer := list.New()
- for {
- var chosen int
- var recv reflect.Value
- if buffer.Len() == 0 {
- // Idle, omit send case.
- chosen, recv, _ = reflect.Select(cases[:2])
- } else {
- // Non-empty buffer, send the first queued item.
- cases[2].Send = reflect.ValueOf(buffer.Front().Value)
- chosen, recv, _ = reflect.Select(cases)
- }
- switch chosen {
- case 0: // <-sub.quit
- if !recv.IsNil() {
- err = recv.Interface().(error)
- }
- if err == errUnsubscribed {
- // Exiting because Unsubscribe was called, unsubscribe on server.
- return true, nil
- }
- return false, err
- case 1: // <-sub.in
- val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
- if err != nil {
- return true, err
- }
- if buffer.Len() == maxClientSubscriptionBuffer {
- return true, ErrSubscriptionQueueOverflow
- }
- buffer.PushBack(val)
- case 2: // sub.channel<-
- cases[2].Send = reflect.Value{} // Don't hold onto the value.
- buffer.Remove(buffer.Front())
- }
- }
- }
- func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
- val := reflect.New(sub.etype)
- err := json.Unmarshal(result, val.Interface())
- return val.Elem().Interface(), err
- }
- func (sub *ClientSubscription) requestUnsubscribe() error {
- var result interface{}
- return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
- }
|