| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- package storage
- import (
- "encoding/xml"
- "fmt"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- )
// HTTP header names used by the queue metadata operations.
//
// They are spelled in the canonical form that Go's http.Header applies to
// header names, so they can be compared directly against header map keys.
const (
	// approximateMessagesCountHeader is the response header GetMetadata
	// parses into QueueMetadataResponse.ApproximateMessageCount.
	approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"

	// userDefinedMetadataHeaderPrefix is the prefix of every header that
	// carries one user-defined metadata entry; the metadata name follows it.
	userDefinedMetadataHeaderPrefix = "X-Ms-Meta-"
)
// pathForQueue returns the URL path of the named queue: "/<queue>".
func pathForQueue(queue string) string {
	// Both operands are strings, so fmt.Sprint inserts no separator.
	return fmt.Sprint("/", queue)
}
// pathForQueueMessages returns the URL path of the message collection of the
// named queue: "/<queue>/messages".
func pathForQueueMessages(queue string) string {
	// All operands are strings, so fmt.Sprint inserts no separator.
	return fmt.Sprint("/", queue, "/messages")
}
// pathForMessage returns the URL path of a single message inside the named
// queue: "/<queue>/messages/<name>".
func pathForMessage(queue, name string) string {
	// All operands are strings, so fmt.Sprint inserts no separator.
	return fmt.Sprint("/", queue, "/messages/", name)
}
// putMessageRequest is the XML request body used by PutMessage and
// UpdateMessage: a QueueMessage element wrapping a single MessageText element.
type putMessageRequest struct {
	XMLName     xml.Name `xml:"QueueMessage"`
	MessageText string   `xml:"MessageText"`
}
// PutMessageParameters is the set of options that can be specified for the
// Put Message operation. The zero value expresses no preferences: a field
// left at 0 is omitted from the request.
type PutMessageParameters struct {
	VisibilityTimeout int // sent as "visibilitytimeout" when non-zero
	MessageTTL        int // sent as "messagettl" when non-zero
}

// getParameters renders the non-zero fields of p as URL query values. The
// result is never nil, but it is empty for the zero value.
func (p PutMessageParameters) getParameters() url.Values {
	query := make(url.Values)
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query.Set("visibilitytimeout", strconv.Itoa(timeout))
	}
	if ttl := p.MessageTTL; ttl != 0 {
		query.Set("messagettl", strconv.Itoa(ttl))
	}
	return query
}
// GetMessagesParameters is the set of options that can be specified for the
// Get Messages operation. The zero value expresses no preferences: a field
// left at 0 is omitted from the request.
type GetMessagesParameters struct {
	NumOfMessages     int // sent as "numofmessages" when non-zero
	VisibilityTimeout int // sent as "visibilitytimeout" when non-zero
}

// getParameters renders the non-zero fields of p as URL query values. The
// result is never nil, but it is empty for the zero value.
func (p GetMessagesParameters) getParameters() url.Values {
	query := make(url.Values)
	if count := p.NumOfMessages; count != 0 {
		query.Set("numofmessages", strconv.Itoa(count))
	}
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query.Set("visibilitytimeout", strconv.Itoa(timeout))
	}
	return query
}
// PeekMessagesParameters is the set of options that can be specified for the
// Peek Messages operation. The zero value expresses no preferences beyond
// the mandatory peek flag.
type PeekMessagesParameters struct {
	NumOfMessages int // sent as "numofmessages" when non-zero
}

// getParameters renders p as URL query values. "peekonly=true" is always
// present because it is required for the peek operation; "numofmessages" is
// added only when non-zero.
func (p PeekMessagesParameters) getParameters() url.Values {
	query := url.Values{}
	query.Set("peekonly", "true")
	if count := p.NumOfMessages; count != 0 {
		query.Set("numofmessages", strconv.Itoa(count))
	}
	return query
}
// UpdateMessageParameters is the set of options that can be specified for
// the Update Message operation. The zero value expresses no preferences: an
// empty PopReceipt and a zero VisibilityTimeout are both omitted from the
// request.
type UpdateMessageParameters struct {
	PopReceipt        string // sent as "popreceipt" when non-empty
	VisibilityTimeout int    // sent as "visibilitytimeout" when non-zero
}

// getParameters renders the populated fields of p as URL query values. The
// result is never nil, but it is empty for the zero value.
func (p UpdateMessageParameters) getParameters() url.Values {
	query := make(url.Values)
	if receipt := p.PopReceipt; receipt != "" {
		query.Set("popreceipt", receipt)
	}
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query.Set("visibilitytimeout", strconv.Itoa(timeout))
	}
	return query
}
- // GetMessagesResponse represents a response returned from Get Messages
- // operation.
- type GetMessagesResponse struct {
- XMLName xml.Name `xml:"QueueMessagesList"`
- QueueMessagesList []GetMessageResponse `xml:"QueueMessage"`
- }
// GetMessageResponse is a single QueueMessage element from a Get Messages
// response. Each field is filled from the child element of the same name;
// timestamps are kept as the raw strings found in the XML.
type GetMessageResponse struct {
	MessageID       string `xml:"MessageId"`
	InsertionTime   string `xml:"InsertionTime"`
	ExpirationTime  string `xml:"ExpirationTime"`
	PopReceipt      string `xml:"PopReceipt"`
	TimeNextVisible string `xml:"TimeNextVisible"`
	DequeueCount    int    `xml:"DequeueCount"`
	MessageText     string `xml:"MessageText"`
}
- // PeekMessagesResponse represents a response returned from Get Messages
- // operation.
- type PeekMessagesResponse struct {
- XMLName xml.Name `xml:"QueueMessagesList"`
- QueueMessagesList []PeekMessageResponse `xml:"QueueMessage"`
- }
// PeekMessageResponse is a single QueueMessage element from a Peek Messages
// response. It has the same fields as GetMessageResponse except PopReceipt
// and TimeNextVisible; timestamps are kept as the raw strings found in the
// XML.
type PeekMessageResponse struct {
	MessageID      string `xml:"MessageId"`
	InsertionTime  string `xml:"InsertionTime"`
	ExpirationTime string `xml:"ExpirationTime"`
	DequeueCount   int    `xml:"DequeueCount"`
	MessageText    string `xml:"MessageText"`
}
// QueueMetadataResponse carries the queue properties and user-defined
// metadata returned by GetMetadata.
//
// See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
type QueueMetadataResponse struct {
	// ApproximateMessageCount is parsed from the
	// X-Ms-Approximate-Messages-Count response header.
	ApproximateMessageCount int
	// UserDefinedMetadata maps each metadata name (lower-cased by
	// GetMetadata) to its value.
	UserDefinedMetadata map[string]string
}
- // SetMetadata operation sets user-defined metadata on the specified queue.
- // Metadata is associated with the queue as name-value pairs.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179348.aspx
- func (c QueueServiceClient) SetMetadata(name string, metadata map[string]string) error {
- uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
- metadata = c.client.protectUserAgent(metadata)
- headers := c.client.getStandardHeaders()
- for k, v := range metadata {
- headers[userDefinedMetadataHeaderPrefix+k] = v
- }
- resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
- }
- // GetMetadata operation retrieves user-defined metadata and queue
- // properties on the specified queue. Metadata is associated with
- // the queue as name-values pairs.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
- //
- // Because the way Golang's http client (and http.Header in particular)
- // canonicalize header names, the returned metadata names would always
- // be all lower case.
- func (c QueueServiceClient) GetMetadata(name string) (QueueMetadataResponse, error) {
- qm := QueueMetadataResponse{}
- qm.UserDefinedMetadata = make(map[string]string)
- uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
- headers := c.client.getStandardHeaders()
- resp, err := c.client.exec(http.MethodGet, uri, headers, nil, c.auth)
- if err != nil {
- return qm, err
- }
- defer readAndCloseBody(resp.body)
- for k, v := range resp.headers {
- if len(v) != 1 {
- return qm, fmt.Errorf("Unexpected number of values (%d) in response header '%s'", len(v), k)
- }
- value := v[0]
- if k == approximateMessagesCountHeader {
- qm.ApproximateMessageCount, err = strconv.Atoi(value)
- if err != nil {
- return qm, fmt.Errorf("Unexpected value in response header '%s': '%s' ", k, value)
- }
- } else if strings.HasPrefix(k, userDefinedMetadataHeaderPrefix) {
- name := strings.TrimPrefix(k, userDefinedMetadataHeaderPrefix)
- qm.UserDefinedMetadata[strings.ToLower(name)] = value
- }
- }
- return qm, checkRespCode(resp.statusCode, []int{http.StatusOK})
- }
- // CreateQueue operation creates a queue under the given account.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179342.aspx
- func (c QueueServiceClient) CreateQueue(name string) error {
- uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
- headers := c.client.getStandardHeaders()
- resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusCreated})
- }
- // DeleteQueue operation permanently deletes the specified queue.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179436.aspx
- func (c QueueServiceClient) DeleteQueue(name string) error {
- uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
- resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
- }
- // QueueExists returns true if a queue with given name exists.
- func (c QueueServiceClient) QueueExists(name string) (bool, error) {
- uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": {"metadata"}})
- resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
- if resp != nil && (resp.statusCode == http.StatusOK || resp.statusCode == http.StatusNotFound) {
- return resp.statusCode == http.StatusOK, nil
- }
- return false, err
- }
- // PutMessage operation adds a new message to the back of the message queue.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179346.aspx
- func (c QueueServiceClient) PutMessage(queue string, message string, params PutMessageParameters) error {
- uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
- req := putMessageRequest{MessageText: message}
- body, nn, err := xmlMarshal(req)
- if err != nil {
- return err
- }
- headers := c.client.getStandardHeaders()
- headers["Content-Length"] = strconv.Itoa(nn)
- resp, err := c.client.exec(http.MethodPost, uri, headers, body, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusCreated})
- }
- // ClearMessages operation deletes all messages from the specified queue.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179454.aspx
- func (c QueueServiceClient) ClearMessages(queue string) error {
- uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), url.Values{})
- resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
- }
- // GetMessages operation retrieves one or more messages from the front of the
- // queue.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179474.aspx
- func (c QueueServiceClient) GetMessages(queue string, params GetMessagesParameters) (GetMessagesResponse, error) {
- var r GetMessagesResponse
- uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
- resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
- if err != nil {
- return r, err
- }
- defer resp.body.Close()
- err = xmlUnmarshal(resp.body, &r)
- return r, err
- }
- // PeekMessages retrieves one or more messages from the front of the queue, but
- // does not alter the visibility of the message.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179472.aspx
- func (c QueueServiceClient) PeekMessages(queue string, params PeekMessagesParameters) (PeekMessagesResponse, error) {
- var r PeekMessagesResponse
- uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
- resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
- if err != nil {
- return r, err
- }
- defer resp.body.Close()
- err = xmlUnmarshal(resp.body, &r)
- return r, err
- }
- // DeleteMessage operation deletes the specified message.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
- func (c QueueServiceClient) DeleteMessage(queue, messageID, popReceipt string) error {
- uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), url.Values{
- "popreceipt": {popReceipt}})
- resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
- }
- // UpdateMessage operation deletes the specified message.
- //
- // See https://msdn.microsoft.com/en-us/library/azure/hh452234.aspx
- func (c QueueServiceClient) UpdateMessage(queue string, messageID string, message string, params UpdateMessageParameters) error {
- uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), params.getParameters())
- req := putMessageRequest{MessageText: message}
- body, nn, err := xmlMarshal(req)
- if err != nil {
- return err
- }
- headers := c.client.getStandardHeaders()
- headers["Content-Length"] = fmt.Sprintf("%d", nn)
- resp, err := c.client.exec(http.MethodPut, uri, headers, body, c.auth)
- if err != nil {
- return err
- }
- defer readAndCloseBody(resp.body)
- return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
- }
|