1package storage 2 3// Copyright 2017 Microsoft Corporation 4// 5// Licensed under the Apache License, Version 2.0 (the "License"); 6// you may not use this file except in compliance with the License. 7// You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17import ( 18 "encoding/xml" 19 "fmt" 20 "net/http" 21 "net/url" 22 "strconv" 23 "time" 24) 25 26// Message represents an Azure message. 27type Message struct { 28 Queue *Queue 29 Text string `xml:"MessageText"` 30 ID string `xml:"MessageId"` 31 Insertion TimeRFC1123 `xml:"InsertionTime"` 32 Expiration TimeRFC1123 `xml:"ExpirationTime"` 33 PopReceipt string `xml:"PopReceipt"` 34 NextVisible TimeRFC1123 `xml:"TimeNextVisible"` 35 DequeueCount int `xml:"DequeueCount"` 36} 37 38func (m *Message) buildPath() string { 39 return fmt.Sprintf("%s/%s", m.Queue.buildPathMessages(), m.ID) 40} 41 42// PutMessageOptions is the set of options can be specified for Put Messsage 43// operation. A zero struct does not use any preferences for the request. 44type PutMessageOptions struct { 45 Timeout uint 46 VisibilityTimeout int 47 MessageTTL int 48 RequestID string `header:"x-ms-client-request-id"` 49} 50 51// Put operation adds a new message to the back of the message queue. 52// 53// See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Put-Message 54func (m *Message) Put(options *PutMessageOptions) error { 55 query := url.Values{} 56 headers := m.Queue.qsc.client.getStandardHeaders() 57 58 req := putMessageRequest{MessageText: m.Text} 59 body, nn, err := xmlMarshal(req) 60 if err != nil { 61 return err 62 } 63 headers["Content-Length"] = strconv.Itoa(nn) 64 65 if options != nil { 66 if options.VisibilityTimeout != 0 { 67 query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout)) 68 } 69 if options.MessageTTL != 0 { 70 query.Set("messagettl", strconv.Itoa(options.MessageTTL)) 71 } 72 query = addTimeout(query, options.Timeout) 73 headers = mergeHeaders(headers, headersFromStruct(*options)) 74 } 75 76 uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.Queue.buildPathMessages(), query) 77 resp, err := m.Queue.qsc.client.exec(http.MethodPost, uri, headers, body, m.Queue.qsc.auth) 78 if err != nil { 79 return err 80 } 81 defer drainRespBody(resp) 82 err = checkRespCode(resp, []int{http.StatusCreated}) 83 if err != nil { 84 return err 85 } 86 err = xmlUnmarshal(resp.Body, m) 87 if err != nil { 88 return err 89 } 90 return nil 91} 92 93// UpdateMessageOptions is the set of options can be specified for Update Messsage 94// operation. A zero struct does not use any preferences for the request. 95type UpdateMessageOptions struct { 96 Timeout uint 97 VisibilityTimeout int 98 RequestID string `header:"x-ms-client-request-id"` 99} 100 101// Update operation updates the specified message. 102// 103// See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Update-Message 104func (m *Message) Update(options *UpdateMessageOptions) error { 105 query := url.Values{} 106 if m.PopReceipt != "" { 107 query.Set("popreceipt", m.PopReceipt) 108 } 109 110 headers := m.Queue.qsc.client.getStandardHeaders() 111 req := putMessageRequest{MessageText: m.Text} 112 body, nn, err := xmlMarshal(req) 113 if err != nil { 114 return err 115 } 116 headers["Content-Length"] = strconv.Itoa(nn) 117 118 if options != nil { 119 if options.VisibilityTimeout != 0 { 120 query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout)) 121 } 122 query = addTimeout(query, options.Timeout) 123 headers = mergeHeaders(headers, headersFromStruct(*options)) 124 } 125 uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), query) 126 127 resp, err := m.Queue.qsc.client.exec(http.MethodPut, uri, headers, body, m.Queue.qsc.auth) 128 if err != nil { 129 return err 130 } 131 defer drainRespBody(resp) 132 133 m.PopReceipt = resp.Header.Get("x-ms-popreceipt") 134 nextTimeStr := resp.Header.Get("x-ms-time-next-visible") 135 if nextTimeStr != "" { 136 nextTime, err := time.Parse(time.RFC1123, nextTimeStr) 137 if err != nil { 138 return err 139 } 140 m.NextVisible = TimeRFC1123(nextTime) 141 } 142 143 return checkRespCode(resp, []int{http.StatusNoContent}) 144} 145 146// Delete operation deletes the specified message. 147// 148// See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx 149func (m *Message) Delete(options *QueueServiceOptions) error { 150 params := url.Values{"popreceipt": {m.PopReceipt}} 151 headers := m.Queue.qsc.client.getStandardHeaders() 152 153 if options != nil { 154 params = addTimeout(params, options.Timeout) 155 headers = mergeHeaders(headers, headersFromStruct(*options)) 156 } 157 uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), params) 158 159 resp, err := m.Queue.qsc.client.exec(http.MethodDelete, uri, headers, nil, m.Queue.qsc.auth) 160 if err != nil { 161 return err 162 } 163 defer drainRespBody(resp) 164 return checkRespCode(resp, []int{http.StatusNoContent}) 165} 166 167type putMessageRequest struct { 168 XMLName xml.Name `xml:"QueueMessage"` 169 MessageText string `xml:"MessageText"` 170} 171