1package oss 2 3import ( 4 "crypto/md5" 5 "encoding/base64" 6 "encoding/json" 7 "errors" 8 "io/ioutil" 9 "os" 10 "time" 11) 12 13// UploadFile is multipart file upload. 14// 15// objectKey the object name. 16// filePath the local file path to upload. 17// partSize the part size in byte. 18// options the options for uploading object. 19// 20// error it's nil if the operation succeeds, otherwise it's an error object. 21// 22func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error { 23 if partSize < MinPartSize || partSize > MaxPartSize { 24 return errors.New("oss: part size invalid range (1024KB, 5GB]") 25 } 26 27 cpConf, err := getCpConfig(options, filePath) 28 if err != nil { 29 return err 30 } 31 32 routines := getRoutines(options) 33 34 if cpConf.IsEnable { 35 return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines) 36 } 37 38 return bucket.uploadFile(objectKey, filePath, partSize, options, routines) 39} 40 41// ----- concurrent upload without checkpoint ----- 42 43// getCpConfig gets checkpoint configuration 44func getCpConfig(options []Option, filePath string) (*cpConfig, error) { 45 cpc := &cpConfig{} 46 cpcOpt, err := findOption(options, checkpointConfig, nil) 47 if err != nil || cpcOpt == nil { 48 return cpc, err 49 } 50 51 cpc = cpcOpt.(*cpConfig) 52 if cpc.IsEnable && cpc.FilePath == "" { 53 cpc.FilePath = filePath + CheckpointFileSuffix 54 } 55 56 return cpc, nil 57} 58 59// getRoutines gets the routine count. by default it's 1. 60func getRoutines(options []Option) int { 61 rtnOpt, err := findOption(options, routineNum, nil) 62 if err != nil || rtnOpt == nil { 63 return 1 64 } 65 66 rs := rtnOpt.(int) 67 if rs < 1 { 68 rs = 1 69 } else if rs > 100 { 70 rs = 100 71 } 72 73 return rs 74} 75 76// getProgressListener gets the progress callback 77func getProgressListener(options []Option) ProgressListener { 78 isSet, listener, _ := isOptionSet(options, progressListener) 79 if !isSet { 80 return nil 81 } 82 return listener.(ProgressListener) 83} 84 85// uploadPartHook is for testing usage 86type uploadPartHook func(id int, chunk FileChunk) error 87 88var uploadPartHooker uploadPartHook = defaultUploadPart 89 90func defaultUploadPart(id int, chunk FileChunk) error { 91 return nil 92} 93 94// workerArg defines worker argument structure 95type workerArg struct { 96 bucket *Bucket 97 filePath string 98 imur InitiateMultipartUploadResult 99 hook uploadPartHook 100} 101 102// worker is the worker coroutine function 103func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) { 104 for chunk := range jobs { 105 if err := arg.hook(id, chunk); err != nil { 106 failed <- err 107 break 108 } 109 part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number) 110 if err != nil { 111 failed <- err 112 break 113 } 114 select { 115 case <-die: 116 return 117 default: 118 } 119 results <- part 120 } 121} 122 123// scheduler function 124func scheduler(jobs chan FileChunk, chunks []FileChunk) { 125 for _, chunk := range chunks { 126 jobs <- chunk 127 } 128 close(jobs) 129} 130 131func getTotalBytes(chunks []FileChunk) int64 { 132 var tb int64 133 for _, chunk := range chunks { 134 tb += chunk.Size 135 } 136 return tb 137} 138 139// uploadFile is a concurrent upload, without checkpoint 140func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error { 141 listener := getProgressListener(options) 142 143 chunks, err := SplitFileByPartSize(filePath, partSize) 144 if err != nil { 145 return err 146 } 147 148 // Initialize the multipart upload 149 imur, err := bucket.InitiateMultipartUpload(objectKey, options...) 150 if err != nil { 151 return err 152 } 153 154 jobs := make(chan FileChunk, len(chunks)) 155 results := make(chan UploadPart, len(chunks)) 156 failed := make(chan error) 157 die := make(chan bool) 158 159 var completedBytes int64 160 totalBytes := getTotalBytes(chunks) 161 event := newProgressEvent(TransferStartedEvent, 0, totalBytes) 162 publishProgress(listener, event) 163 164 // Start the worker coroutine 165 arg := workerArg{&bucket, filePath, imur, uploadPartHooker} 166 for w := 1; w <= routines; w++ { 167 go worker(w, arg, jobs, results, failed, die) 168 } 169 170 // Schedule the jobs 171 go scheduler(jobs, chunks) 172 173 // Waiting for the upload finished 174 completed := 0 175 parts := make([]UploadPart, len(chunks)) 176 for completed < len(chunks) { 177 select { 178 case part := <-results: 179 completed++ 180 parts[part.PartNumber-1] = part 181 completedBytes += chunks[part.PartNumber-1].Size 182 event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes) 183 publishProgress(listener, event) 184 case err := <-failed: 185 close(die) 186 event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes) 187 publishProgress(listener, event) 188 bucket.AbortMultipartUpload(imur) 189 return err 190 } 191 192 if completed >= len(chunks) { 193 break 194 } 195 } 196 197 event = newProgressEvent(TransferStartedEvent, completedBytes, totalBytes) 198 publishProgress(listener, event) 199 200 // Complete the multpart upload 201 _, err = bucket.CompleteMultipartUpload(imur, parts) 202 if err != nil { 203 bucket.AbortMultipartUpload(imur) 204 return err 205 } 206 return nil 207} 208 209// ----- concurrent upload with checkpoint ----- 210const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62" 211 212type uploadCheckpoint struct { 213 Magic string // Magic 214 MD5 string // Checkpoint file content's MD5 215 FilePath string // Local file path 216 FileStat cpStat // File state 217 ObjectKey string // Key 218 UploadID string // Upload ID 219 Parts []cpPart // All parts of the local file 220} 221 222type cpStat struct { 223 Size int64 // File size 224 LastModified time.Time // File's last modified time 225 MD5 string // Local file's MD5 226} 227 228type cpPart struct { 229 Chunk FileChunk // File chunk 230 Part UploadPart // Uploaded part 231 IsCompleted bool // Upload complete flag 232} 233 234// isValid checks if the uploaded data is valid---it's valid when the file is not updated and the checkpoint data is valid. 235func (cp uploadCheckpoint) isValid(filePath string) (bool, error) { 236 // Compare the CP's magic number and MD5. 237 cpb := cp 238 cpb.MD5 = "" 239 js, _ := json.Marshal(cpb) 240 sum := md5.Sum(js) 241 b64 := base64.StdEncoding.EncodeToString(sum[:]) 242 243 if cp.Magic != uploadCpMagic || b64 != cp.MD5 { 244 return false, nil 245 } 246 247 // Make sure if the local file is updated. 248 fd, err := os.Open(filePath) 249 if err != nil { 250 return false, err 251 } 252 defer fd.Close() 253 254 st, err := fd.Stat() 255 if err != nil { 256 return false, err 257 } 258 259 md, err := calcFileMD5(filePath) 260 if err != nil { 261 return false, err 262 } 263 264 // Compare the file size, file's last modified time and file's MD5 265 if cp.FileStat.Size != st.Size() || 266 cp.FileStat.LastModified != st.ModTime() || 267 cp.FileStat.MD5 != md { 268 return false, nil 269 } 270 271 return true, nil 272} 273 274// load loads from the file 275func (cp *uploadCheckpoint) load(filePath string) error { 276 contents, err := ioutil.ReadFile(filePath) 277 if err != nil { 278 return err 279 } 280 281 err = json.Unmarshal(contents, cp) 282 return err 283} 284 285// dump dumps to the local file 286func (cp *uploadCheckpoint) dump(filePath string) error { 287 bcp := *cp 288 289 // Calculate MD5 290 bcp.MD5 = "" 291 js, err := json.Marshal(bcp) 292 if err != nil { 293 return err 294 } 295 sum := md5.Sum(js) 296 b64 := base64.StdEncoding.EncodeToString(sum[:]) 297 bcp.MD5 = b64 298 299 // Serialization 300 js, err = json.Marshal(bcp) 301 if err != nil { 302 return err 303 } 304 305 // Dump 306 return ioutil.WriteFile(filePath, js, FilePermMode) 307} 308 309// updatePart updates the part status 310func (cp *uploadCheckpoint) updatePart(part UploadPart) { 311 cp.Parts[part.PartNumber-1].Part = part 312 cp.Parts[part.PartNumber-1].IsCompleted = true 313} 314 315// todoParts returns unfinished parts 316func (cp *uploadCheckpoint) todoParts() []FileChunk { 317 fcs := []FileChunk{} 318 for _, part := range cp.Parts { 319 if !part.IsCompleted { 320 fcs = append(fcs, part.Chunk) 321 } 322 } 323 return fcs 324} 325 326// allParts returns all parts 327func (cp *uploadCheckpoint) allParts() []UploadPart { 328 ps := []UploadPart{} 329 for _, part := range cp.Parts { 330 ps = append(ps, part.Part) 331 } 332 return ps 333} 334 335// getCompletedBytes returns completed bytes count 336func (cp *uploadCheckpoint) getCompletedBytes() int64 { 337 var completedBytes int64 338 for _, part := range cp.Parts { 339 if part.IsCompleted { 340 completedBytes += part.Chunk.Size 341 } 342 } 343 return completedBytes 344} 345 346// calcFileMD5 calculates the MD5 for the specified local file 347func calcFileMD5(filePath string) (string, error) { 348 return "", nil 349} 350 351// prepare initializes the multipart upload 352func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error { 353 // CP 354 cp.Magic = uploadCpMagic 355 cp.FilePath = filePath 356 cp.ObjectKey = objectKey 357 358 // Local file 359 fd, err := os.Open(filePath) 360 if err != nil { 361 return err 362 } 363 defer fd.Close() 364 365 st, err := fd.Stat() 366 if err != nil { 367 return err 368 } 369 cp.FileStat.Size = st.Size() 370 cp.FileStat.LastModified = st.ModTime() 371 md, err := calcFileMD5(filePath) 372 if err != nil { 373 return err 374 } 375 cp.FileStat.MD5 = md 376 377 // Chunks 378 parts, err := SplitFileByPartSize(filePath, partSize) 379 if err != nil { 380 return err 381 } 382 383 cp.Parts = make([]cpPart, len(parts)) 384 for i, part := range parts { 385 cp.Parts[i].Chunk = part 386 cp.Parts[i].IsCompleted = false 387 } 388 389 // Init load 390 imur, err := bucket.InitiateMultipartUpload(objectKey, options...) 391 if err != nil { 392 return err 393 } 394 cp.UploadID = imur.UploadID 395 396 return nil 397} 398 399// complete completes the multipart upload and deletes the local CP files 400func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string) error { 401 imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName, 402 Key: cp.ObjectKey, UploadID: cp.UploadID} 403 _, err := bucket.CompleteMultipartUpload(imur, parts) 404 if err != nil { 405 return err 406 } 407 os.Remove(cpFilePath) 408 return err 409} 410 411// uploadFileWithCp handles concurrent upload with checkpoint 412func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error { 413 listener := getProgressListener(options) 414 415 // Load CP data 416 ucp := uploadCheckpoint{} 417 err := ucp.load(cpFilePath) 418 if err != nil { 419 os.Remove(cpFilePath) 420 } 421 422 // Load error or the CP data is invalid. 423 valid, err := ucp.isValid(filePath) 424 if err != nil || !valid { 425 if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil { 426 return err 427 } 428 os.Remove(cpFilePath) 429 } 430 431 chunks := ucp.todoParts() 432 imur := InitiateMultipartUploadResult{ 433 Bucket: bucket.BucketName, 434 Key: objectKey, 435 UploadID: ucp.UploadID} 436 437 jobs := make(chan FileChunk, len(chunks)) 438 results := make(chan UploadPart, len(chunks)) 439 failed := make(chan error) 440 die := make(chan bool) 441 442 completedBytes := ucp.getCompletedBytes() 443 event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size) 444 publishProgress(listener, event) 445 446 // Start the workers 447 arg := workerArg{&bucket, filePath, imur, uploadPartHooker} 448 for w := 1; w <= routines; w++ { 449 go worker(w, arg, jobs, results, failed, die) 450 } 451 452 // Schedule jobs 453 go scheduler(jobs, chunks) 454 455 // Waiting for the job finished 456 completed := 0 457 for completed < len(chunks) { 458 select { 459 case part := <-results: 460 completed++ 461 ucp.updatePart(part) 462 ucp.dump(cpFilePath) 463 completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size 464 event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size) 465 publishProgress(listener, event) 466 case err := <-failed: 467 close(die) 468 event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size) 469 publishProgress(listener, event) 470 return err 471 } 472 473 if completed >= len(chunks) { 474 break 475 } 476 } 477 478 event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size) 479 publishProgress(listener, event) 480 481 // Complete the multipart upload 482 err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath) 483 return err 484} 485