1package oss 2 3import ( 4 "crypto/md5" 5 "encoding/base64" 6 "encoding/hex" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io/ioutil" 11 "os" 12 "path/filepath" 13 "time" 14) 15 16// UploadFile is multipart file upload. 17// 18// objectKey the object name. 19// filePath the local file path to upload. 20// partSize the part size in byte. 21// options the options for uploading object. 22// 23// error it's nil if the operation succeeds, otherwise it's an error object. 24// 25func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error { 26 if partSize < MinPartSize || partSize > MaxPartSize { 27 return errors.New("oss: part size invalid range (100KB, 5GB]") 28 } 29 30 cpConf := getCpConfig(options) 31 routines := getRoutines(options) 32 33 if cpConf != nil && cpConf.IsEnable { 34 cpFilePath := getUploadCpFilePath(cpConf, filePath, bucket.BucketName, objectKey) 35 if cpFilePath != "" { 36 return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines) 37 } 38 } 39 40 return bucket.uploadFile(objectKey, filePath, partSize, options, routines) 41} 42 43func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string { 44 if cpConf.FilePath == "" && cpConf.DirPath != "" { 45 dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject) 46 absPath, _ := filepath.Abs(srcFile) 47 cpFileName := getCpFileName(absPath, dest) 48 cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName 49 } 50 return cpConf.FilePath 51} 52 53// ----- concurrent upload without checkpoint ----- 54 55// getCpConfig gets checkpoint configuration 56func getCpConfig(options []Option) *cpConfig { 57 cpcOpt, err := findOption(options, checkpointConfig, nil) 58 if err != nil || cpcOpt == nil { 59 return nil 60 } 61 62 return cpcOpt.(*cpConfig) 63} 64 65// getCpFileName return the name of the checkpoint file 66func getCpFileName(src, dest string) string { 67 md5Ctx := md5.New() 68 md5Ctx.Write([]byte(src)) 69 srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil)) 70 71 md5Ctx.Reset() 72 md5Ctx.Write([]byte(dest)) 73 destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil)) 74 75 return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum) 76} 77 78// getRoutines gets the routine count. by default it's 1. 79func getRoutines(options []Option) int { 80 rtnOpt, err := findOption(options, routineNum, nil) 81 if err != nil || rtnOpt == nil { 82 return 1 83 } 84 85 rs := rtnOpt.(int) 86 if rs < 1 { 87 rs = 1 88 } else if rs > 100 { 89 rs = 100 90 } 91 92 return rs 93} 94 95// getPayer return the payer of the request 96func getPayer(options []Option) string { 97 payerOpt, err := findOption(options, HTTPHeaderOSSRequester, nil) 98 if err != nil || payerOpt == nil { 99 return "" 100 } 101 102 return payerOpt.(string) 103} 104 105// getProgressListener gets the progress callback 106func getProgressListener(options []Option) ProgressListener { 107 isSet, listener, _ := isOptionSet(options, progressListener) 108 if !isSet { 109 return nil 110 } 111 return listener.(ProgressListener) 112} 113 114// uploadPartHook is for testing usage 115type uploadPartHook func(id int, chunk FileChunk) error 116 117var uploadPartHooker uploadPartHook = defaultUploadPart 118 119func defaultUploadPart(id int, chunk FileChunk) error { 120 return nil 121} 122 123// workerArg defines worker argument structure 124type workerArg struct { 125 bucket *Bucket 126 filePath string 127 imur InitiateMultipartUploadResult 128 options []Option 129 hook uploadPartHook 130} 131 132// worker is the worker coroutine function 133func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) { 134 for chunk := range jobs { 135 if err := arg.hook(id, chunk); err != nil { 136 failed <- err 137 break 138 } 139 part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, arg.options...) 140 if err != nil { 141 failed <- err 142 break 143 } 144 select { 145 case <-die: 146 return 147 default: 148 } 149 results <- part 150 } 151} 152 153// scheduler function 154func scheduler(jobs chan FileChunk, chunks []FileChunk) { 155 for _, chunk := range chunks { 156 jobs <- chunk 157 } 158 close(jobs) 159} 160 161func getTotalBytes(chunks []FileChunk) int64 { 162 var tb int64 163 for _, chunk := range chunks { 164 tb += chunk.Size 165 } 166 return tb 167} 168 169// uploadFile is a concurrent upload, without checkpoint 170func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error { 171 listener := getProgressListener(options) 172 173 chunks, err := SplitFileByPartSize(filePath, partSize) 174 if err != nil { 175 return err 176 } 177 178 payerOptions := []Option{} 179 payer := getPayer(options) 180 if payer != "" { 181 payerOptions = append(payerOptions, RequestPayer(PayerType(payer))) 182 } 183 184 // Initialize the multipart upload 185 imur, err := bucket.InitiateMultipartUpload(objectKey, options...) 186 if err != nil { 187 return err 188 } 189 190 jobs := make(chan FileChunk, len(chunks)) 191 results := make(chan UploadPart, len(chunks)) 192 failed := make(chan error) 193 die := make(chan bool) 194 195 var completedBytes int64 196 totalBytes := getTotalBytes(chunks) 197 event := newProgressEvent(TransferStartedEvent, 0, totalBytes) 198 publishProgress(listener, event) 199 200 // Start the worker coroutine 201 arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker} 202 for w := 1; w <= routines; w++ { 203 go worker(w, arg, jobs, results, failed, die) 204 } 205 206 // Schedule the jobs 207 go scheduler(jobs, chunks) 208 209 // Waiting for the upload finished 210 completed := 0 211 parts := make([]UploadPart, len(chunks)) 212 for completed < len(chunks) { 213 select { 214 case part := <-results: 215 completed++ 216 parts[part.PartNumber-1] = part 217 completedBytes += chunks[part.PartNumber-1].Size 218 event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes) 219 publishProgress(listener, event) 220 case err := <-failed: 221 close(die) 222 event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes) 223 publishProgress(listener, event) 224 bucket.AbortMultipartUpload(imur, payerOptions...) 225 return err 226 } 227 228 if completed >= len(chunks) { 229 break 230 } 231 } 232 233 event = newProgressEvent(TransferStartedEvent, completedBytes, totalBytes) 234 publishProgress(listener, event) 235 236 // Complete the multpart upload 237 _, err = bucket.CompleteMultipartUpload(imur, parts, payerOptions...) 238 if err != nil { 239 bucket.AbortMultipartUpload(imur, payerOptions...) 240 return err 241 } 242 return nil 243} 244 245// ----- concurrent upload with checkpoint ----- 246const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62" 247 248type uploadCheckpoint struct { 249 Magic string // Magic 250 MD5 string // Checkpoint file content's MD5 251 FilePath string // Local file path 252 FileStat cpStat // File state 253 ObjectKey string // Key 254 UploadID string // Upload ID 255 Parts []cpPart // All parts of the local file 256} 257 258type cpStat struct { 259 Size int64 // File size 260 LastModified time.Time // File's last modified time 261 MD5 string // Local file's MD5 262} 263 264type cpPart struct { 265 Chunk FileChunk // File chunk 266 Part UploadPart // Uploaded part 267 IsCompleted bool // Upload complete flag 268} 269 270// isValid checks if the uploaded data is valid---it's valid when the file is not updated and the checkpoint data is valid. 271func (cp uploadCheckpoint) isValid(filePath string) (bool, error) { 272 // Compare the CP's magic number and MD5. 273 cpb := cp 274 cpb.MD5 = "" 275 js, _ := json.Marshal(cpb) 276 sum := md5.Sum(js) 277 b64 := base64.StdEncoding.EncodeToString(sum[:]) 278 279 if cp.Magic != uploadCpMagic || b64 != cp.MD5 { 280 return false, nil 281 } 282 283 // Make sure if the local file is updated. 284 fd, err := os.Open(filePath) 285 if err != nil { 286 return false, err 287 } 288 defer fd.Close() 289 290 st, err := fd.Stat() 291 if err != nil { 292 return false, err 293 } 294 295 md, err := calcFileMD5(filePath) 296 if err != nil { 297 return false, err 298 } 299 300 // Compare the file size, file's last modified time and file's MD5 301 if cp.FileStat.Size != st.Size() || 302 cp.FileStat.LastModified != st.ModTime() || 303 cp.FileStat.MD5 != md { 304 return false, nil 305 } 306 307 return true, nil 308} 309 310// load loads from the file 311func (cp *uploadCheckpoint) load(filePath string) error { 312 contents, err := ioutil.ReadFile(filePath) 313 if err != nil { 314 return err 315 } 316 317 err = json.Unmarshal(contents, cp) 318 return err 319} 320 321// dump dumps to the local file 322func (cp *uploadCheckpoint) dump(filePath string) error { 323 bcp := *cp 324 325 // Calculate MD5 326 bcp.MD5 = "" 327 js, err := json.Marshal(bcp) 328 if err != nil { 329 return err 330 } 331 sum := md5.Sum(js) 332 b64 := base64.StdEncoding.EncodeToString(sum[:]) 333 bcp.MD5 = b64 334 335 // Serialization 336 js, err = json.Marshal(bcp) 337 if err != nil { 338 return err 339 } 340 341 // Dump 342 return ioutil.WriteFile(filePath, js, FilePermMode) 343} 344 345// updatePart updates the part status 346func (cp *uploadCheckpoint) updatePart(part UploadPart) { 347 cp.Parts[part.PartNumber-1].Part = part 348 cp.Parts[part.PartNumber-1].IsCompleted = true 349} 350 351// todoParts returns unfinished parts 352func (cp *uploadCheckpoint) todoParts() []FileChunk { 353 fcs := []FileChunk{} 354 for _, part := range cp.Parts { 355 if !part.IsCompleted { 356 fcs = append(fcs, part.Chunk) 357 } 358 } 359 return fcs 360} 361 362// allParts returns all parts 363func (cp *uploadCheckpoint) allParts() []UploadPart { 364 ps := []UploadPart{} 365 for _, part := range cp.Parts { 366 ps = append(ps, part.Part) 367 } 368 return ps 369} 370 371// getCompletedBytes returns completed bytes count 372func (cp *uploadCheckpoint) getCompletedBytes() int64 { 373 var completedBytes int64 374 for _, part := range cp.Parts { 375 if part.IsCompleted { 376 completedBytes += part.Chunk.Size 377 } 378 } 379 return completedBytes 380} 381 382// calcFileMD5 calculates the MD5 for the specified local file 383func calcFileMD5(filePath string) (string, error) { 384 return "", nil 385} 386 387// prepare initializes the multipart upload 388func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error { 389 // CP 390 cp.Magic = uploadCpMagic 391 cp.FilePath = filePath 392 cp.ObjectKey = objectKey 393 394 // Local file 395 fd, err := os.Open(filePath) 396 if err != nil { 397 return err 398 } 399 defer fd.Close() 400 401 st, err := fd.Stat() 402 if err != nil { 403 return err 404 } 405 cp.FileStat.Size = st.Size() 406 cp.FileStat.LastModified = st.ModTime() 407 md, err := calcFileMD5(filePath) 408 if err != nil { 409 return err 410 } 411 cp.FileStat.MD5 = md 412 413 // Chunks 414 parts, err := SplitFileByPartSize(filePath, partSize) 415 if err != nil { 416 return err 417 } 418 419 cp.Parts = make([]cpPart, len(parts)) 420 for i, part := range parts { 421 cp.Parts[i].Chunk = part 422 cp.Parts[i].IsCompleted = false 423 } 424 425 // Init load 426 imur, err := bucket.InitiateMultipartUpload(objectKey, options...) 427 if err != nil { 428 return err 429 } 430 cp.UploadID = imur.UploadID 431 432 return nil 433} 434 435// complete completes the multipart upload and deletes the local CP files 436func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error { 437 imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName, 438 Key: cp.ObjectKey, UploadID: cp.UploadID} 439 _, err := bucket.CompleteMultipartUpload(imur, parts, options...) 440 if err != nil { 441 return err 442 } 443 os.Remove(cpFilePath) 444 return err 445} 446 447// uploadFileWithCp handles concurrent upload with checkpoint 448func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error { 449 listener := getProgressListener(options) 450 451 payerOptions := []Option{} 452 payer := getPayer(options) 453 if payer != "" { 454 payerOptions = append(payerOptions, RequestPayer(PayerType(payer))) 455 } 456 457 // Load CP data 458 ucp := uploadCheckpoint{} 459 err := ucp.load(cpFilePath) 460 if err != nil { 461 os.Remove(cpFilePath) 462 } 463 464 // Load error or the CP data is invalid. 465 valid, err := ucp.isValid(filePath) 466 if err != nil || !valid { 467 if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil { 468 return err 469 } 470 os.Remove(cpFilePath) 471 } 472 473 chunks := ucp.todoParts() 474 imur := InitiateMultipartUploadResult{ 475 Bucket: bucket.BucketName, 476 Key: objectKey, 477 UploadID: ucp.UploadID} 478 479 jobs := make(chan FileChunk, len(chunks)) 480 results := make(chan UploadPart, len(chunks)) 481 failed := make(chan error) 482 die := make(chan bool) 483 484 completedBytes := ucp.getCompletedBytes() 485 event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size) 486 publishProgress(listener, event) 487 488 // Start the workers 489 arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker} 490 for w := 1; w <= routines; w++ { 491 go worker(w, arg, jobs, results, failed, die) 492 } 493 494 // Schedule jobs 495 go scheduler(jobs, chunks) 496 497 // Waiting for the job finished 498 completed := 0 499 for completed < len(chunks) { 500 select { 501 case part := <-results: 502 completed++ 503 ucp.updatePart(part) 504 ucp.dump(cpFilePath) 505 completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size 506 event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size) 507 publishProgress(listener, event) 508 case err := <-failed: 509 close(die) 510 event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size) 511 publishProgress(listener, event) 512 return err 513 } 514 515 if completed >= len(chunks) { 516 break 517 } 518 } 519 520 event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size) 521 publishProgress(listener, event) 522 523 // Complete the multipart upload 524 err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, payerOptions) 525 return err 526} 527