package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"
)

// UploadFile uploads a file with multipart upload, optionally using multiple
// goroutines and a checkpoint file for resumable uploads.
//
// objectKey    the object name.
// filePath     the local file path to upload.
// partSize     the part size in bytes.
// options      the options for uploading the object.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid, the valid range is [100KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getUploadCpFilePath(cpConf, filePath, bucket.BucketName, objectKey)
		if cpFilePath != "" {
			return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines)
		}
	}

	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}
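
// A minimal usage sketch of UploadFile (hedged: the endpoint, credentials,
// bucket/object names and file paths below are hypothetical placeholders).
// Routines and Checkpoint produce the options consumed by getRoutines and
// getCpConfig below:
//
//	client, err := oss.New("yourEndpoint", "yourAccessKeyID", "yourAccessKeySecret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("yourBucketName")
//	if err != nil {
//		// handle error
//	}
//	// 1MB parts, 3 concurrent routines, checkpoint written to an explicit path.
//	err = bucket.UploadFile("yourObjectKey", "/local/file/path", 1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, "/local/file/path.cp"))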

func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
		absPath, _ := filepath.Abs(srcFile)
		cpFileName := getCpFileName(absPath, dest)
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}
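
// For illustration (a hedged sketch; the directory is a hypothetical
// placeholder): with a directory-style checkpoint option such as
// oss.CheckpointDir(true, "/tmp/cp") — assuming that option populates DirPath
// and leaves FilePath empty — the checkpoint file path becomes
//
//	/tmp/cp/<md5-hex(absolute source path)>-<md5-hex(oss://bucket/object)>.cp
//
// with the two digests produced by getCpFileName below.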

// ----- concurrent upload without checkpoint -----

// getCpConfig gets the checkpoint configuration
func getCpConfig(options []Option) *cpConfig {
	cpcOpt, err := findOption(options, checkpointConfig, nil)
	if err != nil || cpcOpt == nil {
		return nil
	}

	return cpcOpt.(*cpConfig)
}

// getCpFileName returns the name of the checkpoint file
func getCpFileName(src, dest string) string {
	md5Ctx := md5.New()
	md5Ctx.Write([]byte(src))
	srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	md5Ctx.Reset()
	md5Ctx.Write([]byte(dest))
	destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
}

// getRoutines gets the routine count, clamped to the range [1, 100]. By default it's 1.
func getRoutines(options []Option) int {
	rtnOpt, err := findOption(options, routineNum, nil)
	if err != nil || rtnOpt == nil {
		return 1
	}

	rs := rtnOpt.(int)
	if rs < 1 {
		rs = 1
	} else if rs > 100 {
		rs = 100
	}

	return rs
}

// getPayer returns the payer of the request
func getPayer(options []Option) string {
	payerOpt, err := findOption(options, HTTPHeaderOSSRequester, nil)
	if err != nil || payerOpt == nil {
		return ""
	}

	return payerOpt.(string)
}
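
// A hedged example of opting in to requester-pays (the object key and path
// are placeholders, and oss.Requester is assumed to be the requester-pays
// PayerType constant); the RequestPayer option sets the header that getPayer
// reads back:
//
//	err := bucket.UploadFile("yourObjectKey", "/local/file/path", 1024*1024,
//		oss.RequestPayer(oss.Requester))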

// getProgressListener gets the progress callback
func getProgressListener(options []Option) ProgressListener {
	isSet, listener, _ := isOptionSet(options, progressListener)
	if !isSet {
		return nil
	}
	return listener.(ProgressListener)
}
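
// A minimal ProgressListener sketch (hedged: percentListener is a
// hypothetical name; ProgressChanged is the method publishProgress invokes):
//
//	type percentListener struct{}
//
//	func (l *percentListener) ProgressChanged(event *oss.ProgressEvent) {
//		if event.TotalBytes > 0 {
//			fmt.Printf("transferred %d%%\n", event.ConsumedBytes*100/event.TotalBytes)
//		}
//	}
//
//	// Registered through the Progress option:
//	err := bucket.UploadFile(key, path, partSize, oss.Progress(&percentListener{}))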

// uploadPartHook is a hook for testing
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

func defaultUploadPart(id int, chunk FileChunk) error {
	return nil
}

// workerArg defines the worker argument structure
type workerArg struct {
	bucket   *Bucket
	filePath string
	imur     InitiateMultipartUploadResult
	options  []Option
	hook     uploadPartHook
}

// worker is the worker goroutine function: it uploads the chunks received on
// jobs, reporting each result or the first error, and checks die before
// publishing a result so it can exit early when another worker has failed.
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(id, chunk); err != nil {
			failed <- err
			break
		}
		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// scheduler feeds every chunk to the jobs channel and closes it when done
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
	for _, chunk := range chunks {
		jobs <- chunk
	}
	close(jobs)
}

// getTotalBytes sums the sizes of all chunks
func getTotalBytes(chunks []FileChunk) int64 {
	var tb int64
	for _, chunk := range chunks {
		tb += chunk.Size
	}
	return tb
}

// uploadFile is a concurrent upload, without checkpoint
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
	listener := getProgressListener(options)

	chunks, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Initialize the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getTotalBytes(chunks)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)
	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish
	completed := 0
	parts := make([]UploadPart, len(chunks))
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			parts[part.PartNumber-1] = part
			completedBytes += chunks[part.PartNumber-1].Size
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			bucket.AbortMultipartUpload(imur, payerOptions...)
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = bucket.CompleteMultipartUpload(imur, parts, payerOptions...)
	if err != nil {
		bucket.AbortMultipartUpload(imur, payerOptions...)
		return err
	}
	return nil
}

// ----- concurrent upload with checkpoint -----
const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
	Magic     string   // Magic
	MD5       string   // Checkpoint file content's MD5
	FilePath  string   // Local file path
	FileStat  cpStat   // File state
	ObjectKey string   // Key
	UploadID  string   // Upload ID
	Parts     []cpPart // All parts of the local file
}

type cpStat struct {
	Size         int64     // File size
	LastModified time.Time // File's last modified time
	MD5          string    // Local file's MD5
}

type cpPart struct {
	Chunk       FileChunk  // File chunk
	Part        UploadPart // Uploaded part
	IsCompleted bool       // Upload complete flag
}
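
// For reference, the checkpoint file written by dump (below) is plain JSON of
// the exported fields above; the values here are purely illustrative:
//
//	{
//	  "Magic":     "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62",
//	  "MD5":       "<base64 MD5 of this JSON with MD5 set to \"\">",
//	  "FilePath":  "/local/file/path",
//	  "FileStat":  {"Size": 10485760, "LastModified": "...", "MD5": ""},
//	  "ObjectKey": "yourObjectKey",
//	  "UploadID":  "<upload ID from InitiateMultipartUpload>",
//	  "Parts":     [{"Chunk": {...}, "Part": {...}, "IsCompleted": true}, ...]
//	}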

// isValid checks whether the checkpoint data is valid: it's valid only when
// the checkpoint content is intact and the local file has not been modified.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Check whether the local file has been modified.
	fd, err := os.Open(filePath)
	if err != nil {
		return false, err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return false, err
	}

	md, err := calcFileMD5(filePath)
	if err != nil {
		return false, err
	}

	// Compare the file size, the file's last modified time and the file's MD5
	if cp.FileStat.Size != st.Size() ||
		cp.FileStat.LastModified != st.ModTime() ||
		cp.FileStat.MD5 != md {
		return false, nil
	}

	return true, nil
}

// load loads the checkpoint from the local file
func (cp *uploadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// dump dumps the checkpoint to the local file
func (cp *uploadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5 over the content with the MD5 field emptied
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialization
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}
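
// A hedged round-trip sketch of how load, isValid, updatePart and dump
// cooperate during a resumable upload (the paths are placeholders):
//
//	cp := &uploadCheckpoint{}
//	if err := cp.load("/tmp/cp/upload.cp"); err == nil {
//		if ok, _ := cp.isValid("/local/file/path"); ok {
//			// Resume: only cp.todoParts() still need uploading.
//		}
//	}
//	// ...after each part finishes:
//	cp.updatePart(part)
//	cp.dump("/tmp/cp/upload.cp")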

// updatePart updates the part status
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
	cp.Parts[part.PartNumber-1].Part = part
	cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns unfinished parts
func (cp *uploadCheckpoint) todoParts() []FileChunk {
	fcs := []FileChunk{}
	for _, part := range cp.Parts {
		if !part.IsCompleted {
			fcs = append(fcs, part.Chunk)
		}
	}
	return fcs
}

// allParts returns all parts
func (cp *uploadCheckpoint) allParts() []UploadPart {
	ps := []UploadPart{}
	for _, part := range cp.Parts {
		ps = append(ps, part.Part)
	}
	return ps
}

// getCompletedBytes returns the completed bytes count
func (cp *uploadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for _, part := range cp.Parts {
		if part.IsCompleted {
			completedBytes += part.Chunk.Size
		}
	}
	return completedBytes
}
// calcFileMD5 calculates the MD5 for the specified local file.
// Note: as written it always returns an empty string, so the file-content MD5
// comparison in isValid is effectively a no-op (the size and last-modified
// checks still apply).
func calcFileMD5(filePath string) (string, error) {
	return "", nil
}
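
// If file-content validation were desired, a streaming digest could look like
// the sketch below (an assumption, not current behavior; the encoding only
// needs to be self-consistent, since isValid compares the stored value with
// another calcFileMD5 result):
//
//	func calcFileMD5(filePath string) (string, error) {
//		fd, err := os.Open(filePath)
//		if err != nil {
//			return "", err
//		}
//		defer fd.Close()
//		h := md5.New()
//		if _, err := io.Copy(h, fd); err != nil { // would need the "io" import
//			return "", err
//		}
//		return hex.EncodeToString(h.Sum(nil)), nil
//	}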

// prepare initializes the checkpoint data and initiates the multipart upload
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
	// CP
	cp.Magic = uploadCpMagic
	cp.FilePath = filePath
	cp.ObjectKey = objectKey

	// Local file
	fd, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return err
	}
	cp.FileStat.Size = st.Size()
	cp.FileStat.LastModified = st.ModTime()
	md, err := calcFileMD5(filePath)
	if err != nil {
		return err
	}
	cp.FileStat.MD5 = md

	// Chunks
	parts, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	cp.Parts = make([]cpPart, len(parts))
	for i, part := range parts {
		cp.Parts[i].Chunk = part
		cp.Parts[i].IsCompleted = false
	}

	// Initiate the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}
	cp.UploadID = imur.UploadID

	return nil
}

// complete completes the multipart upload and deletes the local CP file
func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
		Key: cp.ObjectKey, UploadID: cp.UploadID}
	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return err
}

// uploadFileWithCp handles concurrent upload with checkpoint
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Load CP data
	ucp := uploadCheckpoint{}
	err := ucp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// If the load failed or the CP data is invalid, start over.
	valid, err := ucp.isValid(filePath)
	if err != nil || !valid {
		if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	chunks := ucp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   bucket.BucketName,
		Key:      objectKey,
		UploadID: ucp.UploadID}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ucp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the jobs to finish
	completed := 0
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			ucp.updatePart(part)
			ucp.dump(cpFilePath)
			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Complete the multipart upload
	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, payerOptions)
	return err
}
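
// Resume sketch (hedged; the names are placeholders): because uploadFileWithCp
// dumps the checkpoint after every completed part and, unlike uploadFile, does
// not abort the multipart upload on failure, re-running the same call picks up
// from the last dumped state:
//
//	for i := 0; i < maxRetries; i++ {
//		err = bucket.UploadFile(key, path, partSize,
//			oss.Checkpoint(true, cpFilePath), oss.Routines(3))
//		if err == nil {
//			break
//		}
//	}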