|
|
@@ -3,9 +3,11 @@ package p2p
|
|
|
import (
|
|
|
"bytes"
|
|
|
"encoding/binary"
|
|
|
+ "errors"
|
|
|
"io"
|
|
|
"io/ioutil"
|
|
|
"math/big"
|
|
|
+ "sync/atomic"
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/ethutil"
|
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
|
@@ -153,3 +155,78 @@ func (r *postrack) ReadByte() (byte, error) {
|
|
|
}
|
|
|
return b, err
|
|
|
}
|
|
|
+
|
|
|
+// MsgPipe creates a message pipe. Reads on one end are matched
|
|
|
+// with writes on the other. The pipe is full-duplex, both ends
|
|
|
+// implement MsgReadWriter.
|
|
|
+func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
|
|
|
+ var (
|
|
|
+ c1, c2 = make(chan Msg), make(chan Msg)
|
|
|
+ closing = make(chan struct{})
|
|
|
+ closed = new(int32)
|
|
|
+ rw1 = &MsgPipeRW{c1, c2, closing, closed}
|
|
|
+ rw2 = &MsgPipeRW{c2, c1, closing, closed}
|
|
|
+ )
|
|
|
+ return rw1, rw2
|
|
|
+}
|
|
|
+
|
|
|
+// ErrPipeClosed is returned from pipe operations after the
|
|
|
+// pipe has been closed.
|
|
|
+var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe")
|
|
|
+
|
|
|
+// MsgPipeRW is an endpoint of a MsgReadWriter pipe.
|
|
|
+type MsgPipeRW struct {
|
|
|
+ w chan<- Msg
|
|
|
+ r <-chan Msg
|
|
|
+ closing chan struct{}
|
|
|
+ closed *int32
|
|
|
+}
|
|
|
+
|
|
|
+// WriteMsg sends a messsage on the pipe.
|
|
|
+// It blocks until the receiver has consumed the message payload.
|
|
|
+func (p *MsgPipeRW) WriteMsg(msg Msg) error {
|
|
|
+ if atomic.LoadInt32(p.closed) == 0 {
|
|
|
+ consumed := make(chan struct{}, 1)
|
|
|
+ msg.Payload = &eofSignal{msg.Payload, int64(msg.Size), consumed}
|
|
|
+ select {
|
|
|
+ case p.w <- msg:
|
|
|
+ if msg.Size > 0 {
|
|
|
+ // wait for payload read or discard
|
|
|
+ <-consumed
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ case <-p.closing:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ErrPipeClosed
|
|
|
+}
|
|
|
+
|
|
|
+// EncodeMsg is a convenient shorthand for sending an RLP-encoded message.
|
|
|
+func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error {
|
|
|
+ return p.WriteMsg(NewMsg(code, data...))
|
|
|
+}
|
|
|
+
|
|
|
+// ReadMsg returns a message sent on the other end of the pipe.
|
|
|
+func (p *MsgPipeRW) ReadMsg() (Msg, error) {
|
|
|
+ if atomic.LoadInt32(p.closed) == 0 {
|
|
|
+ select {
|
|
|
+ case msg := <-p.r:
|
|
|
+ return msg, nil
|
|
|
+ case <-p.closing:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Msg{}, ErrPipeClosed
|
|
|
+}
|
|
|
+
|
|
|
+// Close unblocks any pending ReadMsg and WriteMsg calls on both ends
|
|
|
+// of the pipe. They will return ErrPipeClosed. Note that Close does
|
|
|
+// not interrupt any reads from a message payload.
|
|
|
+func (p *MsgPipeRW) Close() error {
|
|
|
+ if atomic.AddInt32(p.closed, 1) != 1 {
|
|
|
+ // someone else is already closing
|
|
|
+ atomic.StoreInt32(p.closed, 1) // avoid overflow
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ close(p.closing)
|
|
|
+ return nil
|
|
|
+}
|