package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"
)

// UploadFile uploads a file with multipart upload. If a checkpoint is
// enabled in the options, the upload is resumable.
//
// objectKey    the object name.
// filePath    the local file path to upload.
// partSize    the part size in bytes.
// options    the options for uploading the object.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range [100KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getUploadCpFilePath(cpConf, filePath, bucket.BucketName, objectKey)
		if cpFilePath != "" {
			return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines)
		}
	}

	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}
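
// An illustrative usage sketch. Endpoint, credentials, and names are
// placeholders; Routines and Checkpoint are assumed to be the option
// constructors for the routineNum and checkpointConfig options read above:
//
//	client, err := oss.New("your-endpoint", "your-access-key-id", "your-access-key-secret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("your-bucket")
//	if err != nil {
//		// handle error
//	}
//	// Upload with 1MB parts, 3 concurrent routines, and a resumable checkpoint.
//	err = bucket.UploadFile("my-object", "/local/file", 1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, "/local/file.cp"))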

// getUploadCpFilePath returns the checkpoint file path, deriving one under
// DirPath when no explicit FilePath is configured.
func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
		absPath, _ := filepath.Abs(srcFile)
		cpFileName := getCpFileName(absPath, dest, "")
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}

// ----- concurrent upload without checkpoint -----

// getCpConfig gets checkpoint configuration
func getCpConfig(options []Option) *cpConfig {
	cpcOpt, err := findOption(options, checkpointConfig, nil)
	if err != nil || cpcOpt == nil {
		return nil
	}

	return cpcOpt.(*cpConfig)
}

// getCpFileName returns the name of the checkpoint file
func getCpFileName(src, dest, versionId string) string {
	md5Ctx := md5.New()
	md5Ctx.Write([]byte(src))
	srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	md5Ctx.Reset()
	md5Ctx.Write([]byte(dest))
	destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	if versionId == "" {
		return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
	}

	md5Ctx.Reset()
	md5Ctx.Write([]byte(versionId))
	versionCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
	return fmt.Sprintf("%v-%v-%v.cp", srcCheckSum, destCheckSum, versionCheckSum)
}
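
// For example, uploading the local file /data/a.txt to oss://mybucket/a.txt
// with no version ID yields a checkpoint file named
//
//	<hex-md5("/data/a.txt")>-<hex-md5("oss://mybucket/a.txt")>.cp
//
// so different source/destination pairs map to different CP files.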

// getRoutines gets the routine count, clamped to [1, 100]. By default it's 1.
func getRoutines(options []Option) int {
	rtnOpt, err := findOption(options, routineNum, nil)
	if err != nil || rtnOpt == nil {
		return 1
	}

	rs := rtnOpt.(int)
	if rs < 1 {
		rs = 1
	} else if rs > 100 {
		rs = 100
	}

	return rs
}

// getPayer returns the payer of the request
func getPayer(options []Option) string {
	payerOpt, err := findOption(options, HTTPHeaderOssRequester, nil)
	if err != nil || payerOpt == nil {
		return ""
	}

	return payerOpt.(string)
}

// getProgressListener gets the progress callback
func getProgressListener(options []Option) ProgressListener {
	isSet, listener, _ := isOptionSet(options, progressListener)
	if !isSet {
		return nil
	}
	return listener.(ProgressListener)
}

// uploadPartHook is for testing usage
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

// defaultUploadPart is a no-op hook; tests may replace uploadPartHooker to
// inject failures.
func defaultUploadPart(id int, chunk FileChunk) error {
	return nil
}
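
// For example, a test could make part 2 fail like this (a sketch;
// errUploadPartFailed is a hypothetical error defined by the test, not by
// this package):
//
//	uploadPartHooker = func(id int, chunk FileChunk) error {
//		if chunk.Number == 2 {
//			return errUploadPartFailed
//		}
//		return nil
//	}
//	defer func() { uploadPartHooker = defaultUploadPart }()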

// workerArg defines worker argument structure
type workerArg struct {
	bucket   *Bucket
	filePath string
	imur     InitiateMultipartUploadResult
	options  []Option
	hook     uploadPartHook
}

// worker is the worker goroutine function: it uploads each chunk it receives
// from jobs, reports each uploaded part on results, reports the first error
// on failed, and exits early once die is closed.
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(id, chunk); err != nil {
			failed <- err
			break
		}
		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// scheduler feeds all chunks into the jobs channel, then closes it so the
// workers drain and exit.
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
	for _, chunk := range chunks {
		jobs <- chunk
	}
	close(jobs)
}

// getTotalBytes sums the sizes of all chunks.
func getTotalBytes(chunks []FileChunk) int64 {
	var tb int64
	for _, chunk := range chunks {
		tb += chunk.Size
	}
	return tb
}

// uploadFile is a concurrent upload, without checkpoint
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
	listener := getProgressListener(options)

	chunks, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	partOptions := ChoiceTransferPartOption(options)
	completeOptions := ChoiceCompletePartOption(options)
	abortOptions := ChoiceAbortPartOption(options)

	// Initialize the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getTotalBytes(chunks)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish
	completed := 0
	parts := make([]UploadPart, len(chunks))
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			parts[part.PartNumber-1] = part
			completedBytes += chunks[part.PartNumber-1].Size

			// RwBytes in ProgressEvent is 0 because the read/write progress
			// has already been reported by teeReader.Read().
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, 0)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
			publishProgress(listener, event)
			bucket.AbortMultipartUpload(imur, abortOptions...)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = bucket.CompleteMultipartUpload(imur, parts, completeOptions...)
	if err != nil {
		bucket.AbortMultipartUpload(imur, abortOptions...)
		return err
	}
	return nil
}

// ----- concurrent upload with checkpoint -----
const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
	Magic     string   // Magic
	MD5       string   // Checkpoint file content's MD5
	FilePath  string   // Local file path
	FileStat  cpStat   // File state
	ObjectKey string   // Key
	UploadID  string   // Upload ID
	Parts     []cpPart // All parts of the local file
}

type cpStat struct {
	Size         int64     // File size
	LastModified time.Time // File's last modified time
	MD5          string    // Local file's MD5
}

type cpPart struct {
	Chunk       FileChunk  // File chunk
	Part        UploadPart // Uploaded part
	IsCompleted bool       // Upload complete flag
}
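
// An abridged, illustrative view of the serialized checkpoint (json.Marshal
// uses the Go field names above; nested Chunk/Part contents are elided):
//
//	{
//	  "Magic": "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62",
//	  "MD5": "<base64 MD5 of this JSON with MD5 set to \"\">",
//	  "FilePath": "/local/file",
//	  "FileStat": {"Size": 524288, "LastModified": "...", "MD5": ""},
//	  "ObjectKey": "my-object",
//	  "UploadID": "...",
//	  "Parts": [{"Chunk": {...}, "Part": {...}, "IsCompleted": true}]
//	}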

// isValid checks whether the checkpoint data is valid: it is valid only when
// its own integrity check passes and the local file has not been modified.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Check whether the local file has been modified.
	fd, err := os.Open(filePath)
	if err != nil {
		return false, err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return false, err
	}

	md, err := calcFileMD5(filePath)
	if err != nil {
		return false, err
	}

	// Compare the file size, file's last modified time and file's MD5
	if cp.FileStat.Size != st.Size() ||
		!cp.FileStat.LastModified.Equal(st.ModTime()) ||
		cp.FileStat.MD5 != md {
		return false, nil
	}

	return true, nil
}

// load loads the checkpoint from the local file
func (cp *uploadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// dump writes the checkpoint to the local file. The MD5 field holds the MD5
// of the JSON serialized with MD5 emptied, matching the check in isValid.
func (cp *uploadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialization
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// updatePart updates the part status
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
	cp.Parts[part.PartNumber-1].Part = part
	cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns unfinished parts
func (cp *uploadCheckpoint) todoParts() []FileChunk {
	fcs := []FileChunk{}
	for _, part := range cp.Parts {
		if !part.IsCompleted {
			fcs = append(fcs, part.Chunk)
		}
	}
	return fcs
}

// allParts returns all parts
func (cp *uploadCheckpoint) allParts() []UploadPart {
	ps := []UploadPart{}
	for _, part := range cp.Parts {
		ps = append(ps, part.Part)
	}
	return ps
}

// getCompletedBytes returns completed bytes count
func (cp *uploadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for _, part := range cp.Parts {
		if part.IsCompleted {
			completedBytes += part.Chunk.Size
		}
	}
	return completedBytes
}

// calcFileMD5 calculates the MD5 for the specified local file. It currently
// returns an empty string without reading the file, so the MD5 comparison in
// isValid always passes and freshness relies on size and modification time.
func calcFileMD5(filePath string) (string, error) {
	return "", nil
}
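
// If real content verification were wanted, a minimal sketch (not what this
// package does today; it would also need the "io" import) could be:
//
//	h := md5.New()
//	fd, err := os.Open(filePath)
//	if err != nil {
//		return "", err
//	}
//	defer fd.Close()
//	if _, err := io.Copy(h, fd); err != nil {
//		return "", err
//	}
//	return hex.EncodeToString(h.Sum(nil)), nil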

// prepare initializes the multipart upload
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
	// Checkpoint
	cp.Magic = uploadCpMagic
	cp.FilePath = filePath
	cp.ObjectKey = objectKey

	// Local file
	fd, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return err
	}
	cp.FileStat.Size = st.Size()
	cp.FileStat.LastModified = st.ModTime()
	md, err := calcFileMD5(filePath)
	if err != nil {
		return err
	}
	cp.FileStat.MD5 = md

	// Chunks
	parts, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	cp.Parts = make([]cpPart, len(parts))
	for i, part := range parts {
		cp.Parts[i].Chunk = part
		cp.Parts[i].IsCompleted = false
	}

	// Initiate the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}
	cp.UploadID = imur.UploadID

	return nil
}

// complete completes the multipart upload and deletes the local CP file
func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
		Key: cp.ObjectKey, UploadID: cp.UploadID}
	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return nil
}

// uploadFileWithCp handles concurrent upload with checkpoint
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
	listener := getProgressListener(options)

	partOptions := ChoiceTransferPartOption(options)
	completeOptions := ChoiceCompletePartOption(options)

	// Load CP data
	ucp := uploadCheckpoint{}
	err := ucp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// If loading failed or the CP data is invalid, reinitialize the upload.
	valid, err := ucp.isValid(filePath)
	if err != nil || !valid {
		if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	chunks := ucp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   bucket.BucketName,
		Key:      objectKey,
		UploadID: ucp.UploadID}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ucp.getCompletedBytes()

	// RwBytes in ProgressEvent is 0 because the read/write progress has
	// already been reported by teeReader.Read().
	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0)
	publishProgress(listener, event)

	// Start the workers
	arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule jobs
	go scheduler(jobs, chunks)

	// Wait for the jobs to finish
	completed := 0
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			ucp.updatePart(part)
			ucp.dump(cpFilePath)
			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size, 0)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size, 0)
			publishProgress(listener, event)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size, 0)
	publishProgress(listener, event)

	// Complete the multipart upload
	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, completeOptions)
	return err
}