package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"io/ioutil"
	"os"
	"time"
)

// UploadFile uploads a file with multipart upload.
//
// objectKey    the object name.
// filePath    the local file path to upload.
// partSize    the part size in bytes.
// options    the options for uploading object.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
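// A minimal usage sketch (object key, file path, part size and option values
// below are hypothetical, and Routines and Checkpoint are assumed to be the
// package's public option constructors):
//
//	err := bucket.UploadFile("my-object", "/tmp/my-file.zip", 1024*1024, Routines(3), Checkpoint(true, ""))
//	if err != nil {
//		// handle the error
//	}
//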
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range [1024KB, 5GB]")
	}

	cpConf, err := getCpConfig(options, filePath)
	if err != nil {
		return err
	}

	routines := getRoutines(options)

	if cpConf.IsEnable {
		return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
	}

	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}

// ----- concurrent upload without checkpoint  -----

// getCpConfig gets checkpoint configuration
func getCpConfig(options []Option, filePath string) (*cpConfig, error) {
	cpc := &cpConfig{}
	cpcOpt, err := findOption(options, checkpointConfig, nil)
	if err != nil || cpcOpt == nil {
		return cpc, err
	}

	cpc = cpcOpt.(*cpConfig)
	if cpc.IsEnable && cpc.FilePath == "" {
		cpc.FilePath = filePath + CheckpointFileSuffix
	}

	return cpc, nil
}
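
// For example (illustrative): a checkpoint option enabled with an empty file
// path and filePath "/tmp/a.zip" stores the checkpoint next to the source
// file, at "/tmp/a.zip" + CheckpointFileSuffix.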

// getRoutines gets the routine count. By default it's 1; the value is
// clamped to the range [1, 100].
func getRoutines(options []Option) int {
	rtnOpt, err := findOption(options, routineNum, nil)
	if err != nil || rtnOpt == nil {
		return 1
	}

	rs := rtnOpt.(int)
	if rs < 1 {
		rs = 1
	} else if rs > 100 {
		rs = 100
	}

	return rs
}

// getProgressListener gets the progress callback
func getProgressListener(options []Option) ProgressListener {
	isSet, listener, _ := isOptionSet(options, progressListener)
	if !isSet {
		return nil
	}
	return listener.(ProgressListener)
}

// uploadPartHook is for testing usage
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

func defaultUploadPart(id int, chunk FileChunk) error {
	return nil
}

// workerArg defines worker argument structure
type workerArg struct {
	bucket   *Bucket
	filePath string
	imur     InitiateMultipartUploadResult
	hook     uploadPartHook
}

// worker is the worker goroutine: it consumes chunks from jobs, uploads each
// one as a part, and reports either an UploadPart on results or an error on
// failed. A closed die channel tells the worker to quit early.
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(id, chunk); err != nil {
			failed <- err
			break
		}
		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// scheduler feeds all chunks into the jobs channel and closes it when done
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
	for _, chunk := range chunks {
		jobs <- chunk
	}
	close(jobs)
}

// getTotalBytes sums the sizes of all chunks
func getTotalBytes(chunks []FileChunk) int64 {
	var tb int64
	for _, chunk := range chunks {
		tb += chunk.Size
	}
	return tb
}

// uploadFile is a concurrent upload, without checkpoint
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
	listener := getProgressListener(options)

	chunks, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	// Initialize the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getTotalBytes(chunks)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish
	completed := 0
	parts := make([]UploadPart, len(chunks))
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			parts[part.PartNumber-1] = part
			completedBytes += chunks[part.PartNumber-1].Size
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			bucket.AbortMultipartUpload(imur)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = bucket.CompleteMultipartUpload(imur, parts)
	if err != nil {
		bucket.AbortMultipartUpload(imur)
		return err
	}
	return nil
}

// ----- concurrent upload with checkpoint  -----
const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
	Magic     string   // Magic
	MD5       string   // Checkpoint file content's MD5
	FilePath  string   // Local file path
	FileStat  cpStat   // File state
	ObjectKey string   // Key
	UploadID  string   // Upload ID
	Parts     []cpPart // All parts of the local file
}

type cpStat struct {
	Size         int64     // File size
	LastModified time.Time // File's last modified time
	MD5          string    // Local file's MD5
}

type cpPart struct {
	Chunk       FileChunk  // File chunk
	Part        UploadPart // Uploaded part
	IsCompleted bool       // Upload complete flag
}
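
// For illustration, a checkpoint file as written by dump is JSON shaped like
// the following (values are hypothetical and abridged; Chunk fields follow
// FileChunk's Number/Offset/Size as used by worker above):
//
//	{
//	  "Magic": "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62",
//	  "MD5": "<base64 MD5 of this JSON with the MD5 field blanked>",
//	  "FilePath": "/tmp/a.zip",
//	  "FileStat": {"Size": 2097152, "LastModified": "2006-01-02T15:04:05Z", "MD5": ""},
//	  "ObjectKey": "a.zip",
//	  "UploadID": "0004B999EF5...",
//	  "Parts": [
//	    {"Chunk": {"Number": 1, "Offset": 0, "Size": 1048576},
//	     "Part": {"PartNumber": 1, "ETag": "\"...\""},
//	     "IsCompleted": true},
//	    ...
//	  ]
//	}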

// isValid checks whether the checkpoint data is valid. It's valid when the
// checkpoint itself is intact and the local file has not been modified since
// the checkpoint was written.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Make sure the local file has not been modified
	fd, err := os.Open(filePath)
	if err != nil {
		return false, err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return false, err
	}

	md, err := calcFileMD5(filePath)
	if err != nil {
		return false, err
	}

	// Compare the file size, file's last modified time and file's MD5
	if cp.FileStat.Size != st.Size() ||
		cp.FileStat.LastModified != st.ModTime() ||
		cp.FileStat.MD5 != md {
		return false, nil
	}

	return true, nil
}

// load loads the checkpoint from the local file
func (cp *uploadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	return json.Unmarshal(contents, cp)
}

// dump dumps the checkpoint to the local file
func (cp *uploadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5 over the JSON with the MD5 field blanked,
	// mirroring the check in isValid
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// updatePart updates the part status
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
	cp.Parts[part.PartNumber-1].Part = part
	cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns unfinished parts
func (cp *uploadCheckpoint) todoParts() []FileChunk {
	fcs := []FileChunk{}
	for _, part := range cp.Parts {
		if !part.IsCompleted {
			fcs = append(fcs, part.Chunk)
		}
	}
	return fcs
}

// allParts returns all parts
func (cp *uploadCheckpoint) allParts() []UploadPart {
	ps := []UploadPart{}
	for _, part := range cp.Parts {
		ps = append(ps, part.Part)
	}
	return ps
}

// getCompletedBytes returns completed bytes count
func (cp *uploadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for _, part := range cp.Parts {
		if part.IsCompleted {
			completedBytes += part.Chunk.Size
		}
	}
	return completedBytes
}

// calcFileMD5 calculates the MD5 for the specified local file. It is
// currently a stub that always returns an empty string, so the MD5 fields in
// the checkpoint stay empty and compare equal in isValid; only the file size
// and last-modified time actually guard against file changes.
func calcFileMD5(filePath string) (string, error) {
	return "", nil
}
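
// calcFileMD5Sketch is a minimal sketch of what a real implementation could
// look like if MD5 validation were desired. It is a hypothetical helper, not
// wired into the checkpoint logic, and it reads the whole file into memory;
// a production version would stream the file through md5.New() with io.Copy.
func calcFileMD5Sketch(filePath string) (string, error) {
	data, err := ioutil.ReadFile(filePath)
	if err != nil {
		return "", err
	}
	sum := md5.Sum(data)
	// Base64-encode the digest, consistent with how the checkpoint
	// content MD5 is encoded in dump and isValid
	return base64.StdEncoding.EncodeToString(sum[:]), nil
}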

// prepare initializes the multipart upload
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
	// CP
	cp.Magic = uploadCpMagic
	cp.FilePath = filePath
	cp.ObjectKey = objectKey

	// Local file
	fd, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return err
	}
	cp.FileStat.Size = st.Size()
	cp.FileStat.LastModified = st.ModTime()
	md, err := calcFileMD5(filePath)
	if err != nil {
		return err
	}
	cp.FileStat.MD5 = md

	// Chunks
	parts, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	cp.Parts = make([]cpPart, len(parts))
	for i, part := range parts {
		cp.Parts[i].Chunk = part
		cp.Parts[i].IsCompleted = false
	}

	// Initiate the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}
	cp.UploadID = imur.UploadID

	return nil
}

// complete completes the multipart upload and deletes the local CP file
func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string) error {
	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
		Key: cp.ObjectKey, UploadID: cp.UploadID}
	_, err := bucket.CompleteMultipartUpload(imur, parts)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return nil
}

// uploadFileWithCp handles concurrent upload with checkpoint
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
	listener := getProgressListener(options)

	// Load CP data
	ucp := uploadCheckpoint{}
	err := ucp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Restart from scratch if loading failed or the CP data is invalid
	valid, err := ucp.isValid(filePath)
	if err != nil || !valid {
		if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	chunks := ucp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   bucket.BucketName,
		Key:      objectKey,
		UploadID: ucp.UploadID}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ucp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish
	completed := 0
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			ucp.updatePart(part)
			ucp.dump(cpFilePath)
			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
			// Note: the multipart upload is not aborted here, so a
			// later call can resume from the checkpoint file
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Complete the multipart upload
	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath)
	return err
}