1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2017-2020 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21	"bufio"
22	"bytes"
23	"context"
24	"encoding/xml"
25	"net/http"
26	"net/url"
27	"time"
28
29	jsoniter "github.com/json-iterator/go"
30	"github.com/minio/minio-go/v7/pkg/notification"
31	"github.com/minio/minio-go/v7/pkg/s3utils"
32)
33
34// SetBucketNotification saves a new bucket notification with a context to control cancellations and timeouts.
35func (c Client) SetBucketNotification(ctx context.Context, bucketName string, config notification.Configuration) error {
36	// Input validation.
37	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
38		return err
39	}
40
41	// Get resources properly escaped and lined up before
42	// using them in http request.
43	urlValues := make(url.Values)
44	urlValues.Set("notification", "")
45
46	notifBytes, err := xml.Marshal(&config)
47	if err != nil {
48		return err
49	}
50
51	notifBuffer := bytes.NewReader(notifBytes)
52	reqMetadata := requestMetadata{
53		bucketName:       bucketName,
54		queryValues:      urlValues,
55		contentBody:      notifBuffer,
56		contentLength:    int64(len(notifBytes)),
57		contentMD5Base64: sumMD5Base64(notifBytes),
58		contentSHA256Hex: sum256Hex(notifBytes),
59	}
60
61	// Execute PUT to upload a new bucket notification.
62	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
63	defer closeResponse(resp)
64	if err != nil {
65		return err
66	}
67	if resp != nil {
68		if resp.StatusCode != http.StatusOK {
69			return httpRespToErrorResponse(resp, bucketName, "")
70		}
71	}
72	return nil
73}
74
75// RemoveAllBucketNotification - Remove bucket notification clears all previously specified config
76func (c Client) RemoveAllBucketNotification(ctx context.Context, bucketName string) error {
77	return c.SetBucketNotification(ctx, bucketName, notification.Configuration{})
78}
79
80// GetBucketNotification returns current bucket notification configuration
81func (c Client) GetBucketNotification(ctx context.Context, bucketName string) (bucketNotification notification.Configuration, err error) {
82	// Input validation.
83	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
84		return notification.Configuration{}, err
85	}
86	return c.getBucketNotification(ctx, bucketName)
87}
88
89// Request server for notification rules.
90func (c Client) getBucketNotification(ctx context.Context, bucketName string) (notification.Configuration, error) {
91	urlValues := make(url.Values)
92	urlValues.Set("notification", "")
93
94	// Execute GET on bucket to list objects.
95	resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
96		bucketName:       bucketName,
97		queryValues:      urlValues,
98		contentSHA256Hex: emptySHA256Hex,
99	})
100
101	defer closeResponse(resp)
102	if err != nil {
103		return notification.Configuration{}, err
104	}
105	return processBucketNotificationResponse(bucketName, resp)
106
107}
108
109// processes the GetNotification http response from the server.
110func processBucketNotificationResponse(bucketName string, resp *http.Response) (notification.Configuration, error) {
111	if resp.StatusCode != http.StatusOK {
112		errResponse := httpRespToErrorResponse(resp, bucketName, "")
113		return notification.Configuration{}, errResponse
114	}
115	var bucketNotification notification.Configuration
116	err := xmlDecoder(resp.Body, &bucketNotification)
117	if err != nil {
118		return notification.Configuration{}, err
119	}
120	return bucketNotification, nil
121}
122
123// ListenNotification listen for all events, this is a MinIO specific API
124func (c Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info {
125	return c.ListenBucketNotification(ctx, "", prefix, suffix, events)
126}
127
128// ListenBucketNotification listen for bucket events, this is a MinIO specific API
129func (c Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info {
130	notificationInfoCh := make(chan notification.Info, 1)
131	const notificationCapacity = 4 * 1024 * 1024
132	notificationEventBuffer := make([]byte, notificationCapacity)
133	// Only success, start a routine to start reading line by line.
134	go func(notificationInfoCh chan<- notification.Info) {
135		defer close(notificationInfoCh)
136
137		// Validate the bucket name.
138		if bucketName != "" {
139			if err := s3utils.CheckValidBucketName(bucketName); err != nil {
140				select {
141				case notificationInfoCh <- notification.Info{
142					Err: err,
143				}:
144				case <-ctx.Done():
145				}
146				return
147			}
148		}
149
150		// Check ARN partition to verify if listening bucket is supported
151		if s3utils.IsAmazonEndpoint(*c.endpointURL) || s3utils.IsGoogleEndpoint(*c.endpointURL) {
152			select {
153			case notificationInfoCh <- notification.Info{
154				Err: errAPINotSupported("Listening for bucket notification is specific only to `minio` server endpoints"),
155			}:
156			case <-ctx.Done():
157			}
158			return
159		}
160
161		// Continuously run and listen on bucket notification.
162		// Create a done channel to control 'ListObjects' go routine.
163		retryDoneCh := make(chan struct{}, 1)
164
165		// Indicate to our routine to exit cleanly upon return.
166		defer close(retryDoneCh)
167
168		// Prepare urlValues to pass into the request on every loop
169		urlValues := make(url.Values)
170		urlValues.Set("prefix", prefix)
171		urlValues.Set("suffix", suffix)
172		urlValues["events"] = events
173
174		// Wait on the jitter retry loop.
175		for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) {
176			// Execute GET on bucket to list objects.
177			resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
178				bucketName:       bucketName,
179				queryValues:      urlValues,
180				contentSHA256Hex: emptySHA256Hex,
181			})
182			if err != nil {
183				select {
184				case notificationInfoCh <- notification.Info{
185					Err: err,
186				}:
187				case <-ctx.Done():
188				}
189				return
190			}
191
192			// Validate http response, upon error return quickly.
193			if resp.StatusCode != http.StatusOK {
194				errResponse := httpRespToErrorResponse(resp, bucketName, "")
195				select {
196				case notificationInfoCh <- notification.Info{
197					Err: errResponse,
198				}:
199				case <-ctx.Done():
200				}
201				return
202			}
203
204			// Initialize a new bufio scanner, to read line by line.
205			bio := bufio.NewScanner(resp.Body)
206
207			// Use a higher buffer to support unexpected
208			// caching done by proxies
209			bio.Buffer(notificationEventBuffer, notificationCapacity)
210			var json = jsoniter.ConfigCompatibleWithStandardLibrary
211
212			// Unmarshal each line, returns marshaled values.
213			for bio.Scan() {
214				var notificationInfo notification.Info
215				if err = json.Unmarshal(bio.Bytes(), &notificationInfo); err != nil {
216					// Unexpected error during json unmarshal, send
217					// the error to caller for actionable as needed.
218					select {
219					case notificationInfoCh <- notification.Info{
220						Err: err,
221					}:
222					case <-ctx.Done():
223						return
224					}
225					closeResponse(resp)
226					continue
227				}
228				// Send notificationInfo
229				select {
230				case notificationInfoCh <- notificationInfo:
231				case <-ctx.Done():
232					closeResponse(resp)
233					return
234				}
235			}
236
237			if err = bio.Err(); err != nil {
238				select {
239				case notificationInfoCh <- notification.Info{
240					Err: err,
241				}:
242				case <-ctx.Done():
243					return
244				}
245			}
246
247			// Close current connection before looping further.
248			closeResponse(resp)
249
250		}
251	}(notificationInfoCh)
252
253	// Returns the notification info channel, for caller to start reading from.
254	return notificationInfoCh
255}
256