1package sftp
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"io"
8	"math"
9	"os"
10	"path"
11	"sync"
12	"sync/atomic"
13	"syscall"
14	"time"
15
16	"github.com/kr/fs"
17	"golang.org/x/crypto/ssh"
18)
19
var (
	// ErrInternalInconsistency indicates the packets sent and the data queued to be
	// written to the file don't match up. It is an unusual error and usually is
	// caused by bad behavior server side or connection issues. The error is
	// limited in scope to the call where it happened, the client object is still
	// OK to use as long as the connection is still open.
	//
	// Compare against it with errors.Is.
	ErrInternalInconsistency = errors.New("internal inconsistency")
	// InternalInconsistency alias for ErrInternalInconsistency.
	//
	// Deprecated: please use ErrInternalInconsistency
	InternalInconsistency = ErrInternalInconsistency
)
32
// A ClientOption is a function which applies configuration to a Client.
//
// Options are passed to NewClient/NewClientPipe and applied in order; an
// option returning a non-nil error aborts client construction.
type ClientOption func(*Client) error
35
36// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
37// This option only accepts sizes servers should support, ie. <= 32768 bytes.
38//
39// If you get the error "failed to send packet header: EOF" when copying a
40// large file, try lowering this number.
41//
42// The default packet size is 32768 bytes.
43func MaxPacketChecked(size int) ClientOption {
44	return func(c *Client) error {
45		if size < 1 {
46			return errors.New("size must be greater or equal to 1")
47		}
48		if size > 32768 {
49			return errors.New("sizes larger than 32KB might not work with all servers")
50		}
51		c.maxPacket = size
52		return nil
53	}
54}
55
56// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
57// It accepts sizes larger than the 32768 bytes all servers should support.
58// Only use a setting higher than 32768 if your application always connects to
59// the same server or after sufficiently broad testing.
60//
61// If you get the error "failed to send packet header: EOF" when copying a
62// large file, try lowering this number.
63//
64// The default packet size is 32768 bytes.
65func MaxPacketUnchecked(size int) ClientOption {
66	return func(c *Client) error {
67		if size < 1 {
68			return errors.New("size must be greater or equal to 1")
69		}
70		c.maxPacket = size
71		return nil
72	}
73}
74
// MaxPacket sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, ie. <= 32768 bytes.
// This is a synonym for MaxPacketChecked that provides backward compatibility.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacket(size int) ClientOption {
	// Kept as a thin alias so existing callers of MaxPacket keep the
	// validated (<= 32 KiB) behavior.
	return MaxPacketChecked(size)
}
86
87// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
88//
89// The default maximum concurrent requests is 64.
90func MaxConcurrentRequestsPerFile(n int) ClientOption {
91	return func(c *Client) error {
92		if n < 1 {
93			return errors.New("n must be greater or equal to 1")
94		}
95		c.maxConcurrentRequests = n
96		return nil
97	}
98}
99
// UseConcurrentWrites allows the Client to perform concurrent Writes.
//
// Using concurrency while doing writes, requires special consideration.
// A write to a later offset in a file after an error,
// could end up with a file length longer than what was successfully written.
//
// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`,
// you may need to `Truncate` the target Writer to avoid “holes” in the data written.
func UseConcurrentWrites(value bool) ClientOption {
	return func(c *Client) error {
		// Opt-in flag: the zero value of Client leaves concurrent writes off.
		c.useConcurrentWrites = value
		return nil
	}
}
114
// UseConcurrentReads allows the Client to perform concurrent Reads.
//
// Concurrent reads are generally safe to use and not using them will degrade
// performance, so this option is enabled by default.
//
// When enabled, WriteTo will use Stat/Fstat to get the file size and determines
// how many concurrent workers to use.
// Some "read once" servers will delete the file if they receive a stat call on an
// open file and then the download will fail.
// By disabling concurrent reads you will be able to download files from these servers.
// If concurrent reads are disabled, the UseFstat option is ignored.
func UseConcurrentReads(value bool) ClientOption {
	return func(c *Client) error {
		// Stored inverted so that the Client zero value (false) keeps
		// concurrent reads enabled, matching the documented default.
		c.disableConcurrentReads = !value
		return nil
	}
}
132
// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
// (usually when copying files).
// Some servers limit the number of open files and calling Stat after opening
// the file will throw an error from the server. Setting this flag will call
// Fstat instead of Stat, which is supposed to be called on an open file handle.
//
// It has been found that with IBM Sterling SFTP servers which have
// "extractability" level set to 1, only 1 file can be opened at
// any given time.
//
// If the server you are working with still has an issue with both Stat and
// Fstat calls you can always open a file and read it until the end.
//
// Another reason to read the file until its end and Fstat doesn't work is
// that in some servers, reading a full file will automatically delete the
// file as some of these mainframes map the file to a message in a queue.
// Once the file has been read it will get deleted.
func UseFstat(value bool) ClientOption {
	return func(c *Client) error {
		c.useFstat = value
		return nil
	}
}
156
// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
// Multiple Clients can be active on a single SSH connection, and a Client
// may be called concurrently from multiple Goroutines.
//
// Client implements the github.com/kr/fs.FileSystem interface.
type Client struct {
	clientConn

	ext map[string]string // Extensions (name -> data), as reported in the server's VERSION reply.

	maxPacket             int // max packet size read or written.
	maxConcurrentRequests int // cap on concurrent requests for a single file (see MaxConcurrentRequestsPerFile).
	nextid                uint32 // request ID counter; only touch via nextID() (atomic).

	// write concurrency is… error prone.
	// Default behavior should be to not use it.
	useConcurrentWrites    bool
	useFstat               bool // prefer Fstat over Stat in File.WriteTo (see UseFstat).
	disableConcurrentReads bool // inverted flag: zero value keeps concurrent reads enabled.
}
177
178// NewClient creates a new SFTP client on conn, using zero or more option
179// functions.
180func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
181	s, err := conn.NewSession()
182	if err != nil {
183		return nil, err
184	}
185	if err := s.RequestSubsystem("sftp"); err != nil {
186		return nil, err
187	}
188	pw, err := s.StdinPipe()
189	if err != nil {
190		return nil, err
191	}
192	pr, err := s.StdoutPipe()
193	if err != nil {
194		return nil, err
195	}
196
197	return NewClientPipe(pr, pw, opts...)
198}
199
200// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
201// This can be used for connecting to an SFTP server over TCP/TLS or by using
202// the system's ssh client program (e.g. via exec.Command).
203func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) {
204	sftp := &Client{
205		clientConn: clientConn{
206			conn: conn{
207				Reader:      rd,
208				WriteCloser: wr,
209			},
210			inflight: make(map[uint32]chan<- result),
211			closed:   make(chan struct{}),
212		},
213
214		ext: make(map[string]string),
215
216		maxPacket:             1 << 15,
217		maxConcurrentRequests: 64,
218	}
219
220	for _, opt := range opts {
221		if err := opt(sftp); err != nil {
222			wr.Close()
223			return nil, err
224		}
225	}
226
227	if err := sftp.sendInit(); err != nil {
228		wr.Close()
229		return nil, err
230	}
231	if err := sftp.recvVersion(); err != nil {
232		wr.Close()
233		return nil, err
234	}
235
236	sftp.clientConn.wg.Add(1)
237	go sftp.loop()
238
239	return sftp, nil
240}
241
// Create creates the named file mode 0666 (before umask), truncating it if it
// already exists. If successful, methods on the returned File can be used for
// I/O; the associated file descriptor has mode O_RDWR. If you need more
// control over the flags/mode used to open the file see client.OpenFile.
//
// Note that some SFTP servers (eg. AWS Transfer) do not support opening files
// read/write at the same time. For those services you will need to use
// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`.
func (c *Client) Create(path string) (*File, error) {
	// flags() translates os.O_* bits to SFTP pflags — presumably defined
	// elsewhere in this package.
	return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
}
253
// sftpProtocolVersion is the only protocol version this client speaks (v3).
const sftpProtocolVersion = 3 // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
255
// sendInit sends the SSH_FXP_INIT packet that begins the protocol
// handshake, advertising protocol version 3.
func (c *Client) sendInit() error {
	return c.clientConn.conn.sendPacket(&sshFxInitPacket{
		Version: sftpProtocolVersion, // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
	})
}
261
// returns the next value of c.nextid
//
// Safe for concurrent use: the increment is atomic and the counter wraps
// modulo 2^32.
func (c *Client) nextID() uint32 {
	return atomic.AddUint32(&c.nextid, 1)
}
266
267func (c *Client) recvVersion() error {
268	typ, data, err := c.recvPacket(0)
269	if err != nil {
270		return err
271	}
272	if typ != sshFxpVersion {
273		return &unexpectedPacketErr{sshFxpVersion, typ}
274	}
275
276	version, data, err := unmarshalUint32Safe(data)
277	if err != nil {
278		return err
279	}
280	if version != sftpProtocolVersion {
281		return &unexpectedVersionErr{sftpProtocolVersion, version}
282	}
283
284	for len(data) > 0 {
285		var ext extensionPair
286		ext, data, err = unmarshalExtensionPair(data)
287		if err != nil {
288			return err
289		}
290		c.ext[ext.Name] = ext.Data
291	}
292
293	return nil
294}
295
296// HasExtension checks whether the server supports a named extension.
297//
298// The first return value is the extension data reported by the server
299// (typically a version number).
300func (c *Client) HasExtension(name string) (string, bool) {
301	data, ok := c.ext[name]
302	return data, ok
303}
304
// Walk returns a new Walker rooted at root.
//
// The Walker traverses the remote tree through this Client, which satisfies
// the github.com/kr/fs.FileSystem interface.
func (c *Client) Walk(root string) *fs.Walker {
	return fs.WalkFS(root, c)
}
309
// ReadDir reads the directory named by dirname and returns a list of
// directory entries.
//
// Entries for "." and ".." are omitted. A server-reported SSH_FX_EOF status
// terminates the listing and is not surfaced as an error.
func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
	handle, err := c.opendir(p)
	if err != nil {
		return nil, err
	}
	defer c.close(handle) // this has to defer earlier than the lock below
	var attrs []os.FileInfo
	var done = false
	for !done {
		// Keep issuing READDIR requests on the handle until the server
		// reports a status (normally EOF) or a transport error occurs.
		id := c.nextID()
		typ, data, err1 := c.sendPacket(nil, &sshFxpReaddirPacket{
			ID:     id,
			Handle: handle,
		})
		if err1 != nil {
			// Record the transport error in err so the EOF normalisation
			// below still runs on the value we return.
			err = err1
			done = true
			break
		}
		switch typ {
		case sshFxpName:
			sid, data := unmarshalUint32(data)
			if sid != id {
				return nil, &unexpectedIDErr{id, sid}
			}
			count, data := unmarshalUint32(data)
			for i := uint32(0); i < count; i++ {
				var filename string
				filename, data = unmarshalString(data)
				_, data = unmarshalString(data) // discard longname
				var attr *FileStat
				attr, data = unmarshalAttrs(data)
				if filename == "." || filename == ".." {
					// Skip the self/parent pseudo-entries.
					continue
				}
				attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename)))
			}
		case sshFxpStatus:
			// TODO(dfc) scope warning!
			// A status packet ends the listing; io.EOF here is the normal
			// "no more entries" signal and is cleared below.
			err = normaliseError(unmarshalStatus(id, data))
			done = true
		default:
			return nil, unimplementedPacketErr(typ)
		}
	}
	if err == io.EOF {
		err = nil
	}
	return attrs, err
}
362
363func (c *Client) opendir(path string) (string, error) {
364	id := c.nextID()
365	typ, data, err := c.sendPacket(nil, &sshFxpOpendirPacket{
366		ID:   id,
367		Path: path,
368	})
369	if err != nil {
370		return "", err
371	}
372	switch typ {
373	case sshFxpHandle:
374		sid, data := unmarshalUint32(data)
375		if sid != id {
376			return "", &unexpectedIDErr{id, sid}
377		}
378		handle, _ := unmarshalString(data)
379		return handle, nil
380	case sshFxpStatus:
381		return "", normaliseError(unmarshalStatus(id, data))
382	default:
383		return "", unimplementedPacketErr(typ)
384	}
385}
386
387// Stat returns a FileInfo structure describing the file specified by path 'p'.
388// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
389func (c *Client) Stat(p string) (os.FileInfo, error) {
390	fs, err := c.stat(p)
391	if err != nil {
392		return nil, err
393	}
394	return fileInfoFromStat(fs, path.Base(p)), nil
395}
396
397// Lstat returns a FileInfo structure describing the file specified by path 'p'.
398// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
399func (c *Client) Lstat(p string) (os.FileInfo, error) {
400	id := c.nextID()
401	typ, data, err := c.sendPacket(nil, &sshFxpLstatPacket{
402		ID:   id,
403		Path: p,
404	})
405	if err != nil {
406		return nil, err
407	}
408	switch typ {
409	case sshFxpAttrs:
410		sid, data := unmarshalUint32(data)
411		if sid != id {
412			return nil, &unexpectedIDErr{id, sid}
413		}
414		attr, _ := unmarshalAttrs(data)
415		return fileInfoFromStat(attr, path.Base(p)), nil
416	case sshFxpStatus:
417		return nil, normaliseError(unmarshalStatus(id, data))
418	default:
419		return nil, unimplementedPacketErr(typ)
420	}
421}
422
423// ReadLink reads the target of a symbolic link.
424func (c *Client) ReadLink(p string) (string, error) {
425	id := c.nextID()
426	typ, data, err := c.sendPacket(nil, &sshFxpReadlinkPacket{
427		ID:   id,
428		Path: p,
429	})
430	if err != nil {
431		return "", err
432	}
433	switch typ {
434	case sshFxpName:
435		sid, data := unmarshalUint32(data)
436		if sid != id {
437			return "", &unexpectedIDErr{id, sid}
438		}
439		count, data := unmarshalUint32(data)
440		if count != 1 {
441			return "", unexpectedCount(1, count)
442		}
443		filename, _ := unmarshalString(data) // ignore dummy attributes
444		return filename, nil
445	case sshFxpStatus:
446		return "", normaliseError(unmarshalStatus(id, data))
447	default:
448		return "", unimplementedPacketErr(typ)
449	}
450}
451
452// Link creates a hard link at 'newname', pointing at the same inode as 'oldname'
453func (c *Client) Link(oldname, newname string) error {
454	id := c.nextID()
455	typ, data, err := c.sendPacket(nil, &sshFxpHardlinkPacket{
456		ID:      id,
457		Oldpath: oldname,
458		Newpath: newname,
459	})
460	if err != nil {
461		return err
462	}
463	switch typ {
464	case sshFxpStatus:
465		return normaliseError(unmarshalStatus(id, data))
466	default:
467		return unimplementedPacketErr(typ)
468	}
469}
470
471// Symlink creates a symbolic link at 'newname', pointing at target 'oldname'
472func (c *Client) Symlink(oldname, newname string) error {
473	id := c.nextID()
474	typ, data, err := c.sendPacket(nil, &sshFxpSymlinkPacket{
475		ID:         id,
476		Linkpath:   newname,
477		Targetpath: oldname,
478	})
479	if err != nil {
480		return err
481	}
482	switch typ {
483	case sshFxpStatus:
484		return normaliseError(unmarshalStatus(id, data))
485	default:
486		return unimplementedPacketErr(typ)
487	}
488}
489
490func (c *Client) setfstat(handle string, flags uint32, attrs interface{}) error {
491	id := c.nextID()
492	typ, data, err := c.sendPacket(nil, &sshFxpFsetstatPacket{
493		ID:     id,
494		Handle: handle,
495		Flags:  flags,
496		Attrs:  attrs,
497	})
498	if err != nil {
499		return err
500	}
501	switch typ {
502	case sshFxpStatus:
503		return normaliseError(unmarshalStatus(id, data))
504	default:
505		return unimplementedPacketErr(typ)
506	}
507}
508
509// setstat is a convience wrapper to allow for changing of various parts of the file descriptor.
510func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
511	id := c.nextID()
512	typ, data, err := c.sendPacket(nil, &sshFxpSetstatPacket{
513		ID:    id,
514		Path:  path,
515		Flags: flags,
516		Attrs: attrs,
517	})
518	if err != nil {
519		return err
520	}
521	switch typ {
522	case sshFxpStatus:
523		return normaliseError(unmarshalStatus(id, data))
524	default:
525		return unimplementedPacketErr(typ)
526	}
527}
528
529// Chtimes changes the access and modification times of the named file.
530func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
531	type times struct {
532		Atime uint32
533		Mtime uint32
534	}
535	attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
536	return c.setstat(path, sshFileXferAttrACmodTime, attrs)
537}
538
539// Chown changes the user and group owners of the named file.
540func (c *Client) Chown(path string, uid, gid int) error {
541	type owner struct {
542		UID uint32
543		GID uint32
544	}
545	attrs := owner{uint32(uid), uint32(gid)}
546	return c.setstat(path, sshFileXferAttrUIDGID, attrs)
547}
548
// Chmod changes the permissions of the named file.
//
// Chmod does not apply a umask, because even retrieving the umask is not
// possible in a portable way without causing a race condition. Callers
// should mask off umask bits, if desired.
func (c *Client) Chmod(path string, mode os.FileMode) error {
	// toChmodPerm converts os.FileMode bits to the wire permission bits —
	// defined elsewhere in this package.
	return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode))
}
557
// Truncate sets the size of the named file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should do when setting
// size greater than the current size.
func (c *Client) Truncate(path string, size int64) error {
	return c.setstat(path, sshFileXferAttrSize, uint64(size))
}
565
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor
// has mode O_RDONLY.
func (c *Client) Open(path string) (*File, error) {
	return c.open(path, flags(os.O_RDONLY))
}
572
// OpenFile is the generalized open call; most users will use Open or
// Create instead. It opens the named file with specified flag (O_RDONLY
// etc.). If successful, methods on the returned File can be used for I/O.
func (c *Client) OpenFile(path string, f int) (*File, error) {
	// flags() maps os.O_* bits to the protocol's pflags encoding.
	return c.open(path, flags(f))
}
579
580func (c *Client) open(path string, pflags uint32) (*File, error) {
581	id := c.nextID()
582	typ, data, err := c.sendPacket(nil, &sshFxpOpenPacket{
583		ID:     id,
584		Path:   path,
585		Pflags: pflags,
586	})
587	if err != nil {
588		return nil, err
589	}
590	switch typ {
591	case sshFxpHandle:
592		sid, data := unmarshalUint32(data)
593		if sid != id {
594			return nil, &unexpectedIDErr{id, sid}
595		}
596		handle, _ := unmarshalString(data)
597		return &File{c: c, path: path, handle: handle}, nil
598	case sshFxpStatus:
599		return nil, normaliseError(unmarshalStatus(id, data))
600	default:
601		return nil, unimplementedPacketErr(typ)
602	}
603}
604
605// close closes a handle handle previously returned in the response
606// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
607// immediately after this request has been sent.
608func (c *Client) close(handle string) error {
609	id := c.nextID()
610	typ, data, err := c.sendPacket(nil, &sshFxpClosePacket{
611		ID:     id,
612		Handle: handle,
613	})
614	if err != nil {
615		return err
616	}
617	switch typ {
618	case sshFxpStatus:
619		return normaliseError(unmarshalStatus(id, data))
620	default:
621		return unimplementedPacketErr(typ)
622	}
623}
624
625func (c *Client) stat(path string) (*FileStat, error) {
626	id := c.nextID()
627	typ, data, err := c.sendPacket(nil, &sshFxpStatPacket{
628		ID:   id,
629		Path: path,
630	})
631	if err != nil {
632		return nil, err
633	}
634	switch typ {
635	case sshFxpAttrs:
636		sid, data := unmarshalUint32(data)
637		if sid != id {
638			return nil, &unexpectedIDErr{id, sid}
639		}
640		attr, _ := unmarshalAttrs(data)
641		return attr, nil
642	case sshFxpStatus:
643		return nil, normaliseError(unmarshalStatus(id, data))
644	default:
645		return nil, unimplementedPacketErr(typ)
646	}
647}
648
649func (c *Client) fstat(handle string) (*FileStat, error) {
650	id := c.nextID()
651	typ, data, err := c.sendPacket(nil, &sshFxpFstatPacket{
652		ID:     id,
653		Handle: handle,
654	})
655	if err != nil {
656		return nil, err
657	}
658	switch typ {
659	case sshFxpAttrs:
660		sid, data := unmarshalUint32(data)
661		if sid != id {
662			return nil, &unexpectedIDErr{id, sid}
663		}
664		attr, _ := unmarshalAttrs(data)
665		return attr, nil
666	case sshFxpStatus:
667		return nil, normaliseError(unmarshalStatus(id, data))
668	default:
669		return nil, unimplementedPacketErr(typ)
670	}
671}
672
673// StatVFS retrieves VFS statistics from a remote host.
674//
675// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
676// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
677func (c *Client) StatVFS(path string) (*StatVFS, error) {
678	// send the StatVFS packet to the server
679	id := c.nextID()
680	typ, data, err := c.sendPacket(nil, &sshFxpStatvfsPacket{
681		ID:   id,
682		Path: path,
683	})
684	if err != nil {
685		return nil, err
686	}
687
688	switch typ {
689	// server responded with valid data
690	case sshFxpExtendedReply:
691		var response StatVFS
692		err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
693		if err != nil {
694			return nil, errors.New("can not parse reply")
695		}
696
697		return &response, nil
698
699	// the resquest failed
700	case sshFxpStatus:
701		return nil, normaliseError(unmarshalStatus(id, data))
702
703	default:
704		return nil, unimplementedPacketErr(typ)
705	}
706}
707
// Join joins any number of path elements into a single path, adding a
// separating slash if necessary. The result is Cleaned; in particular, all
// empty strings are ignored.
//
// Remote SFTP paths are always slash-separated, hence path.Join rather than
// filepath.Join.
func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
712
// Remove removes the specified file or directory. An error will be returned if no
// file or directory with the specified path exists, or if the specified directory
// is not empty.
func (c *Client) Remove(path string) error {
	err := c.removeFile(path)
	// some servers, *cough* osx *cough*, return EPERM, not ENODIR.
	// serv-u returns ssh_FX_FILE_IS_A_DIRECTORY
	// EPERM is converted to os.ErrPermission so it is not a StatusError
	if err, ok := err.(*StatusError); ok {
		switch err.Code {
		case sshFxFailure, sshFxFileIsADirectory:
			// Retry as a directory removal: these codes are what many
			// servers report when SSH_FXP_REMOVE hits a directory.
			return c.RemoveDirectory(path)
		}
	}
	if os.IsPermission(err) {
		// See the note above: a permission error here may actually mean
		// "is a directory" on some servers.
		return c.RemoveDirectory(path)
	}
	return err
}
732
733func (c *Client) removeFile(path string) error {
734	id := c.nextID()
735	typ, data, err := c.sendPacket(nil, &sshFxpRemovePacket{
736		ID:       id,
737		Filename: path,
738	})
739	if err != nil {
740		return err
741	}
742	switch typ {
743	case sshFxpStatus:
744		return normaliseError(unmarshalStatus(id, data))
745	default:
746		return unimplementedPacketErr(typ)
747	}
748}
749
750// RemoveDirectory removes a directory path.
751func (c *Client) RemoveDirectory(path string) error {
752	id := c.nextID()
753	typ, data, err := c.sendPacket(nil, &sshFxpRmdirPacket{
754		ID:   id,
755		Path: path,
756	})
757	if err != nil {
758		return err
759	}
760	switch typ {
761	case sshFxpStatus:
762		return normaliseError(unmarshalStatus(id, data))
763	default:
764		return unimplementedPacketErr(typ)
765	}
766}
767
768// Rename renames a file.
769func (c *Client) Rename(oldname, newname string) error {
770	id := c.nextID()
771	typ, data, err := c.sendPacket(nil, &sshFxpRenamePacket{
772		ID:      id,
773		Oldpath: oldname,
774		Newpath: newname,
775	})
776	if err != nil {
777		return err
778	}
779	switch typ {
780	case sshFxpStatus:
781		return normaliseError(unmarshalStatus(id, data))
782	default:
783		return unimplementedPacketErr(typ)
784	}
785}
786
787// PosixRename renames a file using the posix-rename@openssh.com extension
788// which will replace newname if it already exists.
789func (c *Client) PosixRename(oldname, newname string) error {
790	id := c.nextID()
791	typ, data, err := c.sendPacket(nil, &sshFxpPosixRenamePacket{
792		ID:      id,
793		Oldpath: oldname,
794		Newpath: newname,
795	})
796	if err != nil {
797		return err
798	}
799	switch typ {
800	case sshFxpStatus:
801		return normaliseError(unmarshalStatus(id, data))
802	default:
803		return unimplementedPacketErr(typ)
804	}
805}
806
807// RealPath can be used to have the server canonicalize any given path name to an absolute path.
808//
809// This is useful for converting path names containing ".." components,
810// or relative pathnames without a leading slash into absolute paths.
811func (c *Client) RealPath(path string) (string, error) {
812	id := c.nextID()
813	typ, data, err := c.sendPacket(nil, &sshFxpRealpathPacket{
814		ID:   id,
815		Path: path,
816	})
817	if err != nil {
818		return "", err
819	}
820	switch typ {
821	case sshFxpName:
822		sid, data := unmarshalUint32(data)
823		if sid != id {
824			return "", &unexpectedIDErr{id, sid}
825		}
826		count, data := unmarshalUint32(data)
827		if count != 1 {
828			return "", unexpectedCount(1, count)
829		}
830		filename, _ := unmarshalString(data) // ignore attributes
831		return filename, nil
832	case sshFxpStatus:
833		return "", normaliseError(unmarshalStatus(id, data))
834	default:
835		return "", unimplementedPacketErr(typ)
836	}
837}
838
// Getwd returns the current working directory of the server. Operations
// involving relative paths will be based at this location.
func (c *Client) Getwd() (string, error) {
	// The server's canonicalised form of "." is the session working directory.
	return c.RealPath(".")
}
844
845// Mkdir creates the specified directory. An error will be returned if a file or
846// directory with the specified path already exists, or if the directory's
847// parent folder does not exist (the method cannot create complete paths).
848func (c *Client) Mkdir(path string) error {
849	id := c.nextID()
850	typ, data, err := c.sendPacket(nil, &sshFxpMkdirPacket{
851		ID:   id,
852		Path: path,
853	})
854	if err != nil {
855		return err
856	}
857	switch typ {
858	case sshFxpStatus:
859		return normaliseError(unmarshalStatus(id, data))
860	default:
861		return unimplementedPacketErr(typ)
862	}
863}
864
// MkdirAll creates a directory named path, along with any necessary parents,
// and returns nil, or else returns an error.
// If path is already a directory, MkdirAll does nothing and returns nil.
// If path contains a regular file, an error is returned
func (c *Client) MkdirAll(path string) error {
	// Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13
	// Fast path: if we can tell whether path is a directory or file, stop with success or error.
	dir, err := c.Stat(path)
	if err == nil {
		if dir.IsDir() {
			return nil
		}
		return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
	}

	// Slow path: make sure parent exists and then call Mkdir for path.
	i := len(path)
	for i > 0 && path[i-1] == '/' { // Skip trailing path separator.
		i--
	}

	j := i
	for j > 0 && path[j-1] != '/' { // Scan backward over element.
		j--
	}

	if j > 1 {
		// Create parent by recursing on everything before the last element.
		err = c.MkdirAll(path[0 : j-1])
		if err != nil {
			return err
		}
	}

	// Parent now exists; invoke Mkdir and use its result.
	err = c.Mkdir(path)
	if err != nil {
		// Handle arguments like "foo/." by
		// double-checking that directory doesn't exist.
		// Lstat (not Stat) mirrors os.MkdirAll's behaviour here.
		dir, err1 := c.Lstat(path)
		if err1 == nil && dir.IsDir() {
			return nil
		}
		return err
	}
	return nil
}
912
// File represents a remote file.
type File struct {
	c      *Client // client that opened this file; used for all requests.
	path   string  // path as passed to Open/Create (not canonicalized).
	handle string  // server-issued opaque handle for this open file.

	mu     sync.Mutex // guards offset for concurrent Read calls.
	offset int64      // current offset within remote file
}
922
// Close closes the File, rendering it unusable for I/O. It returns an
// error, if any.
func (f *File) Close() error {
	// Releases the server-side handle.
	return f.c.close(f.handle)
}
928
// Name returns the name of the file as presented to Open or Create.
// The path is returned exactly as given, not canonicalized by the server.
func (f *File) Name() string {
	return f.path
}
933
// Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
// it returns the number of bytes read.
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use WriteTo rather
// than calling Read multiple times. io.Copy will do this
// automatically.
func (f *File) Read(b []byte) (int, error) {
	// The mutex serialises access to f.offset across concurrent Read calls.
	f.mu.Lock()
	defer f.mu.Unlock()

	// Delegate to ReadAt and advance the cursor by however much was read,
	// even when an error occurred part-way.
	n, err := f.ReadAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}
951
952// readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset.
953// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
954func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
955	for err == nil && n < len(b) {
956		id := f.c.nextID()
957		typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{
958			ID:     id,
959			Handle: f.handle,
960			Offset: uint64(off) + uint64(n),
961			Len:    uint32(len(b) - n),
962		})
963		if err != nil {
964			return n, err
965		}
966
967		switch typ {
968		case sshFxpStatus:
969			return n, normaliseError(unmarshalStatus(id, data))
970
971		case sshFxpData:
972			sid, data := unmarshalUint32(data)
973			if id != sid {
974				return n, &unexpectedIDErr{id, sid}
975			}
976
977			l, data := unmarshalUint32(data)
978			n += copy(b[n:], data[:l])
979
980		default:
981			return n, unimplementedPacketErr(typ)
982		}
983	}
984
985	return
986}
987
988func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
989	for read < len(b) {
990		rb := b[read:]
991		if len(rb) > f.c.maxPacket {
992			rb = rb[:f.c.maxPacket]
993		}
994		n, err := f.readChunkAt(nil, rb, off+int64(read))
995		if n < 0 {
996			panic("sftp.File: returned negative count from readChunkAt")
997		}
998		if n > 0 {
999			read += n
1000		}
1001		if err != nil {
1002			if errors.Is(err, io.EOF) {
1003				return read, nil // return nil explicitly.
1004			}
1005			return read, err
1006		}
1007	}
1008	return read, nil
1009}
1010
// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with 1/2 requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows writes with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.

	// cancel is closed exactly once by the Reduce loop to stop the
	// dispatcher and workers early after the first error.
	cancel := make(chan struct{})

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	type work struct {
		id  uint32      // request id, used to validate the matching response
		res chan result // channel the response for this request arrives on

		b   []byte // sub-slice of b this chunk fills
		off int64  // absolute file offset of this chunk
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			// The request is dispatched before the work is handed off,
			// so responses can be in flight while workers drain workCh.
			// NOTE(review): Len is always chunkSize, even for the final
			// (shorter) chunk; any excess returned data is discarded by
			// the copy into packet.b below — confirm this is intended.
			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(chunkSize),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	type rErr struct {
		off int64 // offset of the first byte NOT successfully read
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						// A status in response to a read carries an error
						// (often EOF) rather than data.
						err = normaliseError(unmarshalStatus(packet.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}

						} else {
							// NOTE(review): data[:l] trusts the server's
							// declared length; a malformed l > len(data)
							// would panic here — confirm upstream guards.
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					errCh <- rErr{packet.off + int64(n), err}
					return
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off > our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}
1167
// writeToSequential implements WriteTo, but works sequentially with no parallelism.
// It reads maxPacket-sized chunks from f.offset, advancing f.offset as it goes,
// and writes each chunk to w. An io.EOF from the read side is translated into
// a nil error once all read bytes have been written.
func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
	b := make([]byte, f.c.maxPacket)
	ch := make(chan result, 1) // reusable channel

	for {
		n, err := f.readChunkAt(ch, b, f.offset)
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}

		if n > 0 {
			// Advance the file offset before writing, so f.offset always
			// reflects how much was consumed from the remote file, even if
			// the local write below fails.
			f.offset += int64(n)

			m, err2 := w.Write(b[:n])
			written += int64(m)

			// The read error (typically io.EOF) takes precedence; a write
			// error is only surfaced when the read itself succeeded.
			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if err == io.EOF {
				return written, nil // return nil explicitly.
			}

			return written, err
		}
	}
}
1199
// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don't orphan any goroutines or leave any hanging references.
		wg.Wait()
	}()

	// writeWork is a completed read, ready to be written out.
	// next links results into original request order so that the Reduce
	// loop below can serialize the writes regardless of which worker
	// finished first.
	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	// readWork describes one dispatched read request: cur is the channel the
	// Reduce loop will receive this chunk's result on; next is the channel
	// for the chunk immediately after it.
	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			// Dispatch before handing off, so requests overlap in flight.
			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						// Typically io.EOF after normalisation, signalling
						// end of the remote file.
						err = normaliseError(unmarshalStatus(readWork.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				// Deliver the result on this chunk's own channel; the
				// Reduce loop walks these channels strictly in order.
				select {
				case readWork.cur <- writeWork:
				case <-cancel:
					return
				}

				if err != nil {
					// No point continuing: the Reduce loop stops at the
					// first error, so later chunks would never be written.
					return
				}
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		pool.Put(packet.b)
		cur = packet.next
	}
}
1399
1400// Stat returns the FileInfo structure describing file. If there is an
1401// error.
1402func (f *File) Stat() (os.FileInfo, error) {
1403	fs, err := f.c.fstat(f.handle)
1404	if err != nil {
1405		return nil, err
1406	}
1407	return fileInfoFromStat(fs, path.Base(f.path)), nil
1408}
1409
1410// Write writes len(b) bytes to the File. It returns the number of bytes
1411// written and an error, if any. Write returns a non-nil error when n !=
1412// len(b).
1413//
1414// To maximise throughput for transferring the entire file (especially
1415// over high latency links) it is recommended to use ReadFrom rather
1416// than calling Write multiple times. io.Copy will do this
1417// automatically.
1418func (f *File) Write(b []byte) (int, error) {
1419	f.mu.Lock()
1420	defer f.mu.Unlock()
1421
1422	n, err := f.WriteAt(b, f.offset)
1423	f.offset += int64(n)
1424	return n, err
1425}
1426
1427func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
1428	typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{
1429		ID:     f.c.nextID(),
1430		Handle: f.handle,
1431		Offset: uint64(off),
1432		Length: uint32(len(b)),
1433		Data:   b,
1434	})
1435	if err != nil {
1436		return 0, err
1437	}
1438
1439	switch typ {
1440	case sshFxpStatus:
1441		id, _ := unmarshalUint32(data)
1442		err := normaliseError(unmarshalStatus(id, data))
1443		if err != nil {
1444			return 0, err
1445		}
1446
1447	default:
1448		return 0, unimplementedPacketErr(typ)
1449	}
1450
1451	return len(b), nil
1452}
1453
// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially.
func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
	// Split the write into multiple maxPacket sized concurrent writes
	// bounded by maxConcurrentRequests. This allows writes with a suitably
	// large buffer to transfer data at a much faster rate due to
	// overlapping round trip times.

	// cancel is closed exactly once by the Reduce loop to stop the slicer
	// after the first error.
	cancel := make(chan struct{})

	type work struct {
		b   []byte // sub-slice of b for this chunk
		off int64  // absolute file offset for this chunk
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		var read int
		chunkSize := f.c.maxPacket

		for read < len(b) {
			wb := b[read:]
			if len(wb) > chunkSize {
				wb = wb[:chunkSize]
			}

			select {
			case workCh <- work{wb, off + int64(read)}:
			case <-cancel:
				return
			}

			read += len(wb)
		}
	}()

	type wErr struct {
		off int64 // offset of the first byte NOT successfully written
		err error
	}
	errCh := make(chan wErr)

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			ch := make(chan result, 1) // reusable channel per mapper.

			for packet := range workCh {
				n, err := f.writeChunkAt(ch, packet.b, packet.off)
				if err != nil {
					// return the offset as the start + how much we wrote before the error.
					// Note: the worker keeps draining workCh after an
					// error; the Reduce loop closes cancel to stop new
					// work from being produced.
					errCh <- wErr{packet.off + int64(n), err}
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := wErr{math.MaxInt64, nil}
	for wErr := range errCh {
		if wErr.off <= firstErr.off {
			firstErr = wErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off >= our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	return len(b), nil
}
1550
1551// WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns
1552// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics,
1553// so the file offset is not altered during the write.
1554func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
1555	if len(b) <= f.c.maxPacket {
1556		// We can do this in one write.
1557		return f.writeChunkAt(nil, b, off)
1558	}
1559
1560	if f.c.useConcurrentWrites {
1561		return f.writeAtConcurrent(b, off)
1562	}
1563
1564	ch := make(chan result, 1) // reusable channel
1565
1566	chunkSize := f.c.maxPacket
1567
1568	for written < len(b) {
1569		wb := b[written:]
1570		if len(wb) > chunkSize {
1571			wb = wb[:chunkSize]
1572		}
1573
1574		n, err := f.writeChunkAt(ch, wb, off+int64(written))
1575		if n > 0 {
1576			written += n
1577		}
1578
1579		if err != nil {
1580			return written, err
1581		}
1582	}
1583
1584	return len(b), nil
1585}
1586
// ReadFromWithConcurrency implements ReaderFrom,
// but uses the given concurrency to issue multiple requests at the same time.
//
// Giving a concurrency of less than one will default to the Client’s max concurrency.
//
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	// Split the write into multiple maxPacket sized concurrent writes.
	// This allows writes with a suitably large reader
	// to transfer data at a much faster rate due to overlapping round trip times.

	// cancel is closed exactly once by the Reduce loop to stop the feeder
	// after the first error.
	cancel := make(chan struct{})

	type work struct {
		b   []byte // full pooled buffer (returned whole to the pool later)
		n   int    // number of valid bytes at the front of b
		off int64  // absolute file offset this buffer is written to
	}
	workCh := make(chan work)

	type rwErr struct {
		off int64 // offset of the first byte NOT successfully transferred
		err error
	}
	errCh := make(chan rwErr)

	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newBufPool(concurrency, f.c.maxPacket)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		off := f.offset

		for {
			b := pool.Get()

			// NOTE(review): the named return `read` is updated by this
			// goroutine; on the error path below the caller reads it while
			// this goroutine may still be running — confirm there is no
			// data race in that window.
			n, err := r.Read(b)
			if n > 0 {
				read += int64(n)

				select {
				case workCh <- work{b, n, off}:
					// We need the pool.Put(b) to put the whole slice, not just trunced.
				case <-cancel:
					return
				}

				off += int64(n)
			}

			if err != nil {
				// io.EOF simply ends the feed; any other read error is
				// reported at the offset where reading stopped.
				if err != io.EOF {
					errCh <- rwErr{off, err}
				}
				return
			}
		}
	}()

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			ch := make(chan result, 1) // reusable channel per mapper.

			for packet := range workCh {
				n, err := f.writeChunkAt(ch, packet.b[:packet.n], packet.off)
				if err != nil {
					// return the offset as the start + how much we wrote before the error.
					errCh <- rwErr{packet.off + int64(n), err}
				}
				pool.Put(packet.b)
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: Collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rwErr{math.MaxInt64, nil}
	for rwErr := range errCh {
		if rwErr.off <= firstErr.off {
			firstErr = rwErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed.
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off is a valid offset.
		//
		// firstErr.off will then be the lesser of:
		// * the offset of the first error from writing,
		// * the last successfully read offset.
		//
		// This could be less than the last successfully written offset,
		// which is the whole reason for the UseConcurrentWrites() ClientOption.
		//
		// Callers are responsible for truncating any SFTP files to a safe length.
		f.offset = firstErr.off

		// ReadFrom is defined to return the read bytes, regardless of any writer errors.
		return read, firstErr.err
	}

	f.offset += read
	return read, nil
}
1712
// ReadFrom reads data from r until EOF and writes it to the file. The return
// value is the number of bytes read. Any error except io.EOF encountered
// during the read is also returned.
//
// This method is preferred over calling Write multiple times
// to maximise throughput for transferring the entire file,
// especially over high-latency links.
func (f *File) ReadFrom(r io.Reader) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.useConcurrentWrites {
		// Probe the reader for a size hint, so we can pick a sensible
		// concurrency. The first matching case wins.
		var remain int64
		switch r := r.(type) {
		case interface{ Len() int }:
			// e.g. bytes.Buffer, bytes.Reader, strings.Reader.
			remain = int64(r.Len())

		case interface{ Size() int64 }:
			remain = r.Size()

		case *io.LimitedReader:
			remain = r.N

		case interface{ Stat() (os.FileInfo, error) }:
			// e.g. *os.File.
			info, err := r.Stat()
			if err == nil {
				remain = info.Size()
			}
		}

		if remain < 0 {
			// A negative hint (e.g. a LimitedReader with negative N)
			// tells us nothing useful about the size.
			// We can strongly assert that we want default max concurrency here.
			return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests)
		}

		if remain > int64(f.c.maxPacket) {
			// Otherwise, only use concurrency, if it would be at least two packets.

			// This is the best reasonable guess we can make.
			concurrency64 := remain/int64(f.c.maxPacket) + 1

			// We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
			// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
			if concurrency64 > int64(f.c.maxConcurrentRequests) {
				concurrency64 = int64(f.c.maxConcurrentRequests)
			}

			return f.ReadFromWithConcurrency(r, int(concurrency64))
		}
	}

	// Sequential fallback: read a chunk, write a chunk, advance f.offset.
	ch := make(chan result, 1) // reusable channel

	b := make([]byte, f.c.maxPacket)

	var read int64
	for {
		n, err := r.Read(b)
		if n < 0 {
			panic("sftp.File: reader returned negative count from Read")
		}

		if n > 0 {
			read += int64(n)

			m, err2 := f.writeChunkAt(ch, b[:n], f.offset)
			f.offset += int64(m)

			// The read error (typically io.EOF) takes precedence over the
			// write error, matching the loop-exit logic below.
			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if err == io.EOF {
				return read, nil // return nil explicitly.
			}

			return read, err
		}
	}
}
1795
1796// Seek implements io.Seeker by setting the client offset for the next Read or
1797// Write. It returns the next offset read. Seeking before or after the end of
1798// the file is undefined. Seeking relative to the end calls Stat.
1799func (f *File) Seek(offset int64, whence int) (int64, error) {
1800	f.mu.Lock()
1801	defer f.mu.Unlock()
1802
1803	switch whence {
1804	case io.SeekStart:
1805	case io.SeekCurrent:
1806		offset += f.offset
1807	case io.SeekEnd:
1808		fi, err := f.Stat()
1809		if err != nil {
1810			return f.offset, err
1811		}
1812		offset += fi.Size()
1813	default:
1814		return f.offset, unimplementedSeekWhence(whence)
1815	}
1816
1817	if offset < 0 {
1818		return f.offset, os.ErrInvalid
1819	}
1820
1821	f.offset = offset
1822	return f.offset, nil
1823}
1824
1825// Chown changes the uid/gid of the current file.
1826func (f *File) Chown(uid, gid int) error {
1827	return f.c.Chown(f.path, uid, gid)
1828}
1829
1830// Chmod changes the permissions of the current file.
1831//
1832// See Client.Chmod for details.
1833func (f *File) Chmod(mode os.FileMode) error {
1834	return f.c.setfstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode))
1835}
1836
1837// Sync requests a flush of the contents of a File to stable storage.
1838//
1839// Sync requires the server to support the fsync@openssh.com extension.
1840func (f *File) Sync() error {
1841	id := f.c.nextID()
1842	typ, data, err := f.c.sendPacket(nil, &sshFxpFsyncPacket{
1843		ID:     id,
1844		Handle: f.handle,
1845	})
1846
1847	switch {
1848	case err != nil:
1849		return err
1850	case typ == sshFxpStatus:
1851		return normaliseError(unmarshalStatus(id, data))
1852	default:
1853		return &unexpectedPacketErr{want: sshFxpStatus, got: typ}
1854	}
1855}
1856
1857// Truncate sets the size of the current file. Although it may be safely assumed
1858// that if the size is less than its current size it will be truncated to fit,
1859// the SFTP protocol does not specify what behavior the server should do when setting
1860// size greater than the current size.
1861// We send a SSH_FXP_FSETSTAT here since we have a file handle
1862func (f *File) Truncate(size int64) error {
1863	return f.c.setfstat(f.handle, sshFileXferAttrSize, uint64(size))
1864}
1865
// min returns the smaller of two ints.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
1872
1873// normaliseError normalises an error into a more standard form that can be
1874// checked against stdlib errors like io.EOF or os.ErrNotExist.
1875func normaliseError(err error) error {
1876	switch err := err.(type) {
1877	case *StatusError:
1878		switch err.Code {
1879		case sshFxEOF:
1880			return io.EOF
1881		case sshFxNoSuchFile:
1882			return os.ErrNotExist
1883		case sshFxPermissionDenied:
1884			return os.ErrPermission
1885		case sshFxOk:
1886			return nil
1887		default:
1888			return err
1889		}
1890	default:
1891		return err
1892	}
1893}
1894
1895func unmarshalStatus(id uint32, data []byte) error {
1896	sid, data := unmarshalUint32(data)
1897	if sid != id {
1898		return &unexpectedIDErr{id, sid}
1899	}
1900	code, data := unmarshalUint32(data)
1901	msg, data, _ := unmarshalStringSafe(data)
1902	lang, _, _ := unmarshalStringSafe(data)
1903	return &StatusError{
1904		Code: code,
1905		msg:  msg,
1906		lang: lang,
1907	}
1908}
1909
1910func marshalStatus(b []byte, err StatusError) []byte {
1911	b = marshalUint32(b, err.Code)
1912	b = marshalString(b, err.msg)
1913	b = marshalString(b, err.lang)
1914	return b
1915}
1916
1917// flags converts the flags passed to OpenFile into ssh flags.
1918// Unsupported flags are ignored.
1919func flags(f int) uint32 {
1920	var out uint32
1921	switch f & os.O_WRONLY {
1922	case os.O_WRONLY:
1923		out |= sshFxfWrite
1924	case os.O_RDONLY:
1925		out |= sshFxfRead
1926	}
1927	if f&os.O_RDWR == os.O_RDWR {
1928		out |= sshFxfRead | sshFxfWrite
1929	}
1930	if f&os.O_APPEND == os.O_APPEND {
1931		out |= sshFxfAppend
1932	}
1933	if f&os.O_CREATE == os.O_CREATE {
1934		out |= sshFxfCreat
1935	}
1936	if f&os.O_TRUNC == os.O_TRUNC {
1937		out |= sshFxfTrunc
1938	}
1939	if f&os.O_EXCL == os.O_EXCL {
1940		out |= sshFxfExcl
1941	}
1942	return out
1943}
1944
1945// toChmodPerm converts Go permission bits to POSIX permission bits.
1946//
1947// This differs from fromFileMode in that we preserve the POSIX versions of
1948// setuid, setgid and sticky in m, because we've historically supported those
1949// bits, and we mask off any non-permission bits.
1950func toChmodPerm(m os.FileMode) (perm uint32) {
1951	const mask = os.ModePerm | s_ISUID | s_ISGID | s_ISVTX
1952	perm = uint32(m & mask)
1953
1954	if m&os.ModeSetuid != 0 {
1955		perm |= s_ISUID
1956	}
1957	if m&os.ModeSetgid != 0 {
1958		perm |= s_ISGID
1959	}
1960	if m&os.ModeSticky != 0 {
1961		perm |= s_ISVTX
1962	}
1963
1964	return perm
1965}
1966