1/* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2017-2020 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package minio 19 20import ( 21 "bufio" 22 "bytes" 23 "context" 24 "encoding/xml" 25 "net/http" 26 "net/url" 27 "time" 28 29 jsoniter "github.com/json-iterator/go" 30 "github.com/minio/minio-go/v7/pkg/notification" 31 "github.com/minio/minio-go/v7/pkg/s3utils" 32) 33 34// SetBucketNotification saves a new bucket notification with a context to control cancellations and timeouts. 35func (c Client) SetBucketNotification(ctx context.Context, bucketName string, config notification.Configuration) error { 36 // Input validation. 37 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 38 return err 39 } 40 41 // Get resources properly escaped and lined up before 42 // using them in http request. 43 urlValues := make(url.Values) 44 urlValues.Set("notification", "") 45 46 notifBytes, err := xml.Marshal(&config) 47 if err != nil { 48 return err 49 } 50 51 notifBuffer := bytes.NewReader(notifBytes) 52 reqMetadata := requestMetadata{ 53 bucketName: bucketName, 54 queryValues: urlValues, 55 contentBody: notifBuffer, 56 contentLength: int64(len(notifBytes)), 57 contentMD5Base64: sumMD5Base64(notifBytes), 58 contentSHA256Hex: sum256Hex(notifBytes), 59 } 60 61 // Execute PUT to upload a new bucket notification. 62 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) 63 defer closeResponse(resp) 64 if err != nil { 65 return err 66 } 67 if resp != nil { 68 if resp.StatusCode != http.StatusOK { 69 return httpRespToErrorResponse(resp, bucketName, "") 70 } 71 } 72 return nil 73} 74 75// RemoveAllBucketNotification - Remove bucket notification clears all previously specified config 76func (c Client) RemoveAllBucketNotification(ctx context.Context, bucketName string) error { 77 return c.SetBucketNotification(ctx, bucketName, notification.Configuration{}) 78} 79 80// GetBucketNotification returns current bucket notification configuration 81func (c Client) GetBucketNotification(ctx context.Context, bucketName string) (bucketNotification notification.Configuration, err error) { 82 // Input validation. 83 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 84 return notification.Configuration{}, err 85 } 86 return c.getBucketNotification(ctx, bucketName) 87} 88 89// Request server for notification rules. 90func (c Client) getBucketNotification(ctx context.Context, bucketName string) (notification.Configuration, error) { 91 urlValues := make(url.Values) 92 urlValues.Set("notification", "") 93 94 // Execute GET on bucket to list objects. 95 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 96 bucketName: bucketName, 97 queryValues: urlValues, 98 contentSHA256Hex: emptySHA256Hex, 99 }) 100 101 defer closeResponse(resp) 102 if err != nil { 103 return notification.Configuration{}, err 104 } 105 return processBucketNotificationResponse(bucketName, resp) 106 107} 108 109// processes the GetNotification http response from the server. 110func processBucketNotificationResponse(bucketName string, resp *http.Response) (notification.Configuration, error) { 111 if resp.StatusCode != http.StatusOK { 112 errResponse := httpRespToErrorResponse(resp, bucketName, "") 113 return notification.Configuration{}, errResponse 114 } 115 var bucketNotification notification.Configuration 116 err := xmlDecoder(resp.Body, &bucketNotification) 117 if err != nil { 118 return notification.Configuration{}, err 119 } 120 return bucketNotification, nil 121} 122 123// ListenNotification listen for all events, this is a MinIO specific API 124func (c Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info { 125 return c.ListenBucketNotification(ctx, "", prefix, suffix, events) 126} 127 128// ListenBucketNotification listen for bucket events, this is a MinIO specific API 129func (c Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info { 130 notificationInfoCh := make(chan notification.Info, 1) 131 const notificationCapacity = 4 * 1024 * 1024 132 notificationEventBuffer := make([]byte, notificationCapacity) 133 // Only success, start a routine to start reading line by line. 134 go func(notificationInfoCh chan<- notification.Info) { 135 defer close(notificationInfoCh) 136 137 // Validate the bucket name. 138 if bucketName != "" { 139 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 140 select { 141 case notificationInfoCh <- notification.Info{ 142 Err: err, 143 }: 144 case <-ctx.Done(): 145 } 146 return 147 } 148 } 149 150 // Check ARN partition to verify if listening bucket is supported 151 if s3utils.IsAmazonEndpoint(*c.endpointURL) || s3utils.IsGoogleEndpoint(*c.endpointURL) { 152 select { 153 case notificationInfoCh <- notification.Info{ 154 Err: errAPINotSupported("Listening for bucket notification is specific only to `minio` server endpoints"), 155 }: 156 case <-ctx.Done(): 157 } 158 return 159 } 160 161 // Continuously run and listen on bucket notification. 162 // Create a done channel to control 'ListObjects' go routine. 163 retryDoneCh := make(chan struct{}, 1) 164 165 // Indicate to our routine to exit cleanly upon return. 166 defer close(retryDoneCh) 167 168 // Prepare urlValues to pass into the request on every loop 169 urlValues := make(url.Values) 170 urlValues.Set("prefix", prefix) 171 urlValues.Set("suffix", suffix) 172 urlValues["events"] = events 173 174 // Wait on the jitter retry loop. 175 for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) { 176 // Execute GET on bucket to list objects. 177 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 178 bucketName: bucketName, 179 queryValues: urlValues, 180 contentSHA256Hex: emptySHA256Hex, 181 }) 182 if err != nil { 183 select { 184 case notificationInfoCh <- notification.Info{ 185 Err: err, 186 }: 187 case <-ctx.Done(): 188 } 189 return 190 } 191 192 // Validate http response, upon error return quickly. 193 if resp.StatusCode != http.StatusOK { 194 errResponse := httpRespToErrorResponse(resp, bucketName, "") 195 select { 196 case notificationInfoCh <- notification.Info{ 197 Err: errResponse, 198 }: 199 case <-ctx.Done(): 200 } 201 return 202 } 203 204 // Initialize a new bufio scanner, to read line by line. 205 bio := bufio.NewScanner(resp.Body) 206 207 // Use a higher buffer to support unexpected 208 // caching done by proxies 209 bio.Buffer(notificationEventBuffer, notificationCapacity) 210 var json = jsoniter.ConfigCompatibleWithStandardLibrary 211 212 // Unmarshal each line, returns marshaled values. 213 for bio.Scan() { 214 var notificationInfo notification.Info 215 if err = json.Unmarshal(bio.Bytes(), ¬ificationInfo); err != nil { 216 // Unexpected error during json unmarshal, send 217 // the error to caller for actionable as needed. 218 select { 219 case notificationInfoCh <- notification.Info{ 220 Err: err, 221 }: 222 case <-ctx.Done(): 223 return 224 } 225 closeResponse(resp) 226 continue 227 } 228 // Send notificationInfo 229 select { 230 case notificationInfoCh <- notificationInfo: 231 case <-ctx.Done(): 232 closeResponse(resp) 233 return 234 } 235 } 236 237 if err = bio.Err(); err != nil { 238 select { 239 case notificationInfoCh <- notification.Info{ 240 Err: err, 241 }: 242 case <-ctx.Done(): 243 return 244 } 245 } 246 247 // Close current connection before looping further. 248 closeResponse(resp) 249 250 } 251 }(notificationInfoCh) 252 253 // Returns the notification info channel, for caller to start reading from. 254 return notificationInfoCh 255} 256