queue.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package storage
  2. import (
  3. "encoding/xml"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strconv"
  8. "strings"
  9. )
  10. const (
  11. // casing is per Golang's http.Header canonicalizing the header names.
  12. approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
  13. userDefinedMetadataHeaderPrefix = "X-Ms-Meta-"
  14. )
  15. func pathForQueue(queue string) string { return fmt.Sprintf("/%s", queue) }
  16. func pathForQueueMessages(queue string) string { return fmt.Sprintf("/%s/messages", queue) }
  17. func pathForMessage(queue, name string) string { return fmt.Sprintf("/%s/messages/%s", queue, name) }
  18. type putMessageRequest struct {
  19. XMLName xml.Name `xml:"QueueMessage"`
  20. MessageText string `xml:"MessageText"`
  21. }
  22. // PutMessageParameters is the set of options can be specified for Put Messsage
  23. // operation. A zero struct does not use any preferences for the request.
  24. type PutMessageParameters struct {
  25. VisibilityTimeout int
  26. MessageTTL int
  27. }
  28. func (p PutMessageParameters) getParameters() url.Values {
  29. out := url.Values{}
  30. if p.VisibilityTimeout != 0 {
  31. out.Set("visibilitytimeout", strconv.Itoa(p.VisibilityTimeout))
  32. }
  33. if p.MessageTTL != 0 {
  34. out.Set("messagettl", strconv.Itoa(p.MessageTTL))
  35. }
  36. return out
  37. }
  38. // GetMessagesParameters is the set of options can be specified for Get
  39. // Messsages operation. A zero struct does not use any preferences for the
  40. // request.
  41. type GetMessagesParameters struct {
  42. NumOfMessages int
  43. VisibilityTimeout int
  44. }
  45. func (p GetMessagesParameters) getParameters() url.Values {
  46. out := url.Values{}
  47. if p.NumOfMessages != 0 {
  48. out.Set("numofmessages", strconv.Itoa(p.NumOfMessages))
  49. }
  50. if p.VisibilityTimeout != 0 {
  51. out.Set("visibilitytimeout", strconv.Itoa(p.VisibilityTimeout))
  52. }
  53. return out
  54. }
  55. // PeekMessagesParameters is the set of options can be specified for Peek
  56. // Messsage operation. A zero struct does not use any preferences for the
  57. // request.
  58. type PeekMessagesParameters struct {
  59. NumOfMessages int
  60. }
  61. func (p PeekMessagesParameters) getParameters() url.Values {
  62. out := url.Values{"peekonly": {"true"}} // Required for peek operation
  63. if p.NumOfMessages != 0 {
  64. out.Set("numofmessages", strconv.Itoa(p.NumOfMessages))
  65. }
  66. return out
  67. }
  68. // UpdateMessageParameters is the set of options can be specified for Update Messsage
  69. // operation. A zero struct does not use any preferences for the request.
  70. type UpdateMessageParameters struct {
  71. PopReceipt string
  72. VisibilityTimeout int
  73. }
  74. func (p UpdateMessageParameters) getParameters() url.Values {
  75. out := url.Values{}
  76. if p.PopReceipt != "" {
  77. out.Set("popreceipt", p.PopReceipt)
  78. }
  79. if p.VisibilityTimeout != 0 {
  80. out.Set("visibilitytimeout", strconv.Itoa(p.VisibilityTimeout))
  81. }
  82. return out
  83. }
  84. // GetMessagesResponse represents a response returned from Get Messages
  85. // operation.
  86. type GetMessagesResponse struct {
  87. XMLName xml.Name `xml:"QueueMessagesList"`
  88. QueueMessagesList []GetMessageResponse `xml:"QueueMessage"`
  89. }
  90. // GetMessageResponse represents a QueueMessage object returned from Get
  91. // Messages operation response.
  92. type GetMessageResponse struct {
  93. MessageID string `xml:"MessageId"`
  94. InsertionTime string `xml:"InsertionTime"`
  95. ExpirationTime string `xml:"ExpirationTime"`
  96. PopReceipt string `xml:"PopReceipt"`
  97. TimeNextVisible string `xml:"TimeNextVisible"`
  98. DequeueCount int `xml:"DequeueCount"`
  99. MessageText string `xml:"MessageText"`
  100. }
  101. // PeekMessagesResponse represents a response returned from Get Messages
  102. // operation.
  103. type PeekMessagesResponse struct {
  104. XMLName xml.Name `xml:"QueueMessagesList"`
  105. QueueMessagesList []PeekMessageResponse `xml:"QueueMessage"`
  106. }
  107. // PeekMessageResponse represents a QueueMessage object returned from Peek
  108. // Messages operation response.
  109. type PeekMessageResponse struct {
  110. MessageID string `xml:"MessageId"`
  111. InsertionTime string `xml:"InsertionTime"`
  112. ExpirationTime string `xml:"ExpirationTime"`
  113. DequeueCount int `xml:"DequeueCount"`
  114. MessageText string `xml:"MessageText"`
  115. }
  116. // QueueMetadataResponse represents user defined metadata and queue
  117. // properties on a specific queue.
  118. //
  119. // See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
  120. type QueueMetadataResponse struct {
  121. ApproximateMessageCount int
  122. UserDefinedMetadata map[string]string
  123. }
  124. // SetMetadata operation sets user-defined metadata on the specified queue.
  125. // Metadata is associated with the queue as name-value pairs.
  126. //
  127. // See https://msdn.microsoft.com/en-us/library/azure/dd179348.aspx
  128. func (c QueueServiceClient) SetMetadata(name string, metadata map[string]string) error {
  129. uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
  130. metadata = c.client.protectUserAgent(metadata)
  131. headers := c.client.getStandardHeaders()
  132. for k, v := range metadata {
  133. headers[userDefinedMetadataHeaderPrefix+k] = v
  134. }
  135. resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
  136. if err != nil {
  137. return err
  138. }
  139. defer readAndCloseBody(resp.body)
  140. return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
  141. }
  142. // GetMetadata operation retrieves user-defined metadata and queue
  143. // properties on the specified queue. Metadata is associated with
  144. // the queue as name-values pairs.
  145. //
  146. // See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
  147. //
  148. // Because the way Golang's http client (and http.Header in particular)
  149. // canonicalize header names, the returned metadata names would always
  150. // be all lower case.
  151. func (c QueueServiceClient) GetMetadata(name string) (QueueMetadataResponse, error) {
  152. qm := QueueMetadataResponse{}
  153. qm.UserDefinedMetadata = make(map[string]string)
  154. uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
  155. headers := c.client.getStandardHeaders()
  156. resp, err := c.client.exec(http.MethodGet, uri, headers, nil, c.auth)
  157. if err != nil {
  158. return qm, err
  159. }
  160. defer readAndCloseBody(resp.body)
  161. for k, v := range resp.headers {
  162. if len(v) != 1 {
  163. return qm, fmt.Errorf("Unexpected number of values (%d) in response header '%s'", len(v), k)
  164. }
  165. value := v[0]
  166. if k == approximateMessagesCountHeader {
  167. qm.ApproximateMessageCount, err = strconv.Atoi(value)
  168. if err != nil {
  169. return qm, fmt.Errorf("Unexpected value in response header '%s': '%s' ", k, value)
  170. }
  171. } else if strings.HasPrefix(k, userDefinedMetadataHeaderPrefix) {
  172. name := strings.TrimPrefix(k, userDefinedMetadataHeaderPrefix)
  173. qm.UserDefinedMetadata[strings.ToLower(name)] = value
  174. }
  175. }
  176. return qm, checkRespCode(resp.statusCode, []int{http.StatusOK})
  177. }
  178. // CreateQueue operation creates a queue under the given account.
  179. //
  180. // See https://msdn.microsoft.com/en-us/library/azure/dd179342.aspx
  181. func (c QueueServiceClient) CreateQueue(name string) error {
  182. uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
  183. headers := c.client.getStandardHeaders()
  184. resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
  185. if err != nil {
  186. return err
  187. }
  188. defer readAndCloseBody(resp.body)
  189. return checkRespCode(resp.statusCode, []int{http.StatusCreated})
  190. }
  191. // DeleteQueue operation permanently deletes the specified queue.
  192. //
  193. // See https://msdn.microsoft.com/en-us/library/azure/dd179436.aspx
  194. func (c QueueServiceClient) DeleteQueue(name string) error {
  195. uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
  196. resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
  197. if err != nil {
  198. return err
  199. }
  200. defer readAndCloseBody(resp.body)
  201. return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
  202. }
  203. // QueueExists returns true if a queue with given name exists.
  204. func (c QueueServiceClient) QueueExists(name string) (bool, error) {
  205. uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": {"metadata"}})
  206. resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
  207. if resp != nil && (resp.statusCode == http.StatusOK || resp.statusCode == http.StatusNotFound) {
  208. return resp.statusCode == http.StatusOK, nil
  209. }
  210. return false, err
  211. }
  212. // PutMessage operation adds a new message to the back of the message queue.
  213. //
  214. // See https://msdn.microsoft.com/en-us/library/azure/dd179346.aspx
  215. func (c QueueServiceClient) PutMessage(queue string, message string, params PutMessageParameters) error {
  216. uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
  217. req := putMessageRequest{MessageText: message}
  218. body, nn, err := xmlMarshal(req)
  219. if err != nil {
  220. return err
  221. }
  222. headers := c.client.getStandardHeaders()
  223. headers["Content-Length"] = strconv.Itoa(nn)
  224. resp, err := c.client.exec(http.MethodPost, uri, headers, body, c.auth)
  225. if err != nil {
  226. return err
  227. }
  228. defer readAndCloseBody(resp.body)
  229. return checkRespCode(resp.statusCode, []int{http.StatusCreated})
  230. }
  231. // ClearMessages operation deletes all messages from the specified queue.
  232. //
  233. // See https://msdn.microsoft.com/en-us/library/azure/dd179454.aspx
  234. func (c QueueServiceClient) ClearMessages(queue string) error {
  235. uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), url.Values{})
  236. resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
  237. if err != nil {
  238. return err
  239. }
  240. defer readAndCloseBody(resp.body)
  241. return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
  242. }
  243. // GetMessages operation retrieves one or more messages from the front of the
  244. // queue.
  245. //
  246. // See https://msdn.microsoft.com/en-us/library/azure/dd179474.aspx
  247. func (c QueueServiceClient) GetMessages(queue string, params GetMessagesParameters) (GetMessagesResponse, error) {
  248. var r GetMessagesResponse
  249. uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
  250. resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
  251. if err != nil {
  252. return r, err
  253. }
  254. defer resp.body.Close()
  255. err = xmlUnmarshal(resp.body, &r)
  256. return r, err
  257. }
  258. // PeekMessages retrieves one or more messages from the front of the queue, but
  259. // does not alter the visibility of the message.
  260. //
  261. // See https://msdn.microsoft.com/en-us/library/azure/dd179472.aspx
  262. func (c QueueServiceClient) PeekMessages(queue string, params PeekMessagesParameters) (PeekMessagesResponse, error) {
  263. var r PeekMessagesResponse
  264. uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
  265. resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
  266. if err != nil {
  267. return r, err
  268. }
  269. defer resp.body.Close()
  270. err = xmlUnmarshal(resp.body, &r)
  271. return r, err
  272. }
  273. // DeleteMessage operation deletes the specified message.
  274. //
  275. // See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
  276. func (c QueueServiceClient) DeleteMessage(queue, messageID, popReceipt string) error {
  277. uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), url.Values{
  278. "popreceipt": {popReceipt}})
  279. resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
  280. if err != nil {
  281. return err
  282. }
  283. defer readAndCloseBody(resp.body)
  284. return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
  285. }
  286. // UpdateMessage operation deletes the specified message.
  287. //
  288. // See https://msdn.microsoft.com/en-us/library/azure/hh452234.aspx
  289. func (c QueueServiceClient) UpdateMessage(queue string, messageID string, message string, params UpdateMessageParameters) error {
  290. uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), params.getParameters())
  291. req := putMessageRequest{MessageText: message}
  292. body, nn, err := xmlMarshal(req)
  293. if err != nil {
  294. return err
  295. }
  296. headers := c.client.getStandardHeaders()
  297. headers["Content-Length"] = fmt.Sprintf("%d", nn)
  298. resp, err := c.client.exec(http.MethodPut, uri, headers, body, c.auth)
  299. if err != nil {
  300. return err
  301. }
  302. defer readAndCloseBody(resp.body)
  303. return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
  304. }