1package oss 2 3import ( 4 "crypto/md5" 5 "encoding/base64" 6 "encoding/hex" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io/ioutil" 11 "os" 12 "path/filepath" 13 "time" 14) 15 16// UploadFile is multipart file upload. 17// 18// objectKey the object name. 19// filePath the local file path to upload. 20// partSize the part size in byte. 21// options the options for uploading object. 22// 23// error it's nil if the operation succeeds, otherwise it's an error object. 24// 25func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error { 26 if partSize < MinPartSize || partSize > MaxPartSize { 27 return errors.New("oss: part size invalid range (100KB, 5GB]") 28 } 29 30 cpConf := getCpConfig(options) 31 routines := getRoutines(options) 32 33 if cpConf != nil && cpConf.IsEnable { 34 cpFilePath := getUploadCpFilePath(cpConf, filePath, bucket.BucketName, objectKey) 35 if cpFilePath != "" { 36 return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines) 37 } 38 } 39 40 return bucket.uploadFile(objectKey, filePath, partSize, options, routines) 41} 42 43func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string { 44 if cpConf.FilePath == "" && cpConf.DirPath != "" { 45 dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject) 46 absPath, _ := filepath.Abs(srcFile) 47 cpFileName := getCpFileName(absPath, dest, "") 48 cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName 49 } 50 return cpConf.FilePath 51} 52 53// ----- concurrent upload without checkpoint ----- 54 55// getCpConfig gets checkpoint configuration 56func getCpConfig(options []Option) *cpConfig { 57 cpcOpt, err := findOption(options, checkpointConfig, nil) 58 if err != nil || cpcOpt == nil { 59 return nil 60 } 61 62 return cpcOpt.(*cpConfig) 63} 64 65// getCpFileName return the name of the checkpoint file 66func getCpFileName(src, dest, versionId string) string { 67 md5Ctx := md5.New() 68 md5Ctx.Write([]byte(src)) 69 srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil)) 70 71 md5Ctx.Reset() 72 md5Ctx.Write([]byte(dest)) 73 destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil)) 74 75 if versionId == "" { 76 return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum) 77 } 78 79 md5Ctx.Reset() 80 md5Ctx.Write([]byte(versionId)) 81 versionCheckSum := hex.EncodeToString(md5Ctx.Sum(nil)) 82 return fmt.Sprintf("%v-%v-%v.cp", srcCheckSum, destCheckSum, versionCheckSum) 83} 84 85// getRoutines gets the routine count. by default it's 1. 86func getRoutines(options []Option) int { 87 rtnOpt, err := findOption(options, routineNum, nil) 88 if err != nil || rtnOpt == nil { 89 return 1 90 } 91 92 rs := rtnOpt.(int) 93 if rs < 1 { 94 rs = 1 95 } else if rs > 100 { 96 rs = 100 97 } 98 99 return rs 100} 101 102// getPayer return the payer of the request 103func getPayer(options []Option) string { 104 payerOpt, err := findOption(options, HTTPHeaderOssRequester, nil) 105 if err != nil || payerOpt == nil { 106 return "" 107 } 108 109 return payerOpt.(string) 110} 111 112// getProgressListener gets the progress callback 113func getProgressListener(options []Option) ProgressListener { 114 isSet, listener, _ := isOptionSet(options, progressListener) 115 if !isSet { 116 return nil 117 } 118 return listener.(ProgressListener) 119} 120 121// uploadPartHook is for testing usage 122type uploadPartHook func(id int, chunk FileChunk) error 123 124var uploadPartHooker uploadPartHook = defaultUploadPart 125 126func defaultUploadPart(id int, chunk FileChunk) error { 127 return nil 128} 129 130// workerArg defines worker argument structure 131type workerArg struct { 132 bucket *Bucket 133 filePath string 134 imur InitiateMultipartUploadResult 135 options []Option 136 hook uploadPartHook 137} 138 139// worker is the worker coroutine function 140func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) { 141 for chunk := range jobs { 142 if err := arg.hook(id, chunk); err != nil { 143 failed <- err 144 break 145 } 146 part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, arg.options...) 147 if err != nil { 148 failed <- err 149 break 150 } 151 select { 152 case <-die: 153 return 154 default: 155 } 156 results <- part 157 } 158} 159 160// scheduler function 161func scheduler(jobs chan FileChunk, chunks []FileChunk) { 162 for _, chunk := range chunks { 163 jobs <- chunk 164 } 165 close(jobs) 166} 167 168func getTotalBytes(chunks []FileChunk) int64 { 169 var tb int64 170 for _, chunk := range chunks { 171 tb += chunk.Size 172 } 173 return tb 174} 175 176// uploadFile is a concurrent upload, without checkpoint 177func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error { 178 listener := getProgressListener(options) 179 180 chunks, err := SplitFileByPartSize(filePath, partSize) 181 if err != nil { 182 return err 183 } 184 185 partOptions := ChoiceTransferPartOption(options) 186 completeOptions := ChoiceCompletePartOption(options) 187 abortOptions := ChoiceAbortPartOption(options) 188 189 // Initialize the multipart upload 190 imur, err := bucket.InitiateMultipartUpload(objectKey, options...) 191 if err != nil { 192 return err 193 } 194 195 jobs := make(chan FileChunk, len(chunks)) 196 results := make(chan UploadPart, len(chunks)) 197 failed := make(chan error) 198 die := make(chan bool) 199 200 var completedBytes int64 201 totalBytes := getTotalBytes(chunks) 202 event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0) 203 publishProgress(listener, event) 204 205 // Start the worker coroutine 206 arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker} 207 for w := 1; w <= routines; w++ { 208 go worker(w, arg, jobs, results, failed, die) 209 } 210 211 // Schedule the jobs 212 go scheduler(jobs, chunks) 213 214 // Waiting for the upload finished 215 completed := 0 216 parts := make([]UploadPart, len(chunks)) 217 for completed < len(chunks) { 218 select { 219 case part := <-results: 220 completed++ 221 parts[part.PartNumber-1] = part 222 completedBytes += chunks[part.PartNumber-1].Size 223 224 // why RwBytes in ProgressEvent is 0 ? 225 // because read or write event has been notified in teeReader.Read() 226 event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, 0) 227 publishProgress(listener, event) 228 case err := <-failed: 229 close(die) 230 event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0) 231 publishProgress(listener, event) 232 bucket.AbortMultipartUpload(imur, abortOptions...) 233 return err 234 } 235 236 if completed >= len(chunks) { 237 break 238 } 239 } 240 241 event = newProgressEvent(TransferStartedEvent, completedBytes, totalBytes, 0) 242 publishProgress(listener, event) 243 244 // Complete the multpart upload 245 _, err = bucket.CompleteMultipartUpload(imur, parts, completeOptions...) 246 if err != nil { 247 bucket.AbortMultipartUpload(imur, abortOptions...) 248 return err 249 } 250 return nil 251} 252 253// ----- concurrent upload with checkpoint ----- 254const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62" 255 256type uploadCheckpoint struct { 257 Magic string // Magic 258 MD5 string // Checkpoint file content's MD5 259 FilePath string // Local file path 260 FileStat cpStat // File state 261 ObjectKey string // Key 262 UploadID string // Upload ID 263 Parts []cpPart // All parts of the local file 264} 265 266type cpStat struct { 267 Size int64 // File size 268 LastModified time.Time // File's last modified time 269 MD5 string // Local file's MD5 270} 271 272type cpPart struct { 273 Chunk FileChunk // File chunk 274 Part UploadPart // Uploaded part 275 IsCompleted bool // Upload complete flag 276} 277 278// isValid checks if the uploaded data is valid---it's valid when the file is not updated and the checkpoint data is valid. 279func (cp uploadCheckpoint) isValid(filePath string) (bool, error) { 280 // Compare the CP's magic number and MD5. 281 cpb := cp 282 cpb.MD5 = "" 283 js, _ := json.Marshal(cpb) 284 sum := md5.Sum(js) 285 b64 := base64.StdEncoding.EncodeToString(sum[:]) 286 287 if cp.Magic != uploadCpMagic || b64 != cp.MD5 { 288 return false, nil 289 } 290 291 // Make sure if the local file is updated. 292 fd, err := os.Open(filePath) 293 if err != nil { 294 return false, err 295 } 296 defer fd.Close() 297 298 st, err := fd.Stat() 299 if err != nil { 300 return false, err 301 } 302 303 md, err := calcFileMD5(filePath) 304 if err != nil { 305 return false, err 306 } 307 308 // Compare the file size, file's last modified time and file's MD5 309 if cp.FileStat.Size != st.Size() || 310 !cp.FileStat.LastModified.Equal(st.ModTime()) || 311 cp.FileStat.MD5 != md { 312 return false, nil 313 } 314 315 return true, nil 316} 317 318// load loads from the file 319func (cp *uploadCheckpoint) load(filePath string) error { 320 contents, err := ioutil.ReadFile(filePath) 321 if err != nil { 322 return err 323 } 324 325 err = json.Unmarshal(contents, cp) 326 return err 327} 328 329// dump dumps to the local file 330func (cp *uploadCheckpoint) dump(filePath string) error { 331 bcp := *cp 332 333 // Calculate MD5 334 bcp.MD5 = "" 335 js, err := json.Marshal(bcp) 336 if err != nil { 337 return err 338 } 339 sum := md5.Sum(js) 340 b64 := base64.StdEncoding.EncodeToString(sum[:]) 341 bcp.MD5 = b64 342 343 // Serialization 344 js, err = json.Marshal(bcp) 345 if err != nil { 346 return err 347 } 348 349 // Dump 350 return ioutil.WriteFile(filePath, js, FilePermMode) 351} 352 353// updatePart updates the part status 354func (cp *uploadCheckpoint) updatePart(part UploadPart) { 355 cp.Parts[part.PartNumber-1].Part = part 356 cp.Parts[part.PartNumber-1].IsCompleted = true 357} 358 359// todoParts returns unfinished parts 360func (cp *uploadCheckpoint) todoParts() []FileChunk { 361 fcs := []FileChunk{} 362 for _, part := range cp.Parts { 363 if !part.IsCompleted { 364 fcs = append(fcs, part.Chunk) 365 } 366 } 367 return fcs 368} 369 370// allParts returns all parts 371func (cp *uploadCheckpoint) allParts() []UploadPart { 372 ps := []UploadPart{} 373 for _, part := range cp.Parts { 374 ps = append(ps, part.Part) 375 } 376 return ps 377} 378 379// getCompletedBytes returns completed bytes count 380func (cp *uploadCheckpoint) getCompletedBytes() int64 { 381 var completedBytes int64 382 for _, part := range cp.Parts { 383 if part.IsCompleted { 384 completedBytes += part.Chunk.Size 385 } 386 } 387 return completedBytes 388} 389 390// calcFileMD5 calculates the MD5 for the specified local file 391func calcFileMD5(filePath string) (string, error) { 392 return "", nil 393} 394 395// prepare initializes the multipart upload 396func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error { 397 // CP 398 cp.Magic = uploadCpMagic 399 cp.FilePath = filePath 400 cp.ObjectKey = objectKey 401 402 // Local file 403 fd, err := os.Open(filePath) 404 if err != nil { 405 return err 406 } 407 defer fd.Close() 408 409 st, err := fd.Stat() 410 if err != nil { 411 return err 412 } 413 cp.FileStat.Size = st.Size() 414 cp.FileStat.LastModified = st.ModTime() 415 md, err := calcFileMD5(filePath) 416 if err != nil { 417 return err 418 } 419 cp.FileStat.MD5 = md 420 421 // Chunks 422 parts, err := SplitFileByPartSize(filePath, partSize) 423 if err != nil { 424 return err 425 } 426 427 cp.Parts = make([]cpPart, len(parts)) 428 for i, part := range parts { 429 cp.Parts[i].Chunk = part 430 cp.Parts[i].IsCompleted = false 431 } 432 433 // Init load 434 imur, err := bucket.InitiateMultipartUpload(objectKey, options...) 435 if err != nil { 436 return err 437 } 438 cp.UploadID = imur.UploadID 439 440 return nil 441} 442 443// complete completes the multipart upload and deletes the local CP files 444func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error { 445 imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName, 446 Key: cp.ObjectKey, UploadID: cp.UploadID} 447 _, err := bucket.CompleteMultipartUpload(imur, parts, options...) 448 if err != nil { 449 return err 450 } 451 os.Remove(cpFilePath) 452 return err 453} 454 455// uploadFileWithCp handles concurrent upload with checkpoint 456func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error { 457 listener := getProgressListener(options) 458 459 partOptions := ChoiceTransferPartOption(options) 460 completeOptions := ChoiceCompletePartOption(options) 461 462 // Load CP data 463 ucp := uploadCheckpoint{} 464 err := ucp.load(cpFilePath) 465 if err != nil { 466 os.Remove(cpFilePath) 467 } 468 469 // Load error or the CP data is invalid. 470 valid, err := ucp.isValid(filePath) 471 if err != nil || !valid { 472 if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil { 473 return err 474 } 475 os.Remove(cpFilePath) 476 } 477 478 chunks := ucp.todoParts() 479 imur := InitiateMultipartUploadResult{ 480 Bucket: bucket.BucketName, 481 Key: objectKey, 482 UploadID: ucp.UploadID} 483 484 jobs := make(chan FileChunk, len(chunks)) 485 results := make(chan UploadPart, len(chunks)) 486 failed := make(chan error) 487 die := make(chan bool) 488 489 completedBytes := ucp.getCompletedBytes() 490 491 // why RwBytes in ProgressEvent is 0 ? 492 // because read or write event has been notified in teeReader.Read() 493 event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0) 494 publishProgress(listener, event) 495 496 // Start the workers 497 arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker} 498 for w := 1; w <= routines; w++ { 499 go worker(w, arg, jobs, results, failed, die) 500 } 501 502 // Schedule jobs 503 go scheduler(jobs, chunks) 504 505 // Waiting for the job finished 506 completed := 0 507 for completed < len(chunks) { 508 select { 509 case part := <-results: 510 completed++ 511 ucp.updatePart(part) 512 ucp.dump(cpFilePath) 513 completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size 514 event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size, 0) 515 publishProgress(listener, event) 516 case err := <-failed: 517 close(die) 518 event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size, 0) 519 publishProgress(listener, event) 520 return err 521 } 522 523 if completed >= len(chunks) { 524 break 525 } 526 } 527 528 event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size, 0) 529 publishProgress(listener, event) 530 531 // Complete the multipart upload 532 err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, completeOptions) 533 return err 534} 535