package sftp

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"math"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/kr/fs"
	"golang.org/x/crypto/ssh"
)

var (
	// ErrInternalInconsistency indicates the packets sent and the data queued to be
	// written to the file don't match up. It is an unusual error and usually is
	// caused by bad behavior server side or connection issues. The error is
	// limited in scope to the call where it happened, the client object is still
	// OK to use as long as the connection is still open.
	ErrInternalInconsistency = errors.New("internal inconsistency")
	// InternalInconsistency is an alias for ErrInternalInconsistency.
	//
	// Deprecated: please use ErrInternalInconsistency.
	InternalInconsistency = ErrInternalInconsistency
)

// A ClientOption is a function which applies configuration to a Client.
// A non-nil error returned by an option reports that the configuration
// was rejected.
type ClientOption func(*Client) error

// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, ie. <= 32768 bytes.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketChecked(size int) ClientOption {
	return func(c *Client) error {
		// Validation happens when the option is applied to a Client,
		// not when the option is constructed.
		if size < 1 {
			return errors.New("size must be greater or equal to 1")
		}
		// 32768 is the upper bound this checked variant accepts.
		if size > 32768 {
			return errors.New("sizes larger than 32KB might not work with all servers")
		}
		c.maxPacket = size
		return nil
	}
}

// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
// It accepts sizes larger than the 32768 bytes all servers should support.
// Only use a setting higher than 32768 if your application always connects to
// the same server or after sufficiently broad testing.
60// 61// If you get the error "failed to send packet header: EOF" when copying a 62// large file, try lowering this number. 63// 64// The default packet size is 32768 bytes. 65func MaxPacketUnchecked(size int) ClientOption { 66 return func(c *Client) error { 67 if size < 1 { 68 return errors.New("size must be greater or equal to 1") 69 } 70 c.maxPacket = size 71 return nil 72 } 73} 74 75// MaxPacket sets the maximum size of the payload, measured in bytes. 76// This option only accepts sizes servers should support, ie. <= 32768 bytes. 77// This is a synonym for MaxPacketChecked that provides backward compatibility. 78// 79// If you get the error "failed to send packet header: EOF" when copying a 80// large file, try lowering this number. 81// 82// The default packet size is 32768 bytes. 83func MaxPacket(size int) ClientOption { 84 return MaxPacketChecked(size) 85} 86 87// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file. 88// 89// The default maximum concurrent requests is 64. 90func MaxConcurrentRequestsPerFile(n int) ClientOption { 91 return func(c *Client) error { 92 if n < 1 { 93 return errors.New("n must be greater or equal to 1") 94 } 95 c.maxConcurrentRequests = n 96 return nil 97 } 98} 99 100// UseConcurrentWrites allows the Client to perform concurrent Writes. 101// 102// Using concurrency while doing writes, requires special consideration. 103// A write to a later offset in a file after an error, 104// could end up with a file length longer than what was successfully written. 105// 106// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`, 107// you may need to `Truncate` the target Writer to avoid “holes” in the data written. 108func UseConcurrentWrites(value bool) ClientOption { 109 return func(c *Client) error { 110 c.useConcurrentWrites = value 111 return nil 112 } 113} 114 115// UseConcurrentReads allows the Client to perform concurrent Reads. 
116// 117// Concurrent reads are generally safe to use and not using them will degrade 118// performance, so this option is enabled by default. 119// 120// When enabled, WriteTo will use Stat/Fstat to get the file size and determines 121// how many concurrent workers to use. 122// Some "read once" servers will delete the file if they receive a stat call on an 123// open file and then the download will fail. 124// Disabling concurrent reads you will be able to download files from these servers. 125// If concurrent reads are disabled, the UseFstat option is ignored. 126func UseConcurrentReads(value bool) ClientOption { 127 return func(c *Client) error { 128 c.disableConcurrentReads = !value 129 return nil 130 } 131} 132 133// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called 134// (usually when copying files). 135// Some servers limit the amount of open files and calling Stat after opening 136// the file will throw an error From the server. Setting this flag will call 137// Fstat instead of Stat which is suppose to be called on an open file handle. 138// 139// It has been found that that with IBM Sterling SFTP servers which have 140// "extractability" level set to 1 which means only 1 file can be opened at 141// any given time. 142// 143// If the server you are working with still has an issue with both Stat and 144// Fstat calls you can always open a file and read it until the end. 145// 146// Another reason to read the file until its end and Fstat doesn't work is 147// that in some servers, reading a full file will automatically delete the 148// file as some of these mainframes map the file to a message in a queue. 149// Once the file has been read it will get deleted. 150func UseFstat(value bool) ClientOption { 151 return func(c *Client) error { 152 c.useFstat = value 153 return nil 154 } 155} 156 157// Client represents an SFTP session on a *ssh.ClientConn SSH connection. 
158// Multiple Clients can be active on a single SSH connection, and a Client 159// may be called concurrently from multiple Goroutines. 160// 161// Client implements the github.com/kr/fs.FileSystem interface. 162type Client struct { 163 clientConn 164 165 ext map[string]string // Extensions (name -> data). 166 167 maxPacket int // max packet size read or written. 168 maxConcurrentRequests int 169 nextid uint32 170 171 // write concurrency is… error prone. 172 // Default behavior should be to not use it. 173 useConcurrentWrites bool 174 useFstat bool 175 disableConcurrentReads bool 176} 177 178// NewClient creates a new SFTP client on conn, using zero or more option 179// functions. 180func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) { 181 s, err := conn.NewSession() 182 if err != nil { 183 return nil, err 184 } 185 if err := s.RequestSubsystem("sftp"); err != nil { 186 return nil, err 187 } 188 pw, err := s.StdinPipe() 189 if err != nil { 190 return nil, err 191 } 192 pr, err := s.StdoutPipe() 193 if err != nil { 194 return nil, err 195 } 196 197 return NewClientPipe(pr, pw, opts...) 198} 199 200// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser. 201// This can be used for connecting to an SFTP server over TCP/TLS or by using 202// the system's ssh client program (e.g. via exec.Command). 
203func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) { 204 sftp := &Client{ 205 clientConn: clientConn{ 206 conn: conn{ 207 Reader: rd, 208 WriteCloser: wr, 209 }, 210 inflight: make(map[uint32]chan<- result), 211 closed: make(chan struct{}), 212 }, 213 214 ext: make(map[string]string), 215 216 maxPacket: 1 << 15, 217 maxConcurrentRequests: 64, 218 } 219 220 for _, opt := range opts { 221 if err := opt(sftp); err != nil { 222 wr.Close() 223 return nil, err 224 } 225 } 226 227 if err := sftp.sendInit(); err != nil { 228 wr.Close() 229 return nil, err 230 } 231 if err := sftp.recvVersion(); err != nil { 232 wr.Close() 233 return nil, err 234 } 235 236 sftp.clientConn.wg.Add(1) 237 go sftp.loop() 238 239 return sftp, nil 240} 241 242// Create creates the named file mode 0666 (before umask), truncating it if it 243// already exists. If successful, methods on the returned File can be used for 244// I/O; the associated file descriptor has mode O_RDWR. If you need more 245// control over the flags/mode used to open the file see client.OpenFile. 246// 247// Note that some SFTP servers (eg. AWS Transfer) do not support opening files 248// read/write at the same time. For those services you will need to use 249// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`. 
250func (c *Client) Create(path string) (*File, error) { 251 return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC)) 252} 253 254const sftpProtocolVersion = 3 // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02 255 256func (c *Client) sendInit() error { 257 return c.clientConn.conn.sendPacket(&sshFxInitPacket{ 258 Version: sftpProtocolVersion, // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02 259 }) 260} 261 262// returns the next value of c.nextid 263func (c *Client) nextID() uint32 { 264 return atomic.AddUint32(&c.nextid, 1) 265} 266 267func (c *Client) recvVersion() error { 268 typ, data, err := c.recvPacket(0) 269 if err != nil { 270 return err 271 } 272 if typ != sshFxpVersion { 273 return &unexpectedPacketErr{sshFxpVersion, typ} 274 } 275 276 version, data, err := unmarshalUint32Safe(data) 277 if err != nil { 278 return err 279 } 280 if version != sftpProtocolVersion { 281 return &unexpectedVersionErr{sftpProtocolVersion, version} 282 } 283 284 for len(data) > 0 { 285 var ext extensionPair 286 ext, data, err = unmarshalExtensionPair(data) 287 if err != nil { 288 return err 289 } 290 c.ext[ext.Name] = ext.Data 291 } 292 293 return nil 294} 295 296// HasExtension checks whether the server supports a named extension. 297// 298// The first return value is the extension data reported by the server 299// (typically a version number). 300func (c *Client) HasExtension(name string) (string, bool) { 301 data, ok := c.ext[name] 302 return data, ok 303} 304 305// Walk returns a new Walker rooted at root. 306func (c *Client) Walk(root string) *fs.Walker { 307 return fs.WalkFS(root, c) 308} 309 310// ReadDir reads the directory named by dirname and returns a list of 311// directory entries. 
312func (c *Client) ReadDir(p string) ([]os.FileInfo, error) { 313 handle, err := c.opendir(p) 314 if err != nil { 315 return nil, err 316 } 317 defer c.close(handle) // this has to defer earlier than the lock below 318 var attrs []os.FileInfo 319 var done = false 320 for !done { 321 id := c.nextID() 322 typ, data, err1 := c.sendPacket(nil, &sshFxpReaddirPacket{ 323 ID: id, 324 Handle: handle, 325 }) 326 if err1 != nil { 327 err = err1 328 done = true 329 break 330 } 331 switch typ { 332 case sshFxpName: 333 sid, data := unmarshalUint32(data) 334 if sid != id { 335 return nil, &unexpectedIDErr{id, sid} 336 } 337 count, data := unmarshalUint32(data) 338 for i := uint32(0); i < count; i++ { 339 var filename string 340 filename, data = unmarshalString(data) 341 _, data = unmarshalString(data) // discard longname 342 var attr *FileStat 343 attr, data = unmarshalAttrs(data) 344 if filename == "." || filename == ".." { 345 continue 346 } 347 attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename))) 348 } 349 case sshFxpStatus: 350 // TODO(dfc) scope warning! 351 err = normaliseError(unmarshalStatus(id, data)) 352 done = true 353 default: 354 return nil, unimplementedPacketErr(typ) 355 } 356 } 357 if err == io.EOF { 358 err = nil 359 } 360 return attrs, err 361} 362 363func (c *Client) opendir(path string) (string, error) { 364 id := c.nextID() 365 typ, data, err := c.sendPacket(nil, &sshFxpOpendirPacket{ 366 ID: id, 367 Path: path, 368 }) 369 if err != nil { 370 return "", err 371 } 372 switch typ { 373 case sshFxpHandle: 374 sid, data := unmarshalUint32(data) 375 if sid != id { 376 return "", &unexpectedIDErr{id, sid} 377 } 378 handle, _ := unmarshalString(data) 379 return handle, nil 380 case sshFxpStatus: 381 return "", normaliseError(unmarshalStatus(id, data)) 382 default: 383 return "", unimplementedPacketErr(typ) 384 } 385} 386 387// Stat returns a FileInfo structure describing the file specified by path 'p'. 
388// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file. 389func (c *Client) Stat(p string) (os.FileInfo, error) { 390 fs, err := c.stat(p) 391 if err != nil { 392 return nil, err 393 } 394 return fileInfoFromStat(fs, path.Base(p)), nil 395} 396 397// Lstat returns a FileInfo structure describing the file specified by path 'p'. 398// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link. 399func (c *Client) Lstat(p string) (os.FileInfo, error) { 400 id := c.nextID() 401 typ, data, err := c.sendPacket(nil, &sshFxpLstatPacket{ 402 ID: id, 403 Path: p, 404 }) 405 if err != nil { 406 return nil, err 407 } 408 switch typ { 409 case sshFxpAttrs: 410 sid, data := unmarshalUint32(data) 411 if sid != id { 412 return nil, &unexpectedIDErr{id, sid} 413 } 414 attr, _ := unmarshalAttrs(data) 415 return fileInfoFromStat(attr, path.Base(p)), nil 416 case sshFxpStatus: 417 return nil, normaliseError(unmarshalStatus(id, data)) 418 default: 419 return nil, unimplementedPacketErr(typ) 420 } 421} 422 423// ReadLink reads the target of a symbolic link. 
424func (c *Client) ReadLink(p string) (string, error) { 425 id := c.nextID() 426 typ, data, err := c.sendPacket(nil, &sshFxpReadlinkPacket{ 427 ID: id, 428 Path: p, 429 }) 430 if err != nil { 431 return "", err 432 } 433 switch typ { 434 case sshFxpName: 435 sid, data := unmarshalUint32(data) 436 if sid != id { 437 return "", &unexpectedIDErr{id, sid} 438 } 439 count, data := unmarshalUint32(data) 440 if count != 1 { 441 return "", unexpectedCount(1, count) 442 } 443 filename, _ := unmarshalString(data) // ignore dummy attributes 444 return filename, nil 445 case sshFxpStatus: 446 return "", normaliseError(unmarshalStatus(id, data)) 447 default: 448 return "", unimplementedPacketErr(typ) 449 } 450} 451 452// Link creates a hard link at 'newname', pointing at the same inode as 'oldname' 453func (c *Client) Link(oldname, newname string) error { 454 id := c.nextID() 455 typ, data, err := c.sendPacket(nil, &sshFxpHardlinkPacket{ 456 ID: id, 457 Oldpath: oldname, 458 Newpath: newname, 459 }) 460 if err != nil { 461 return err 462 } 463 switch typ { 464 case sshFxpStatus: 465 return normaliseError(unmarshalStatus(id, data)) 466 default: 467 return unimplementedPacketErr(typ) 468 } 469} 470 471// Symlink creates a symbolic link at 'newname', pointing at target 'oldname' 472func (c *Client) Symlink(oldname, newname string) error { 473 id := c.nextID() 474 typ, data, err := c.sendPacket(nil, &sshFxpSymlinkPacket{ 475 ID: id, 476 Linkpath: newname, 477 Targetpath: oldname, 478 }) 479 if err != nil { 480 return err 481 } 482 switch typ { 483 case sshFxpStatus: 484 return normaliseError(unmarshalStatus(id, data)) 485 default: 486 return unimplementedPacketErr(typ) 487 } 488} 489 490func (c *Client) setfstat(handle string, flags uint32, attrs interface{}) error { 491 id := c.nextID() 492 typ, data, err := c.sendPacket(nil, &sshFxpFsetstatPacket{ 493 ID: id, 494 Handle: handle, 495 Flags: flags, 496 Attrs: attrs, 497 }) 498 if err != nil { 499 return err 500 } 501 switch typ { 
502 case sshFxpStatus: 503 return normaliseError(unmarshalStatus(id, data)) 504 default: 505 return unimplementedPacketErr(typ) 506 } 507} 508 509// setstat is a convience wrapper to allow for changing of various parts of the file descriptor. 510func (c *Client) setstat(path string, flags uint32, attrs interface{}) error { 511 id := c.nextID() 512 typ, data, err := c.sendPacket(nil, &sshFxpSetstatPacket{ 513 ID: id, 514 Path: path, 515 Flags: flags, 516 Attrs: attrs, 517 }) 518 if err != nil { 519 return err 520 } 521 switch typ { 522 case sshFxpStatus: 523 return normaliseError(unmarshalStatus(id, data)) 524 default: 525 return unimplementedPacketErr(typ) 526 } 527} 528 529// Chtimes changes the access and modification times of the named file. 530func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error { 531 type times struct { 532 Atime uint32 533 Mtime uint32 534 } 535 attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())} 536 return c.setstat(path, sshFileXferAttrACmodTime, attrs) 537} 538 539// Chown changes the user and group owners of the named file. 540func (c *Client) Chown(path string, uid, gid int) error { 541 type owner struct { 542 UID uint32 543 GID uint32 544 } 545 attrs := owner{uint32(uid), uint32(gid)} 546 return c.setstat(path, sshFileXferAttrUIDGID, attrs) 547} 548 549// Chmod changes the permissions of the named file. 550// 551// Chmod does not apply a umask, because even retrieving the umask is not 552// possible in a portable way without causing a race condition. Callers 553// should mask off umask bits, if desired. 554func (c *Client) Chmod(path string, mode os.FileMode) error { 555 return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode)) 556} 557 558// Truncate sets the size of the named file. 
Although it may be safely assumed 559// that if the size is less than its current size it will be truncated to fit, 560// the SFTP protocol does not specify what behavior the server should do when setting 561// size greater than the current size. 562func (c *Client) Truncate(path string, size int64) error { 563 return c.setstat(path, sshFileXferAttrSize, uint64(size)) 564} 565 566// Open opens the named file for reading. If successful, methods on the 567// returned file can be used for reading; the associated file descriptor 568// has mode O_RDONLY. 569func (c *Client) Open(path string) (*File, error) { 570 return c.open(path, flags(os.O_RDONLY)) 571} 572 573// OpenFile is the generalized open call; most users will use Open or 574// Create instead. It opens the named file with specified flag (O_RDONLY 575// etc.). If successful, methods on the returned File can be used for I/O. 576func (c *Client) OpenFile(path string, f int) (*File, error) { 577 return c.open(path, flags(f)) 578} 579 580func (c *Client) open(path string, pflags uint32) (*File, error) { 581 id := c.nextID() 582 typ, data, err := c.sendPacket(nil, &sshFxpOpenPacket{ 583 ID: id, 584 Path: path, 585 Pflags: pflags, 586 }) 587 if err != nil { 588 return nil, err 589 } 590 switch typ { 591 case sshFxpHandle: 592 sid, data := unmarshalUint32(data) 593 if sid != id { 594 return nil, &unexpectedIDErr{id, sid} 595 } 596 handle, _ := unmarshalString(data) 597 return &File{c: c, path: path, handle: handle}, nil 598 case sshFxpStatus: 599 return nil, normaliseError(unmarshalStatus(id, data)) 600 default: 601 return nil, unimplementedPacketErr(typ) 602 } 603} 604 605// close closes a handle handle previously returned in the response 606// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid 607// immediately after this request has been sent. 
608func (c *Client) close(handle string) error { 609 id := c.nextID() 610 typ, data, err := c.sendPacket(nil, &sshFxpClosePacket{ 611 ID: id, 612 Handle: handle, 613 }) 614 if err != nil { 615 return err 616 } 617 switch typ { 618 case sshFxpStatus: 619 return normaliseError(unmarshalStatus(id, data)) 620 default: 621 return unimplementedPacketErr(typ) 622 } 623} 624 625func (c *Client) stat(path string) (*FileStat, error) { 626 id := c.nextID() 627 typ, data, err := c.sendPacket(nil, &sshFxpStatPacket{ 628 ID: id, 629 Path: path, 630 }) 631 if err != nil { 632 return nil, err 633 } 634 switch typ { 635 case sshFxpAttrs: 636 sid, data := unmarshalUint32(data) 637 if sid != id { 638 return nil, &unexpectedIDErr{id, sid} 639 } 640 attr, _ := unmarshalAttrs(data) 641 return attr, nil 642 case sshFxpStatus: 643 return nil, normaliseError(unmarshalStatus(id, data)) 644 default: 645 return nil, unimplementedPacketErr(typ) 646 } 647} 648 649func (c *Client) fstat(handle string) (*FileStat, error) { 650 id := c.nextID() 651 typ, data, err := c.sendPacket(nil, &sshFxpFstatPacket{ 652 ID: id, 653 Handle: handle, 654 }) 655 if err != nil { 656 return nil, err 657 } 658 switch typ { 659 case sshFxpAttrs: 660 sid, data := unmarshalUint32(data) 661 if sid != id { 662 return nil, &unexpectedIDErr{id, sid} 663 } 664 attr, _ := unmarshalAttrs(data) 665 return attr, nil 666 case sshFxpStatus: 667 return nil, normaliseError(unmarshalStatus(id, data)) 668 default: 669 return nil, unimplementedPacketErr(typ) 670 } 671} 672 673// StatVFS retrieves VFS statistics from a remote host. 674// 675// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature 676// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt. 
677func (c *Client) StatVFS(path string) (*StatVFS, error) { 678 // send the StatVFS packet to the server 679 id := c.nextID() 680 typ, data, err := c.sendPacket(nil, &sshFxpStatvfsPacket{ 681 ID: id, 682 Path: path, 683 }) 684 if err != nil { 685 return nil, err 686 } 687 688 switch typ { 689 // server responded with valid data 690 case sshFxpExtendedReply: 691 var response StatVFS 692 err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response) 693 if err != nil { 694 return nil, errors.New("can not parse reply") 695 } 696 697 return &response, nil 698 699 // the resquest failed 700 case sshFxpStatus: 701 return nil, normaliseError(unmarshalStatus(id, data)) 702 703 default: 704 return nil, unimplementedPacketErr(typ) 705 } 706} 707 708// Join joins any number of path elements into a single path, adding a 709// separating slash if necessary. The result is Cleaned; in particular, all 710// empty strings are ignored. 711func (c *Client) Join(elem ...string) string { return path.Join(elem...) } 712 713// Remove removes the specified file or directory. An error will be returned if no 714// file or directory with the specified path exists, or if the specified directory 715// is not empty. 716func (c *Client) Remove(path string) error { 717 err := c.removeFile(path) 718 // some servers, *cough* osx *cough*, return EPERM, not ENODIR. 
719 // serv-u returns ssh_FX_FILE_IS_A_DIRECTORY 720 // EPERM is converted to os.ErrPermission so it is not a StatusError 721 if err, ok := err.(*StatusError); ok { 722 switch err.Code { 723 case sshFxFailure, sshFxFileIsADirectory: 724 return c.RemoveDirectory(path) 725 } 726 } 727 if os.IsPermission(err) { 728 return c.RemoveDirectory(path) 729 } 730 return err 731} 732 733func (c *Client) removeFile(path string) error { 734 id := c.nextID() 735 typ, data, err := c.sendPacket(nil, &sshFxpRemovePacket{ 736 ID: id, 737 Filename: path, 738 }) 739 if err != nil { 740 return err 741 } 742 switch typ { 743 case sshFxpStatus: 744 return normaliseError(unmarshalStatus(id, data)) 745 default: 746 return unimplementedPacketErr(typ) 747 } 748} 749 750// RemoveDirectory removes a directory path. 751func (c *Client) RemoveDirectory(path string) error { 752 id := c.nextID() 753 typ, data, err := c.sendPacket(nil, &sshFxpRmdirPacket{ 754 ID: id, 755 Path: path, 756 }) 757 if err != nil { 758 return err 759 } 760 switch typ { 761 case sshFxpStatus: 762 return normaliseError(unmarshalStatus(id, data)) 763 default: 764 return unimplementedPacketErr(typ) 765 } 766} 767 768// Rename renames a file. 769func (c *Client) Rename(oldname, newname string) error { 770 id := c.nextID() 771 typ, data, err := c.sendPacket(nil, &sshFxpRenamePacket{ 772 ID: id, 773 Oldpath: oldname, 774 Newpath: newname, 775 }) 776 if err != nil { 777 return err 778 } 779 switch typ { 780 case sshFxpStatus: 781 return normaliseError(unmarshalStatus(id, data)) 782 default: 783 return unimplementedPacketErr(typ) 784 } 785} 786 787// PosixRename renames a file using the posix-rename@openssh.com extension 788// which will replace newname if it already exists. 
789func (c *Client) PosixRename(oldname, newname string) error { 790 id := c.nextID() 791 typ, data, err := c.sendPacket(nil, &sshFxpPosixRenamePacket{ 792 ID: id, 793 Oldpath: oldname, 794 Newpath: newname, 795 }) 796 if err != nil { 797 return err 798 } 799 switch typ { 800 case sshFxpStatus: 801 return normaliseError(unmarshalStatus(id, data)) 802 default: 803 return unimplementedPacketErr(typ) 804 } 805} 806 807// RealPath can be used to have the server canonicalize any given path name to an absolute path. 808// 809// This is useful for converting path names containing ".." components, 810// or relative pathnames without a leading slash into absolute paths. 811func (c *Client) RealPath(path string) (string, error) { 812 id := c.nextID() 813 typ, data, err := c.sendPacket(nil, &sshFxpRealpathPacket{ 814 ID: id, 815 Path: path, 816 }) 817 if err != nil { 818 return "", err 819 } 820 switch typ { 821 case sshFxpName: 822 sid, data := unmarshalUint32(data) 823 if sid != id { 824 return "", &unexpectedIDErr{id, sid} 825 } 826 count, data := unmarshalUint32(data) 827 if count != 1 { 828 return "", unexpectedCount(1, count) 829 } 830 filename, _ := unmarshalString(data) // ignore attributes 831 return filename, nil 832 case sshFxpStatus: 833 return "", normaliseError(unmarshalStatus(id, data)) 834 default: 835 return "", unimplementedPacketErr(typ) 836 } 837} 838 839// Getwd returns the current working directory of the server. Operations 840// involving relative paths will be based at this location. 841func (c *Client) Getwd() (string, error) { 842 return c.RealPath(".") 843} 844 845// Mkdir creates the specified directory. An error will be returned if a file or 846// directory with the specified path already exists, or if the directory's 847// parent folder does not exist (the method cannot create complete paths). 
848func (c *Client) Mkdir(path string) error { 849 id := c.nextID() 850 typ, data, err := c.sendPacket(nil, &sshFxpMkdirPacket{ 851 ID: id, 852 Path: path, 853 }) 854 if err != nil { 855 return err 856 } 857 switch typ { 858 case sshFxpStatus: 859 return normaliseError(unmarshalStatus(id, data)) 860 default: 861 return unimplementedPacketErr(typ) 862 } 863} 864 865// MkdirAll creates a directory named path, along with any necessary parents, 866// and returns nil, or else returns an error. 867// If path is already a directory, MkdirAll does nothing and returns nil. 868// If path contains a regular file, an error is returned 869func (c *Client) MkdirAll(path string) error { 870 // Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13 871 // Fast path: if we can tell whether path is a directory or file, stop with success or error. 872 dir, err := c.Stat(path) 873 if err == nil { 874 if dir.IsDir() { 875 return nil 876 } 877 return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR} 878 } 879 880 // Slow path: make sure parent exists and then call Mkdir for path. 881 i := len(path) 882 for i > 0 && path[i-1] == '/' { // Skip trailing path separator. 883 i-- 884 } 885 886 j := i 887 for j > 0 && path[j-1] != '/' { // Scan backward over element. 888 j-- 889 } 890 891 if j > 1 { 892 // Create parent 893 err = c.MkdirAll(path[0 : j-1]) 894 if err != nil { 895 return err 896 } 897 } 898 899 // Parent now exists; invoke Mkdir and use its result. 900 err = c.Mkdir(path) 901 if err != nil { 902 // Handle arguments like "foo/." by 903 // double-checking that directory doesn't exist. 904 dir, err1 := c.Lstat(path) 905 if err1 == nil && dir.IsDir() { 906 return nil 907 } 908 return err 909 } 910 return nil 911} 912 913// File represents a remote file. 
914type File struct { 915 c *Client 916 path string 917 handle string 918 919 mu sync.Mutex 920 offset int64 // current offset within remote file 921} 922 923// Close closes the File, rendering it unusable for I/O. It returns an 924// error, if any. 925func (f *File) Close() error { 926 return f.c.close(f.handle) 927} 928 929// Name returns the name of the file as presented to Open or Create. 930func (f *File) Name() string { 931 return f.path 932} 933 934// Read reads up to len(b) bytes from the File. It returns the number of bytes 935// read and an error, if any. Read follows io.Reader semantics, so when Read 936// encounters an error or EOF condition after successfully reading n > 0 bytes, 937// it returns the number of bytes read. 938// 939// To maximise throughput for transferring the entire file (especially 940// over high latency links) it is recommended to use WriteTo rather 941// than calling Read multiple times. io.Copy will do this 942// automatically. 943func (f *File) Read(b []byte) (int, error) { 944 f.mu.Lock() 945 defer f.mu.Unlock() 946 947 n, err := f.ReadAt(b, f.offset) 948 f.offset += int64(n) 949 return n, err 950} 951 952// readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset. 953// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs. 
954func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) { 955 for err == nil && n < len(b) { 956 id := f.c.nextID() 957 typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{ 958 ID: id, 959 Handle: f.handle, 960 Offset: uint64(off) + uint64(n), 961 Len: uint32(len(b) - n), 962 }) 963 if err != nil { 964 return n, err 965 } 966 967 switch typ { 968 case sshFxpStatus: 969 return n, normaliseError(unmarshalStatus(id, data)) 970 971 case sshFxpData: 972 sid, data := unmarshalUint32(data) 973 if id != sid { 974 return n, &unexpectedIDErr{id, sid} 975 } 976 977 l, data := unmarshalUint32(data) 978 n += copy(b[n:], data[:l]) 979 980 default: 981 return n, unimplementedPacketErr(typ) 982 } 983 } 984 985 return 986} 987 988func (f *File) readAtSequential(b []byte, off int64) (read int, err error) { 989 for read < len(b) { 990 rb := b[read:] 991 if len(rb) > f.c.maxPacket { 992 rb = rb[:f.c.maxPacket] 993 } 994 n, err := f.readChunkAt(nil, rb, off+int64(read)) 995 if n < 0 { 996 panic("sftp.File: returned negative count from readChunkAt") 997 } 998 if n > 0 { 999 read += n 1000 } 1001 if err != nil { 1002 if errors.Is(err, io.EOF) { 1003 return read, nil // return nil explicitly. 1004 } 1005 return read, err 1006 } 1007 } 1008 return read, nil 1009} 1010 1011// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns 1012// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics, 1013// so the file offset is not altered during the read. 1014func (f *File) ReadAt(b []byte, off int64) (int, error) { 1015 if len(b) <= f.c.maxPacket { 1016 // This should be able to be serviced with 1/2 requests. 1017 // So, just do it directly. 1018 return f.readChunkAt(nil, b, off) 1019 } 1020 1021 if f.c.disableConcurrentReads { 1022 return f.readAtSequential(b, off) 1023 } 1024 1025 // Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests. 
1026 // This allows writes with a suitably large buffer to transfer data at a much faster rate 1027 // by overlapping round trip times. 1028 1029 cancel := make(chan struct{}) 1030 1031 concurrency := len(b)/f.c.maxPacket + 1 1032 if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { 1033 concurrency = f.c.maxConcurrentRequests 1034 } 1035 1036 resPool := newResChanPool(concurrency) 1037 1038 type work struct { 1039 id uint32 1040 res chan result 1041 1042 b []byte 1043 off int64 1044 } 1045 workCh := make(chan work) 1046 1047 // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. 1048 go func() { 1049 defer close(workCh) 1050 1051 b := b 1052 offset := off 1053 chunkSize := f.c.maxPacket 1054 1055 for len(b) > 0 { 1056 rb := b 1057 if len(rb) > chunkSize { 1058 rb = rb[:chunkSize] 1059 } 1060 1061 id := f.c.nextID() 1062 res := resPool.Get() 1063 1064 f.c.dispatchRequest(res, &sshFxpReadPacket{ 1065 ID: id, 1066 Handle: f.handle, 1067 Offset: uint64(offset), 1068 Len: uint32(chunkSize), 1069 }) 1070 1071 select { 1072 case workCh <- work{id, res, rb, offset}: 1073 case <-cancel: 1074 return 1075 } 1076 1077 offset += int64(len(rb)) 1078 b = b[len(rb):] 1079 } 1080 }() 1081 1082 type rErr struct { 1083 off int64 1084 err error 1085 } 1086 errCh := make(chan rErr) 1087 1088 var wg sync.WaitGroup 1089 wg.Add(concurrency) 1090 for i := 0; i < concurrency; i++ { 1091 // Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset. 
1092 go func() { 1093 defer wg.Done() 1094 1095 for packet := range workCh { 1096 var n int 1097 1098 s := <-packet.res 1099 resPool.Put(packet.res) 1100 1101 err := s.err 1102 if err == nil { 1103 switch s.typ { 1104 case sshFxpStatus: 1105 err = normaliseError(unmarshalStatus(packet.id, s.data)) 1106 1107 case sshFxpData: 1108 sid, data := unmarshalUint32(s.data) 1109 if packet.id != sid { 1110 err = &unexpectedIDErr{packet.id, sid} 1111 1112 } else { 1113 l, data := unmarshalUint32(data) 1114 n = copy(packet.b, data[:l]) 1115 1116 // For normal disk files, it is guaranteed that this will read 1117 // the specified number of bytes, or up to end of file. 1118 // This implies, if we have a short read, that means EOF. 1119 if n < len(packet.b) { 1120 err = io.EOF 1121 } 1122 } 1123 1124 default: 1125 err = unimplementedPacketErr(s.typ) 1126 } 1127 } 1128 1129 if err != nil { 1130 // return the offset as the start + how much we read before the error. 1131 errCh <- rErr{packet.off + int64(n), err} 1132 return 1133 } 1134 } 1135 }() 1136 } 1137 1138 // Wait for long tail, before closing results. 1139 go func() { 1140 wg.Wait() 1141 close(errCh) 1142 }() 1143 1144 // Reduce: collect all the results into a relevant return: the earliest offset to return an error. 1145 firstErr := rErr{math.MaxInt64, nil} 1146 for rErr := range errCh { 1147 if rErr.off <= firstErr.off { 1148 firstErr = rErr 1149 } 1150 1151 select { 1152 case <-cancel: 1153 default: 1154 // stop any more work from being distributed. (Just in case.) 1155 close(cancel) 1156 } 1157 } 1158 1159 if firstErr.err != nil { 1160 // firstErr.err != nil if and only if firstErr.off > our starting offset. 1161 return int(firstErr.off - off), firstErr.err 1162 } 1163 1164 // As per spec for io.ReaderAt, we return nil error if and only if we read everything. 1165 return len(b), nil 1166} 1167 1168// writeToSequential implements WriteTo, but works sequentially with no parallelism. 
1169func (f *File) writeToSequential(w io.Writer) (written int64, err error) { 1170 b := make([]byte, f.c.maxPacket) 1171 ch := make(chan result, 1) // reusable channel 1172 1173 for { 1174 n, err := f.readChunkAt(ch, b, f.offset) 1175 if n < 0 { 1176 panic("sftp.File: returned negative count from readChunkAt") 1177 } 1178 1179 if n > 0 { 1180 f.offset += int64(n) 1181 1182 m, err2 := w.Write(b[:n]) 1183 written += int64(m) 1184 1185 if err == nil { 1186 err = err2 1187 } 1188 } 1189 1190 if err != nil { 1191 if err == io.EOF { 1192 return written, nil // return nil explicitly. 1193 } 1194 1195 return written, err 1196 } 1197 } 1198} 1199 1200// WriteTo writes the file to the given Writer. 1201// The return value is the number of bytes written. 1202// Any error encountered during the write is also returned. 1203// 1204// This method is preferred over calling Read multiple times 1205// to maximise throughput for transferring the entire file, 1206// especially over high latency links. 1207func (f *File) WriteTo(w io.Writer) (written int64, err error) { 1208 f.mu.Lock() 1209 defer f.mu.Unlock() 1210 1211 if f.c.disableConcurrentReads { 1212 return f.writeToSequential(w) 1213 } 1214 1215 // For concurrency, we want to guess how many concurrent workers we should use. 
1216 var fileStat *FileStat 1217 if f.c.useFstat { 1218 fileStat, err = f.c.fstat(f.handle) 1219 } else { 1220 fileStat, err = f.c.stat(f.path) 1221 } 1222 if err != nil { 1223 return 0, err 1224 } 1225 1226 fileSize := fileStat.Size 1227 if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) { 1228 // only regular files are guaranteed to return (full read) xor (partial read, next error) 1229 return f.writeToSequential(w) 1230 } 1231 1232 concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess 1233 if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 { 1234 concurrency64 = uint64(f.c.maxConcurrentRequests) 1235 } 1236 // Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow. 1237 concurrency := int(concurrency64) 1238 1239 chunkSize := f.c.maxPacket 1240 pool := newBufPool(concurrency, chunkSize) 1241 resPool := newResChanPool(concurrency) 1242 1243 cancel := make(chan struct{}) 1244 var wg sync.WaitGroup 1245 defer func() { 1246 // Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop. 1247 close(cancel) 1248 1249 // We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed. 1250 // Just to be sure we don’t orphan any goroutines any hanging references. 1251 wg.Wait() 1252 }() 1253 1254 type writeWork struct { 1255 b []byte 1256 off int64 1257 err error 1258 1259 next chan writeWork 1260 } 1261 writeCh := make(chan writeWork) 1262 1263 type readWork struct { 1264 id uint32 1265 res chan result 1266 off int64 1267 1268 cur, next chan writeWork 1269 } 1270 readCh := make(chan readWork) 1271 1272 // Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing. 
1273 go func() { 1274 defer close(readCh) 1275 1276 off := f.offset 1277 1278 cur := writeCh 1279 for { 1280 id := f.c.nextID() 1281 res := resPool.Get() 1282 1283 next := make(chan writeWork) 1284 readWork := readWork{ 1285 id: id, 1286 res: res, 1287 off: off, 1288 1289 cur: cur, 1290 next: next, 1291 } 1292 1293 f.c.dispatchRequest(res, &sshFxpReadPacket{ 1294 ID: id, 1295 Handle: f.handle, 1296 Offset: uint64(off), 1297 Len: uint32(chunkSize), 1298 }) 1299 1300 select { 1301 case readCh <- readWork: 1302 case <-cancel: 1303 return 1304 } 1305 1306 off += int64(chunkSize) 1307 cur = next 1308 } 1309 }() 1310 1311 wg.Add(concurrency) 1312 for i := 0; i < concurrency; i++ { 1313 // Map_i: each worker gets readWork, and does the Read into a buffer at the given offset. 1314 go func() { 1315 defer wg.Done() 1316 1317 for readWork := range readCh { 1318 var b []byte 1319 var n int 1320 1321 s := <-readWork.res 1322 resPool.Put(readWork.res) 1323 1324 err := s.err 1325 if err == nil { 1326 switch s.typ { 1327 case sshFxpStatus: 1328 err = normaliseError(unmarshalStatus(readWork.id, s.data)) 1329 1330 case sshFxpData: 1331 sid, data := unmarshalUint32(s.data) 1332 if readWork.id != sid { 1333 err = &unexpectedIDErr{readWork.id, sid} 1334 1335 } else { 1336 l, data := unmarshalUint32(data) 1337 b = pool.Get()[:l] 1338 n = copy(b, data[:l]) 1339 b = b[:n] 1340 } 1341 1342 default: 1343 err = unimplementedPacketErr(s.typ) 1344 } 1345 } 1346 1347 writeWork := writeWork{ 1348 b: b, 1349 off: readWork.off, 1350 err: err, 1351 1352 next: readWork.next, 1353 } 1354 1355 select { 1356 case readWork.cur <- writeWork: 1357 case <-cancel: 1358 return 1359 } 1360 1361 if err != nil { 1362 return 1363 } 1364 } 1365 }() 1366 } 1367 1368 // Reduce: serialize the results from the reads into sequential writes. 
1369 cur := writeCh 1370 for { 1371 packet, ok := <-cur 1372 if !ok { 1373 return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel") 1374 } 1375 1376 // Because writes are serialized, this will always be the last successfully read byte. 1377 f.offset = packet.off + int64(len(packet.b)) 1378 1379 if len(packet.b) > 0 { 1380 n, err := w.Write(packet.b) 1381 written += int64(n) 1382 if err != nil { 1383 return written, err 1384 } 1385 } 1386 1387 if packet.err != nil { 1388 if packet.err == io.EOF { 1389 return written, nil 1390 } 1391 1392 return written, packet.err 1393 } 1394 1395 pool.Put(packet.b) 1396 cur = packet.next 1397 } 1398} 1399 1400// Stat returns the FileInfo structure describing file. If there is an 1401// error. 1402func (f *File) Stat() (os.FileInfo, error) { 1403 fs, err := f.c.fstat(f.handle) 1404 if err != nil { 1405 return nil, err 1406 } 1407 return fileInfoFromStat(fs, path.Base(f.path)), nil 1408} 1409 1410// Write writes len(b) bytes to the File. It returns the number of bytes 1411// written and an error, if any. Write returns a non-nil error when n != 1412// len(b). 1413// 1414// To maximise throughput for transferring the entire file (especially 1415// over high latency links) it is recommended to use ReadFrom rather 1416// than calling Write multiple times. io.Copy will do this 1417// automatically. 
1418func (f *File) Write(b []byte) (int, error) { 1419 f.mu.Lock() 1420 defer f.mu.Unlock() 1421 1422 n, err := f.WriteAt(b, f.offset) 1423 f.offset += int64(n) 1424 return n, err 1425} 1426 1427func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) { 1428 typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{ 1429 ID: f.c.nextID(), 1430 Handle: f.handle, 1431 Offset: uint64(off), 1432 Length: uint32(len(b)), 1433 Data: b, 1434 }) 1435 if err != nil { 1436 return 0, err 1437 } 1438 1439 switch typ { 1440 case sshFxpStatus: 1441 id, _ := unmarshalUint32(data) 1442 err := normaliseError(unmarshalStatus(id, data)) 1443 if err != nil { 1444 return 0, err 1445 } 1446 1447 default: 1448 return 0, unimplementedPacketErr(typ) 1449 } 1450 1451 return len(b), nil 1452} 1453 1454// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially. 1455func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) { 1456 // Split the write into multiple maxPacket sized concurrent writes 1457 // bounded by maxConcurrentRequests. This allows writes with a suitably 1458 // large buffer to transfer data at a much faster rate due to 1459 // overlapping round trip times. 1460 1461 cancel := make(chan struct{}) 1462 1463 type work struct { 1464 b []byte 1465 off int64 1466 } 1467 workCh := make(chan work) 1468 1469 // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. 
1470 go func() { 1471 defer close(workCh) 1472 1473 var read int 1474 chunkSize := f.c.maxPacket 1475 1476 for read < len(b) { 1477 wb := b[read:] 1478 if len(wb) > chunkSize { 1479 wb = wb[:chunkSize] 1480 } 1481 1482 select { 1483 case workCh <- work{wb, off + int64(read)}: 1484 case <-cancel: 1485 return 1486 } 1487 1488 read += len(wb) 1489 } 1490 }() 1491 1492 type wErr struct { 1493 off int64 1494 err error 1495 } 1496 errCh := make(chan wErr) 1497 1498 concurrency := len(b)/f.c.maxPacket + 1 1499 if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { 1500 concurrency = f.c.maxConcurrentRequests 1501 } 1502 1503 var wg sync.WaitGroup 1504 wg.Add(concurrency) 1505 for i := 0; i < concurrency; i++ { 1506 // Map_i: each worker gets work, and does the Write from each buffer to its respective offset. 1507 go func() { 1508 defer wg.Done() 1509 1510 ch := make(chan result, 1) // reusable channel per mapper. 1511 1512 for packet := range workCh { 1513 n, err := f.writeChunkAt(ch, packet.b, packet.off) 1514 if err != nil { 1515 // return the offset as the start + how much we wrote before the error. 1516 errCh <- wErr{packet.off + int64(n), err} 1517 } 1518 } 1519 }() 1520 } 1521 1522 // Wait for long tail, before closing results. 1523 go func() { 1524 wg.Wait() 1525 close(errCh) 1526 }() 1527 1528 // Reduce: collect all the results into a relevant return: the earliest offset to return an error. 1529 firstErr := wErr{math.MaxInt64, nil} 1530 for wErr := range errCh { 1531 if wErr.off <= firstErr.off { 1532 firstErr = wErr 1533 } 1534 1535 select { 1536 case <-cancel: 1537 default: 1538 // stop any more work from being distributed. (Just in case.) 1539 close(cancel) 1540 } 1541 } 1542 1543 if firstErr.err != nil { 1544 // firstErr.err != nil if and only if firstErr.off >= our starting offset. 
1545 return int(firstErr.off - off), firstErr.err 1546 } 1547 1548 return len(b), nil 1549} 1550 1551// WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns 1552// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics, 1553// so the file offset is not altered during the write. 1554func (f *File) WriteAt(b []byte, off int64) (written int, err error) { 1555 if len(b) <= f.c.maxPacket { 1556 // We can do this in one write. 1557 return f.writeChunkAt(nil, b, off) 1558 } 1559 1560 if f.c.useConcurrentWrites { 1561 return f.writeAtConcurrent(b, off) 1562 } 1563 1564 ch := make(chan result, 1) // reusable channel 1565 1566 chunkSize := f.c.maxPacket 1567 1568 for written < len(b) { 1569 wb := b[written:] 1570 if len(wb) > chunkSize { 1571 wb = wb[:chunkSize] 1572 } 1573 1574 n, err := f.writeChunkAt(ch, wb, off+int64(written)) 1575 if n > 0 { 1576 written += n 1577 } 1578 1579 if err != nil { 1580 return written, err 1581 } 1582 } 1583 1584 return len(b), nil 1585} 1586 1587// ReadFromWithConcurrency implements ReaderFrom, 1588// but uses the given concurrency to issue multiple requests at the same time. 1589// 1590// Giving a concurrency of less than one will default to the Client’s max concurrency. 1591// 1592// Otherwise, the given concurrency will be capped by the Client's max concurrency. 1593func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) { 1594 // Split the write into multiple maxPacket sized concurrent writes. 1595 // This allows writes with a suitably large reader 1596 // to transfer data at a much faster rate due to overlapping round trip times. 
1597 1598 cancel := make(chan struct{}) 1599 1600 type work struct { 1601 b []byte 1602 n int 1603 off int64 1604 } 1605 workCh := make(chan work) 1606 1607 type rwErr struct { 1608 off int64 1609 err error 1610 } 1611 errCh := make(chan rwErr) 1612 1613 if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { 1614 concurrency = f.c.maxConcurrentRequests 1615 } 1616 1617 pool := newBufPool(concurrency, f.c.maxPacket) 1618 1619 // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. 1620 go func() { 1621 defer close(workCh) 1622 1623 off := f.offset 1624 1625 for { 1626 b := pool.Get() 1627 1628 n, err := r.Read(b) 1629 if n > 0 { 1630 read += int64(n) 1631 1632 select { 1633 case workCh <- work{b, n, off}: 1634 // We need the pool.Put(b) to put the whole slice, not just trunced. 1635 case <-cancel: 1636 return 1637 } 1638 1639 off += int64(n) 1640 } 1641 1642 if err != nil { 1643 if err != io.EOF { 1644 errCh <- rwErr{off, err} 1645 } 1646 return 1647 } 1648 } 1649 }() 1650 1651 var wg sync.WaitGroup 1652 wg.Add(concurrency) 1653 for i := 0; i < concurrency; i++ { 1654 // Map_i: each worker gets work, and does the Write from each buffer to its respective offset. 1655 go func() { 1656 defer wg.Done() 1657 1658 ch := make(chan result, 1) // reusable channel per mapper. 1659 1660 for packet := range workCh { 1661 n, err := f.writeChunkAt(ch, packet.b[:packet.n], packet.off) 1662 if err != nil { 1663 // return the offset as the start + how much we wrote before the error. 1664 errCh <- rwErr{packet.off + int64(n), err} 1665 } 1666 pool.Put(packet.b) 1667 } 1668 }() 1669 } 1670 1671 // Wait for long tail, before closing results. 1672 go func() { 1673 wg.Wait() 1674 close(errCh) 1675 }() 1676 1677 // Reduce: Collect all the results into a relevant return: the earliest offset to return an error. 
1678 firstErr := rwErr{math.MaxInt64, nil} 1679 for rwErr := range errCh { 1680 if rwErr.off <= firstErr.off { 1681 firstErr = rwErr 1682 } 1683 1684 select { 1685 case <-cancel: 1686 default: 1687 // stop any more work from being distributed. 1688 close(cancel) 1689 } 1690 } 1691 1692 if firstErr.err != nil { 1693 // firstErr.err != nil if and only if firstErr.off is a valid offset. 1694 // 1695 // firstErr.off will then be the lesser of: 1696 // * the offset of the first error from writing, 1697 // * the last successfully read offset. 1698 // 1699 // This could be less than the last succesfully written offset, 1700 // which is the whole reason for the UseConcurrentWrites() ClientOption. 1701 // 1702 // Callers are responsible for truncating any SFTP files to a safe length. 1703 f.offset = firstErr.off 1704 1705 // ReadFrom is defined to return the read bytes, regardless of any writer errors. 1706 return read, firstErr.err 1707 } 1708 1709 f.offset += read 1710 return read, nil 1711} 1712 1713// ReadFrom reads data from r until EOF and writes it to the file. The return 1714// value is the number of bytes read. Any error except io.EOF encountered 1715// during the read is also returned. 1716// 1717// This method is preferred over calling Write multiple times 1718// to maximise throughput for transferring the entire file, 1719// especially over high-latency links. 1720func (f *File) ReadFrom(r io.Reader) (int64, error) { 1721 f.mu.Lock() 1722 defer f.mu.Unlock() 1723 1724 if f.c.useConcurrentWrites { 1725 var remain int64 1726 switch r := r.(type) { 1727 case interface{ Len() int }: 1728 remain = int64(r.Len()) 1729 1730 case interface{ Size() int64 }: 1731 remain = r.Size() 1732 1733 case *io.LimitedReader: 1734 remain = r.N 1735 1736 case interface{ Stat() (os.FileInfo, error) }: 1737 info, err := r.Stat() 1738 if err == nil { 1739 remain = info.Size() 1740 } 1741 } 1742 1743 if remain < 0 { 1744 // We can strongly assert that we want default max concurrency here. 
1745 return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests) 1746 } 1747 1748 if remain > int64(f.c.maxPacket) { 1749 // Otherwise, only use concurrency, if it would be at least two packets. 1750 1751 // This is the best reasonable guess we can make. 1752 concurrency64 := remain/int64(f.c.maxPacket) + 1 1753 1754 // We need to cap this value to an `int` size value to avoid overflow on 32-bit machines. 1755 // So, we may as well pre-cap it to `f.c.maxConcurrentRequests`. 1756 if concurrency64 > int64(f.c.maxConcurrentRequests) { 1757 concurrency64 = int64(f.c.maxConcurrentRequests) 1758 } 1759 1760 return f.ReadFromWithConcurrency(r, int(concurrency64)) 1761 } 1762 } 1763 1764 ch := make(chan result, 1) // reusable channel 1765 1766 b := make([]byte, f.c.maxPacket) 1767 1768 var read int64 1769 for { 1770 n, err := r.Read(b) 1771 if n < 0 { 1772 panic("sftp.File: reader returned negative count from Read") 1773 } 1774 1775 if n > 0 { 1776 read += int64(n) 1777 1778 m, err2 := f.writeChunkAt(ch, b[:n], f.offset) 1779 f.offset += int64(m) 1780 1781 if err == nil { 1782 err = err2 1783 } 1784 } 1785 1786 if err != nil { 1787 if err == io.EOF { 1788 return read, nil // return nil explicitly. 1789 } 1790 1791 return read, err 1792 } 1793 } 1794} 1795 1796// Seek implements io.Seeker by setting the client offset for the next Read or 1797// Write. It returns the next offset read. Seeking before or after the end of 1798// the file is undefined. Seeking relative to the end calls Stat. 
1799func (f *File) Seek(offset int64, whence int) (int64, error) { 1800 f.mu.Lock() 1801 defer f.mu.Unlock() 1802 1803 switch whence { 1804 case io.SeekStart: 1805 case io.SeekCurrent: 1806 offset += f.offset 1807 case io.SeekEnd: 1808 fi, err := f.Stat() 1809 if err != nil { 1810 return f.offset, err 1811 } 1812 offset += fi.Size() 1813 default: 1814 return f.offset, unimplementedSeekWhence(whence) 1815 } 1816 1817 if offset < 0 { 1818 return f.offset, os.ErrInvalid 1819 } 1820 1821 f.offset = offset 1822 return f.offset, nil 1823} 1824 1825// Chown changes the uid/gid of the current file. 1826func (f *File) Chown(uid, gid int) error { 1827 return f.c.Chown(f.path, uid, gid) 1828} 1829 1830// Chmod changes the permissions of the current file. 1831// 1832// See Client.Chmod for details. 1833func (f *File) Chmod(mode os.FileMode) error { 1834 return f.c.setfstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode)) 1835} 1836 1837// Sync requests a flush of the contents of a File to stable storage. 1838// 1839// Sync requires the server to support the fsync@openssh.com extension. 1840func (f *File) Sync() error { 1841 id := f.c.nextID() 1842 typ, data, err := f.c.sendPacket(nil, &sshFxpFsyncPacket{ 1843 ID: id, 1844 Handle: f.handle, 1845 }) 1846 1847 switch { 1848 case err != nil: 1849 return err 1850 case typ == sshFxpStatus: 1851 return normaliseError(unmarshalStatus(id, data)) 1852 default: 1853 return &unexpectedPacketErr{want: sshFxpStatus, got: typ} 1854 } 1855} 1856 1857// Truncate sets the size of the current file. Although it may be safely assumed 1858// that if the size is less than its current size it will be truncated to fit, 1859// the SFTP protocol does not specify what behavior the server should do when setting 1860// size greater than the current size. 
1861// We send a SSH_FXP_FSETSTAT here since we have a file handle 1862func (f *File) Truncate(size int64) error { 1863 return f.c.setfstat(f.handle, sshFileXferAttrSize, uint64(size)) 1864} 1865 1866func min(a, b int) int { 1867 if a > b { 1868 return b 1869 } 1870 return a 1871} 1872 1873// normaliseError normalises an error into a more standard form that can be 1874// checked against stdlib errors like io.EOF or os.ErrNotExist. 1875func normaliseError(err error) error { 1876 switch err := err.(type) { 1877 case *StatusError: 1878 switch err.Code { 1879 case sshFxEOF: 1880 return io.EOF 1881 case sshFxNoSuchFile: 1882 return os.ErrNotExist 1883 case sshFxPermissionDenied: 1884 return os.ErrPermission 1885 case sshFxOk: 1886 return nil 1887 default: 1888 return err 1889 } 1890 default: 1891 return err 1892 } 1893} 1894 1895func unmarshalStatus(id uint32, data []byte) error { 1896 sid, data := unmarshalUint32(data) 1897 if sid != id { 1898 return &unexpectedIDErr{id, sid} 1899 } 1900 code, data := unmarshalUint32(data) 1901 msg, data, _ := unmarshalStringSafe(data) 1902 lang, _, _ := unmarshalStringSafe(data) 1903 return &StatusError{ 1904 Code: code, 1905 msg: msg, 1906 lang: lang, 1907 } 1908} 1909 1910func marshalStatus(b []byte, err StatusError) []byte { 1911 b = marshalUint32(b, err.Code) 1912 b = marshalString(b, err.msg) 1913 b = marshalString(b, err.lang) 1914 return b 1915} 1916 1917// flags converts the flags passed to OpenFile into ssh flags. 1918// Unsupported flags are ignored. 
1919func flags(f int) uint32 { 1920 var out uint32 1921 switch f & os.O_WRONLY { 1922 case os.O_WRONLY: 1923 out |= sshFxfWrite 1924 case os.O_RDONLY: 1925 out |= sshFxfRead 1926 } 1927 if f&os.O_RDWR == os.O_RDWR { 1928 out |= sshFxfRead | sshFxfWrite 1929 } 1930 if f&os.O_APPEND == os.O_APPEND { 1931 out |= sshFxfAppend 1932 } 1933 if f&os.O_CREATE == os.O_CREATE { 1934 out |= sshFxfCreat 1935 } 1936 if f&os.O_TRUNC == os.O_TRUNC { 1937 out |= sshFxfTrunc 1938 } 1939 if f&os.O_EXCL == os.O_EXCL { 1940 out |= sshFxfExcl 1941 } 1942 return out 1943} 1944 1945// toChmodPerm converts Go permission bits to POSIX permission bits. 1946// 1947// This differs from fromFileMode in that we preserve the POSIX versions of 1948// setuid, setgid and sticky in m, because we've historically supported those 1949// bits, and we mask off any non-permission bits. 1950func toChmodPerm(m os.FileMode) (perm uint32) { 1951 const mask = os.ModePerm | s_ISUID | s_ISGID | s_ISVTX 1952 perm = uint32(m & mask) 1953 1954 if m&os.ModeSetuid != 0 { 1955 perm |= s_ISUID 1956 } 1957 if m&os.ModeSetgid != 0 { 1958 perm |= s_ISGID 1959 } 1960 if m&os.ModeSticky != 0 { 1961 perm |= s_ISVTX 1962 } 1963 1964 return perm 1965} 1966