progress.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package pipeline
  2. import "io"
  3. // ********** The following is common between the request body AND the response body.
  4. // ProgressReceiver defines the signature of a callback function invoked as progress is reported.
  5. type ProgressReceiver func(bytesTransferred int64)
  6. // ********** The following are specific to the request body (a ReadSeekCloser)
  7. // This struct is used when sending a body to the network
  8. type requestBodyProgress struct {
  9. requestBody io.ReadSeeker // Seeking is required to support retries
  10. pr ProgressReceiver
  11. }
  12. // NewRequestBodyProgress adds progress reporting to an HTTP request's body stream.
  13. func NewRequestBodyProgress(requestBody io.ReadSeeker, pr ProgressReceiver) io.ReadSeeker {
  14. if pr == nil {
  15. panic("pr must not be nil")
  16. }
  17. return &requestBodyProgress{requestBody: requestBody, pr: pr}
  18. }
  19. // Read reads a block of data from an inner stream and reports progress
  20. func (rbp *requestBodyProgress) Read(p []byte) (n int, err error) {
  21. n, err = rbp.requestBody.Read(p)
  22. if err != nil {
  23. return
  24. }
  25. // Invokes the user's callback method to report progress
  26. position, err := rbp.requestBody.Seek(0, io.SeekCurrent)
  27. if err != nil {
  28. panic(err)
  29. }
  30. rbp.pr(position)
  31. return
  32. }
  33. func (rbp *requestBodyProgress) Seek(offset int64, whence int) (offsetFromStart int64, err error) {
  34. return rbp.requestBody.Seek(offset, whence)
  35. }
  36. // requestBodyProgress supports Close but the underlying stream may not; if it does, Close will close it.
  37. func (rbp *requestBodyProgress) Close() error {
  38. if c, ok := rbp.requestBody.(io.Closer); ok {
  39. return c.Close()
  40. }
  41. return nil
  42. }
  43. // ********** The following are specific to the response body (a ReadCloser)
  44. // This struct is used when sending a body to the network
  45. type responseBodyProgress struct {
  46. responseBody io.ReadCloser
  47. pr ProgressReceiver
  48. offset int64
  49. }
  50. // NewResponseBodyProgress adds progress reporting to an HTTP response's body stream.
  51. func NewResponseBodyProgress(responseBody io.ReadCloser, pr ProgressReceiver) io.ReadCloser {
  52. if pr == nil {
  53. panic("pr must not be nil")
  54. }
  55. return &responseBodyProgress{responseBody: responseBody, pr: pr, offset: 0}
  56. }
  57. // Read reads a block of data from an inner stream and reports progress
  58. func (rbp *responseBodyProgress) Read(p []byte) (n int, err error) {
  59. n, err = rbp.responseBody.Read(p)
  60. rbp.offset += int64(n)
  61. // Invokes the user's callback method to report progress
  62. rbp.pr(rbp.offset)
  63. return
  64. }
  65. func (rbp *responseBodyProgress) Close() error {
  66. return rbp.responseBody.Close()
  67. }