1// Package sftp provides a filesystem interface using github.com/pkg/sftp 2 3//go:build !plan9 4// +build !plan9 5 6package sftp 7 8import ( 9 "bytes" 10 "context" 11 "fmt" 12 "io" 13 "io/ioutil" 14 "os" 15 "path" 16 "regexp" 17 "strconv" 18 "strings" 19 "sync" 20 "sync/atomic" 21 "time" 22 23 "github.com/pkg/errors" 24 "github.com/pkg/sftp" 25 "github.com/rclone/rclone/fs" 26 "github.com/rclone/rclone/fs/accounting" 27 "github.com/rclone/rclone/fs/config" 28 "github.com/rclone/rclone/fs/config/configmap" 29 "github.com/rclone/rclone/fs/config/configstruct" 30 "github.com/rclone/rclone/fs/config/obscure" 31 "github.com/rclone/rclone/fs/fshttp" 32 "github.com/rclone/rclone/fs/hash" 33 "github.com/rclone/rclone/lib/env" 34 "github.com/rclone/rclone/lib/pacer" 35 "github.com/rclone/rclone/lib/readers" 36 sshagent "github.com/xanzy/ssh-agent" 37 "golang.org/x/crypto/ssh" 38 "golang.org/x/crypto/ssh/knownhosts" 39) 40 41const ( 42 hashCommandNotSupported = "none" 43 minSleep = 100 * time.Millisecond 44 maxSleep = 2 * time.Second 45 decayConstant = 2 // bigger for slower decay, exponential 46) 47 48var ( 49 currentUser = env.CurrentUser() 50) 51 52func init() { 53 fsi := &fs.RegInfo{ 54 Name: "sftp", 55 Description: "SSH/SFTP Connection", 56 NewFs: NewFs, 57 Options: []fs.Option{{ 58 Name: "host", 59 Help: "SSH host to connect to.\n\nE.g. \"example.com\".", 60 Required: true, 61 }, { 62 Name: "user", 63 Help: "SSH username, leave blank for current username, " + currentUser + ".", 64 }, { 65 Name: "port", 66 Help: "SSH port, leave blank to use default (22).", 67 }, { 68 Name: "pass", 69 Help: "SSH password, leave blank to use ssh-agent.", 70 IsPassword: true, 71 }, { 72 Name: "key_pem", 73 Help: "Raw PEM-encoded private key.\n\nIf specified, will override key_file parameter.", 74 }, { 75 Name: "key_file", 76 Help: "Path to PEM-encoded private key file.\n\nLeave blank or set key-use-agent to use ssh-agent." + env.ShellExpandHelp, 77 }, { 78 Name: "key_file_pass", 79 Help: `The passphrase to decrypt the PEM-encoded private key file. 80 81Only PEM encrypted key files (old OpenSSH format) are supported. Encrypted keys 82in the new OpenSSH format can't be used.`, 83 IsPassword: true, 84 }, { 85 Name: "pubkey_file", 86 Help: `Optional path to public key file. 87 88Set this if you have a signed certificate you want to use for authentication.` + env.ShellExpandHelp, 89 }, { 90 Name: "known_hosts_file", 91 Help: `Optional path to known_hosts file. 92 93Set this value to enable server host key validation.` + env.ShellExpandHelp, 94 Advanced: true, 95 Examples: []fs.OptionExample{{ 96 Value: "~/.ssh/known_hosts", 97 Help: "Use OpenSSH's known_hosts file.", 98 }}, 99 }, { 100 Name: "key_use_agent", 101 Help: `When set forces the usage of the ssh-agent. 102 103When key-file is also set, the ".pub" file of the specified key-file is read and only the associated key is 104requested from the ssh-agent. This allows to avoid ` + "`Too many authentication failures for *username*`" + ` errors 105when the ssh-agent contains many keys.`, 106 Default: false, 107 }, { 108 Name: "use_insecure_cipher", 109 Help: `Enable the use of insecure ciphers and key exchange methods. 110 111This enables the use of the following insecure ciphers and key exchange methods: 112 113- aes128-cbc 114- aes192-cbc 115- aes256-cbc 116- 3des-cbc 117- diffie-hellman-group-exchange-sha256 118- diffie-hellman-group-exchange-sha1 119 120Those algorithms are insecure and may allow plaintext data to be recovered by an attacker.`, 121 Default: false, 122 Examples: []fs.OptionExample{ 123 { 124 Value: "false", 125 Help: "Use default Cipher list.", 126 }, { 127 Value: "true", 128 Help: "Enables the use of the aes128-cbc cipher and diffie-hellman-group-exchange-sha256, diffie-hellman-group-exchange-sha1 key exchange.", 129 }, 130 }, 131 }, { 132 Name: "disable_hashcheck", 133 Default: false, 134 Help: "Disable the execution of SSH commands to determine if remote file hashing is available.\n\nLeave blank or set to false to enable hashing (recommended), set to true to disable hashing.", 135 }, { 136 Name: "ask_password", 137 Default: false, 138 Help: `Allow asking for SFTP password when needed. 139 140If this is set and no password is supplied then rclone will: 141- ask for a password 142- not contact the ssh agent 143`, 144 Advanced: true, 145 }, { 146 Name: "path_override", 147 Default: "", 148 Help: `Override path used by SSH connection. 149 150This allows checksum calculation when SFTP and SSH paths are 151different. This issue affects among others Synology NAS boxes. 152 153Shared folders can be found in directories representing volumes 154 155 rclone sync /home/local/directory remote:/directory --ssh-path-override /volume2/directory 156 157Home directory can be found in a shared folder called "home" 158 159 rclone sync /home/local/directory remote:/home/directory --ssh-path-override /volume1/homes/USER/directory`, 160 Advanced: true, 161 }, { 162 Name: "set_modtime", 163 Default: true, 164 Help: "Set the modified time on the remote if set.", 165 Advanced: true, 166 }, { 167 Name: "md5sum_command", 168 Default: "", 169 Help: "The command used to read md5 hashes.\n\nLeave blank for autodetect.", 170 Advanced: true, 171 }, { 172 Name: "sha1sum_command", 173 Default: "", 174 Help: "The command used to read sha1 hashes.\n\nLeave blank for autodetect.", 175 Advanced: true, 176 }, { 177 Name: "skip_links", 178 Default: false, 179 Help: "Set to skip any symlinks and any other non regular files.", 180 Advanced: true, 181 }, { 182 Name: "subsystem", 183 Default: "sftp", 184 Help: "Specifies the SSH2 subsystem on the remote host.", 185 Advanced: true, 186 }, { 187 Name: "server_command", 188 Default: "", 189 Help: `Specifies the path or command to run a sftp server on the remote host. 190 191The subsystem option is ignored when server_command is defined.`, 192 Advanced: true, 193 }, { 194 Name: "use_fstat", 195 Default: false, 196 Help: `If set use fstat instead of stat. 197 198Some servers limit the amount of open files and calling Stat after opening 199the file will throw an error from the server. Setting this flag will call 200Fstat instead of Stat which is called on an already open file handle. 201 202It has been found that this helps with IBM Sterling SFTP servers which have 203"extractability" level set to 1 which means only 1 file can be opened at 204any given time. 205`, 206 Advanced: true, 207 }, { 208 Name: "disable_concurrent_reads", 209 Default: false, 210 Help: `If set don't use concurrent reads. 211 212Normally concurrent reads are safe to use and not using them will 213degrade performance, so this option is disabled by default. 214 215Some servers limit the amount number of times a file can be 216downloaded. Using concurrent reads can trigger this limit, so if you 217have a server which returns 218 219 Failed to copy: file does not exist 220 221Then you may need to enable this flag. 222 223If concurrent reads are disabled, the use_fstat option is ignored. 224`, 225 Advanced: true, 226 }, { 227 Name: "disable_concurrent_writes", 228 Default: false, 229 Help: `If set don't use concurrent writes. 230 231Normally rclone uses concurrent writes to upload files. This improves 232the performance greatly, especially for distant servers. 233 234This option disables concurrent writes should that be necessary. 235`, 236 Advanced: true, 237 }, { 238 Name: "idle_timeout", 239 Default: fs.Duration(60 * time.Second), 240 Help: `Max time before closing idle connections. 241 242If no connections have been returned to the connection pool in the time 243given, rclone will empty the connection pool. 244 245Set to 0 to keep connections indefinitely. 246`, 247 Advanced: true, 248 }}, 249 } 250 fs.Register(fsi) 251} 252 253// Options defines the configuration for this backend 254type Options struct { 255 Host string `config:"host"` 256 User string `config:"user"` 257 Port string `config:"port"` 258 Pass string `config:"pass"` 259 KeyPem string `config:"key_pem"` 260 KeyFile string `config:"key_file"` 261 KeyFilePass string `config:"key_file_pass"` 262 PubKeyFile string `config:"pubkey_file"` 263 KnownHostsFile string `config:"known_hosts_file"` 264 KeyUseAgent bool `config:"key_use_agent"` 265 UseInsecureCipher bool `config:"use_insecure_cipher"` 266 DisableHashCheck bool `config:"disable_hashcheck"` 267 AskPassword bool `config:"ask_password"` 268 PathOverride string `config:"path_override"` 269 SetModTime bool `config:"set_modtime"` 270 Md5sumCommand string `config:"md5sum_command"` 271 Sha1sumCommand string `config:"sha1sum_command"` 272 SkipLinks bool `config:"skip_links"` 273 Subsystem string `config:"subsystem"` 274 ServerCommand string `config:"server_command"` 275 UseFstat bool `config:"use_fstat"` 276 DisableConcurrentReads bool `config:"disable_concurrent_reads"` 277 DisableConcurrentWrites bool `config:"disable_concurrent_writes"` 278 IdleTimeout fs.Duration `config:"idle_timeout"` 279} 280 281// Fs stores the interface to the remote SFTP files 282type Fs struct { 283 name string 284 root string 285 absRoot string 286 opt Options // parsed options 287 ci *fs.ConfigInfo // global config 288 m configmap.Mapper // config 289 features *fs.Features // optional features 290 config *ssh.ClientConfig 291 url string 292 mkdirLock *stringLock 293 cachedHashes *hash.Set 294 poolMu sync.Mutex 295 pool []*conn 296 drain *time.Timer // used to drain the pool when we stop using the connections 297 pacer *fs.Pacer // pacer for operations 298 savedpswd string 299 sessions int32 // count in use sessions 300} 301 302// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) 303type Object struct { 304 fs *Fs 305 remote string 306 size int64 // size of the object 307 modTime time.Time // modification time of the object 308 mode os.FileMode // mode bits from the file 309 md5sum *string // Cached MD5 checksum 310 sha1sum *string // Cached SHA1 checksum 311} 312 313// dial starts a client connection to the given SSH server. It is a 314// convenience function that connects to the given network address, 315// initiates the SSH handshake, and then sets up a Client. 316func (f *Fs) dial(ctx context.Context, network, addr string, sshConfig *ssh.ClientConfig) (*ssh.Client, error) { 317 dialer := fshttp.NewDialer(ctx) 318 conn, err := dialer.Dial(network, addr) 319 if err != nil { 320 return nil, err 321 } 322 c, chans, reqs, err := ssh.NewClientConn(conn, addr, sshConfig) 323 if err != nil { 324 return nil, err 325 } 326 fs.Debugf(f, "New connection %s->%s to %q", c.LocalAddr(), c.RemoteAddr(), c.ServerVersion()) 327 return ssh.NewClient(c, chans, reqs), nil 328} 329 330// conn encapsulates an ssh client and corresponding sftp client 331type conn struct { 332 sshClient *ssh.Client 333 sftpClient *sftp.Client 334 err chan error 335} 336 337// Wait for connection to close 338func (c *conn) wait() { 339 c.err <- c.sshClient.Conn.Wait() 340} 341 342// Closes the connection 343func (c *conn) close() error { 344 sftpErr := c.sftpClient.Close() 345 sshErr := c.sshClient.Close() 346 if sftpErr != nil { 347 return sftpErr 348 } 349 return sshErr 350} 351 352// Returns an error if closed 353func (c *conn) closed() error { 354 select { 355 case err := <-c.err: 356 return err 357 default: 358 } 359 return nil 360} 361 362// Show that we are using an ssh session 363// 364// Call removeSession() when done 365func (f *Fs) addSession() { 366 atomic.AddInt32(&f.sessions, 1) 367} 368 369// Show the ssh session is no longer in use 370func (f *Fs) removeSession() { 371 atomic.AddInt32(&f.sessions, -1) 372} 373 374// getSessions shows whether there are any sessions in use 375func (f *Fs) getSessions() int32 { 376 return atomic.LoadInt32(&f.sessions) 377} 378 379// Open a new connection to the SFTP server. 380func (f *Fs) sftpConnection(ctx context.Context) (c *conn, err error) { 381 // Rate limit rate of new connections 382 c = &conn{ 383 err: make(chan error, 1), 384 } 385 c.sshClient, err = f.dial(ctx, "tcp", f.opt.Host+":"+f.opt.Port, f.config) 386 if err != nil { 387 return nil, errors.Wrap(err, "couldn't connect SSH") 388 } 389 c.sftpClient, err = f.newSftpClient(c.sshClient) 390 if err != nil { 391 _ = c.sshClient.Close() 392 return nil, errors.Wrap(err, "couldn't initialise SFTP") 393 } 394 go c.wait() 395 return c, nil 396} 397 398// Creates a new SFTP client on conn, using the specified subsystem 399// or sftp server, and zero or more option functions 400func (f *Fs) newSftpClient(conn *ssh.Client, opts ...sftp.ClientOption) (*sftp.Client, error) { 401 s, err := conn.NewSession() 402 if err != nil { 403 return nil, err 404 } 405 pw, err := s.StdinPipe() 406 if err != nil { 407 return nil, err 408 } 409 pr, err := s.StdoutPipe() 410 if err != nil { 411 return nil, err 412 } 413 414 if f.opt.ServerCommand != "" { 415 if err := s.Start(f.opt.ServerCommand); err != nil { 416 return nil, err 417 } 418 } else { 419 if err := s.RequestSubsystem(f.opt.Subsystem); err != nil { 420 return nil, err 421 } 422 } 423 opts = opts[:len(opts):len(opts)] // make sure we don't overwrite the callers opts 424 opts = append(opts, 425 sftp.UseFstat(f.opt.UseFstat), 426 sftp.UseConcurrentReads(!f.opt.DisableConcurrentReads), 427 sftp.UseConcurrentWrites(!f.opt.DisableConcurrentWrites), 428 ) 429 return sftp.NewClientPipe(pr, pw, opts...) 430} 431 432// Get an SFTP connection from the pool, or open a new one 433func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) { 434 accounting.LimitTPS(ctx) 435 f.poolMu.Lock() 436 for len(f.pool) > 0 { 437 c = f.pool[0] 438 f.pool = f.pool[1:] 439 err := c.closed() 440 if err == nil { 441 break 442 } 443 fs.Errorf(f, "Discarding closed SSH connection: %v", err) 444 c = nil 445 } 446 f.poolMu.Unlock() 447 if c != nil { 448 return c, nil 449 } 450 err = f.pacer.Call(func() (bool, error) { 451 c, err = f.sftpConnection(ctx) 452 if err != nil { 453 return true, err 454 } 455 return false, nil 456 }) 457 return c, err 458} 459 460// Return an SFTP connection to the pool 461// 462// It nils the pointed to connection out so it can't be reused 463// 464// if err is not nil then it checks the connection is alive using a 465// Getwd request 466func (f *Fs) putSftpConnection(pc **conn, err error) { 467 c := *pc 468 *pc = nil 469 if err != nil { 470 // work out if this is an expected error 471 underlyingErr := errors.Cause(err) 472 isRegularError := false 473 switch underlyingErr { 474 case os.ErrNotExist: 475 isRegularError = true 476 default: 477 switch underlyingErr.(type) { 478 case *sftp.StatusError, *os.PathError: 479 isRegularError = true 480 } 481 } 482 // If not a regular SFTP error code then check the connection 483 if !isRegularError { 484 _, nopErr := c.sftpClient.Getwd() 485 if nopErr != nil { 486 fs.Debugf(f, "Connection failed, closing: %v", nopErr) 487 _ = c.close() 488 return 489 } 490 fs.Debugf(f, "Connection OK after error: %v", err) 491 } 492 } 493 f.poolMu.Lock() 494 f.pool = append(f.pool, c) 495 if f.opt.IdleTimeout > 0 { 496 f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer 497 } 498 f.poolMu.Unlock() 499} 500 501// Drain the pool of any connections 502func (f *Fs) drainPool(ctx context.Context) (err error) { 503 f.poolMu.Lock() 504 defer f.poolMu.Unlock() 505 if sessions := f.getSessions(); sessions != 0 { 506 fs.Debugf(f, "Not closing %d unused connections as %d sessions active", len(f.pool), sessions) 507 if f.opt.IdleTimeout > 0 { 508 f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer 509 } 510 return nil 511 } 512 if f.opt.IdleTimeout > 0 { 513 f.drain.Stop() 514 } 515 if len(f.pool) != 0 { 516 fs.Debugf(f, "closing %d unused connections", len(f.pool)) 517 } 518 for i, c := range f.pool { 519 if cErr := c.closed(); cErr == nil { 520 cErr = c.close() 521 if cErr != nil { 522 err = cErr 523 } 524 } 525 f.pool[i] = nil 526 } 527 f.pool = nil 528 return err 529} 530 531// NewFs creates a new Fs object from the name and root. It connects to 532// the host specified in the config file. 533func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) { 534 // This will hold the Fs object. We need to create it here 535 // so we can refer to it in the SSH callback, but it's populated 536 // in NewFsWithConnection 537 f := &Fs{ 538 ci: fs.GetConfig(ctx), 539 } 540 // Parse config into Options struct 541 opt := new(Options) 542 err := configstruct.Set(m, opt) 543 if err != nil { 544 return nil, err 545 } 546 if opt.User == "" { 547 opt.User = currentUser 548 } 549 if opt.Port == "" { 550 opt.Port = "22" 551 } 552 553 sshConfig := &ssh.ClientConfig{ 554 User: opt.User, 555 Auth: []ssh.AuthMethod{}, 556 HostKeyCallback: ssh.InsecureIgnoreHostKey(), 557 Timeout: f.ci.ConnectTimeout, 558 ClientVersion: "SSH-2.0-" + f.ci.UserAgent, 559 } 560 561 if opt.KnownHostsFile != "" { 562 hostcallback, err := knownhosts.New(env.ShellExpand(opt.KnownHostsFile)) 563 if err != nil { 564 return nil, errors.Wrap(err, "couldn't parse known_hosts_file") 565 } 566 sshConfig.HostKeyCallback = hostcallback 567 } 568 569 if opt.UseInsecureCipher { 570 sshConfig.Config.SetDefaults() 571 sshConfig.Config.Ciphers = append(sshConfig.Config.Ciphers, "aes128-cbc", "aes192-cbc", "aes256-cbc", "3des-cbc") 572 sshConfig.Config.KeyExchanges = append(sshConfig.Config.KeyExchanges, "diffie-hellman-group-exchange-sha1", "diffie-hellman-group-exchange-sha256") 573 } 574 575 keyFile := env.ShellExpand(opt.KeyFile) 576 pubkeyFile := env.ShellExpand(opt.PubKeyFile) 577 //keyPem := env.ShellExpand(opt.KeyPem) 578 // Add ssh agent-auth if no password or file or key PEM specified 579 if (opt.Pass == "" && keyFile == "" && !opt.AskPassword && opt.KeyPem == "") || opt.KeyUseAgent { 580 sshAgentClient, _, err := sshagent.New() 581 if err != nil { 582 return nil, errors.Wrap(err, "couldn't connect to ssh-agent") 583 } 584 signers, err := sshAgentClient.Signers() 585 if err != nil { 586 return nil, errors.Wrap(err, "couldn't read ssh agent signers") 587 } 588 if keyFile != "" { 589 pubBytes, err := ioutil.ReadFile(keyFile + ".pub") 590 if err != nil { 591 return nil, errors.Wrap(err, "failed to read public key file") 592 } 593 pub, _, _, _, err := ssh.ParseAuthorizedKey(pubBytes) 594 if err != nil { 595 return nil, errors.Wrap(err, "failed to parse public key file") 596 } 597 pubM := pub.Marshal() 598 found := false 599 for _, s := range signers { 600 if bytes.Equal(pubM, s.PublicKey().Marshal()) { 601 sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(s)) 602 found = true 603 break 604 } 605 } 606 if !found { 607 return nil, errors.New("private key not found in the ssh-agent") 608 } 609 } else { 610 sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(signers...)) 611 } 612 } 613 614 // Load key file if specified 615 if keyFile != "" || opt.KeyPem != "" { 616 var key []byte 617 if opt.KeyPem == "" { 618 key, err = ioutil.ReadFile(keyFile) 619 if err != nil { 620 return nil, errors.Wrap(err, "failed to read private key file") 621 } 622 } else { 623 // wrap in quotes because the config is a coming as a literal without them. 624 opt.KeyPem, err = strconv.Unquote("\"" + opt.KeyPem + "\"") 625 if err != nil { 626 return nil, errors.Wrap(err, "pem key not formatted properly") 627 } 628 key = []byte(opt.KeyPem) 629 } 630 clearpass := "" 631 if opt.KeyFilePass != "" { 632 clearpass, err = obscure.Reveal(opt.KeyFilePass) 633 if err != nil { 634 return nil, err 635 } 636 } 637 var signer ssh.Signer 638 if clearpass == "" { 639 signer, err = ssh.ParsePrivateKey(key) 640 } else { 641 signer, err = ssh.ParsePrivateKeyWithPassphrase(key, []byte(clearpass)) 642 } 643 if err != nil { 644 return nil, errors.Wrap(err, "failed to parse private key file") 645 } 646 647 // If a public key has been specified then use that 648 if pubkeyFile != "" { 649 certfile, err := ioutil.ReadFile(pubkeyFile) 650 if err != nil { 651 return nil, errors.Wrap(err, "unable to read cert file") 652 } 653 654 pk, _, _, _, err := ssh.ParseAuthorizedKey(certfile) 655 if err != nil { 656 return nil, errors.Wrap(err, "unable to parse cert file") 657 } 658 659 // And the signer for this, which includes the private key signer 660 // This is what we'll pass to the ssh client. 661 // Normally the ssh client will use the public key built 662 // into the private key, but we need to tell it to use the user 663 // specified public key cert. This signer is specific to the 664 // cert and will include the private key signer. Now ssh 665 // knows everything it needs. 666 cert, ok := pk.(*ssh.Certificate) 667 if !ok { 668 return nil, errors.New("public key file is not a certificate file: " + pubkeyFile) 669 } 670 pubsigner, err := ssh.NewCertSigner(cert, signer) 671 if err != nil { 672 return nil, errors.Wrap(err, "error generating cert signer") 673 } 674 sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(pubsigner)) 675 } else { 676 sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(signer)) 677 } 678 } 679 680 // Auth from password if specified 681 if opt.Pass != "" { 682 clearpass, err := obscure.Reveal(opt.Pass) 683 if err != nil { 684 return nil, err 685 } 686 sshConfig.Auth = append(sshConfig.Auth, 687 ssh.Password(clearpass), 688 ssh.KeyboardInteractive(func(user, instruction string, questions []string, echos []bool) ([]string, error) { 689 return f.keyboardInteractiveReponse(user, instruction, questions, echos, clearpass) 690 }), 691 ) 692 } 693 694 // Config for password if none was defined and we're allowed to 695 // We don't ask now; we ask if the ssh connection succeeds 696 if opt.Pass == "" && opt.AskPassword { 697 sshConfig.Auth = append(sshConfig.Auth, 698 ssh.PasswordCallback(f.getPass), 699 ssh.KeyboardInteractive(func(user, instruction string, questions []string, echos []bool) ([]string, error) { 700 pass, _ := f.getPass() 701 return f.keyboardInteractiveReponse(user, instruction, questions, echos, pass) 702 }), 703 ) 704 } 705 706 return NewFsWithConnection(ctx, f, name, root, m, opt, sshConfig) 707} 708 709// Do the keyboard interactive challenge 710// 711// Just send the password back for all questions 712func (f *Fs) keyboardInteractiveReponse(user, instruction string, questions []string, echos []bool, pass string) ([]string, error) { 713 fs.Debugf(f, "keyboard interactive auth requested") 714 answers := make([]string, len(questions)) 715 for i := range answers { 716 answers[i] = pass 717 } 718 return answers, nil 719} 720 721// If we're in password mode and ssh connection succeeds then this 722// callback is called. First time around we ask the user, and then 723// save it so on reconnection we give back the previous string. 724// This removes the ability to let the user correct a mistaken entry, 725// but means that reconnects are transparent. 726// We'll re-use config.Pass for this, 'cos we know it's not been 727// specified. 728func (f *Fs) getPass() (string, error) { 729 for f.savedpswd == "" { 730 _, _ = fmt.Fprint(os.Stderr, "Enter SFTP password: ") 731 f.savedpswd = config.ReadPassword() 732 } 733 return f.savedpswd, nil 734} 735 736// NewFsWithConnection creates a new Fs object from the name and root and an ssh.ClientConfig. It connects to 737// the host specified in the ssh.ClientConfig 738func NewFsWithConnection(ctx context.Context, f *Fs, name string, root string, m configmap.Mapper, opt *Options, sshConfig *ssh.ClientConfig) (fs.Fs, error) { 739 // Populate the Filesystem Object 740 f.name = name 741 f.root = root 742 f.absRoot = root 743 f.opt = *opt 744 f.m = m 745 f.config = sshConfig 746 f.url = "sftp://" + opt.User + "@" + opt.Host + ":" + opt.Port + "/" + root 747 f.mkdirLock = newStringLock() 748 f.pacer = fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))) 749 f.savedpswd = "" 750 // set the pool drainer timer going 751 if f.opt.IdleTimeout > 0 { 752 f.drain = time.AfterFunc(time.Duration(opt.IdleTimeout), func() { _ = f.drainPool(ctx) }) 753 } 754 755 f.features = (&fs.Features{ 756 CanHaveEmptyDirectories: true, 757 SlowHash: true, 758 }).Fill(ctx, f) 759 // Make a connection and pool it to return errors early 760 c, err := f.getSftpConnection(ctx) 761 if err != nil { 762 return nil, errors.Wrap(err, "NewFs") 763 } 764 cwd, err := c.sftpClient.Getwd() 765 f.putSftpConnection(&c, nil) 766 if err != nil { 767 fs.Debugf(f, "Failed to read current directory - using relative paths: %v", err) 768 } else if !path.IsAbs(f.root) { 769 f.absRoot = path.Join(cwd, f.root) 770 fs.Debugf(f, "Using absolute root directory %q", f.absRoot) 771 } 772 if root != "" { 773 // Check to see if the root actually an existing file 774 oldAbsRoot := f.absRoot 775 remote := path.Base(root) 776 f.root = path.Dir(root) 777 f.absRoot = path.Dir(f.absRoot) 778 if f.root == "." { 779 f.root = "" 780 } 781 _, err := f.NewObject(ctx, remote) 782 if err != nil { 783 if err == fs.ErrorObjectNotFound || err == fs.ErrorIsDir { 784 // File doesn't exist so return old f 785 f.root = root 786 f.absRoot = oldAbsRoot 787 return f, nil 788 } 789 return nil, err 790 } 791 // return an error with an fs which points to the parent 792 return f, fs.ErrorIsFile 793 } 794 return f, nil 795} 796 797// Name returns the configured name of the file system 798func (f *Fs) Name() string { 799 return f.name 800} 801 802// Root returns the root for the filesystem 803func (f *Fs) Root() string { 804 return f.root 805} 806 807// String returns the URL for the filesystem 808func (f *Fs) String() string { 809 return f.url 810} 811 812// Features returns the optional features of this Fs 813func (f *Fs) Features() *fs.Features { 814 return f.features 815} 816 817// Precision is the remote sftp file system's modtime precision, which we have no way of knowing. We estimate at 1s 818func (f *Fs) Precision() time.Duration { 819 return time.Second 820} 821 822// NewObject creates a new remote sftp file object 823func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { 824 o := &Object{ 825 fs: f, 826 remote: remote, 827 } 828 err := o.stat(ctx) 829 if err != nil { 830 return nil, err 831 } 832 return o, nil 833} 834 835// dirExists returns true,nil if the directory exists, false, nil if 836// it doesn't or false, err 837func (f *Fs) dirExists(ctx context.Context, dir string) (bool, error) { 838 if dir == "" { 839 dir = "." 840 } 841 c, err := f.getSftpConnection(ctx) 842 if err != nil { 843 return false, errors.Wrap(err, "dirExists") 844 } 845 info, err := c.sftpClient.Stat(dir) 846 f.putSftpConnection(&c, err) 847 if err != nil { 848 if os.IsNotExist(err) { 849 return false, nil 850 } 851 return false, errors.Wrap(err, "dirExists stat failed") 852 } 853 if !info.IsDir() { 854 return false, fs.ErrorIsFile 855 } 856 return true, nil 857} 858 859// List the objects and directories in dir into entries. The 860// entries can be returned in any order but should be for a 861// complete directory. 862// 863// dir should be "" to list the root, and should not have 864// trailing slashes. 865// 866// This should return ErrDirNotFound if the directory isn't 867// found. 868func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { 869 root := path.Join(f.absRoot, dir) 870 ok, err := f.dirExists(ctx, root) 871 if err != nil { 872 return nil, errors.Wrap(err, "List failed") 873 } 874 if !ok { 875 return nil, fs.ErrorDirNotFound 876 } 877 sftpDir := root 878 if sftpDir == "" { 879 sftpDir = "." 880 } 881 c, err := f.getSftpConnection(ctx) 882 if err != nil { 883 return nil, errors.Wrap(err, "List") 884 } 885 infos, err := c.sftpClient.ReadDir(sftpDir) 886 f.putSftpConnection(&c, err) 887 if err != nil { 888 return nil, errors.Wrapf(err, "error listing %q", dir) 889 } 890 for _, info := range infos { 891 remote := path.Join(dir, info.Name()) 892 // If file is a symlink (not a regular file is the best cross platform test we can do), do a stat to 893 // pick up the size and type of the destination, instead of the size and type of the symlink. 894 if !info.Mode().IsRegular() && !info.IsDir() { 895 if f.opt.SkipLinks { 896 // skip non regular file if SkipLinks is set 897 continue 898 } 899 oldInfo := info 900 info, err = f.stat(ctx, remote) 901 if err != nil { 902 if !os.IsNotExist(err) { 903 fs.Errorf(remote, "stat of non-regular file failed: %v", err) 904 } 905 info = oldInfo 906 } 907 } 908 if info.IsDir() { 909 d := fs.NewDir(remote, info.ModTime()) 910 entries = append(entries, d) 911 } else { 912 o := &Object{ 913 fs: f, 914 remote: remote, 915 } 916 o.setMetadata(info) 917 entries = append(entries, o) 918 } 919 } 920 return entries, nil 921} 922 923// Put data from <in> into a new remote sftp file object described by <src.Remote()> and <src.ModTime(ctx)> 924func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { 925 err := f.mkParentDir(ctx, src.Remote()) 926 if err != nil { 927 return nil, errors.Wrap(err, "Put mkParentDir failed") 928 } 929 // Temporary object under construction 930 o := &Object{ 931 fs: f, 932 remote: src.Remote(), 933 } 934 err = o.Update(ctx, in, src, options...) 935 if err != nil { 936 return nil, err 937 } 938 return o, nil 939} 940 941// PutStream uploads to the remote path with the modTime given of indeterminate size 942func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { 943 return f.Put(ctx, in, src, options...) 944} 945 946// mkParentDir makes the parent of remote if necessary and any 947// directories above that 948func (f *Fs) mkParentDir(ctx context.Context, remote string) error { 949 parent := path.Dir(remote) 950 return f.mkdir(ctx, path.Join(f.absRoot, parent)) 951} 952 953// mkdir makes the directory and parents using native paths 954func (f *Fs) mkdir(ctx context.Context, dirPath string) error { 955 f.mkdirLock.Lock(dirPath) 956 defer f.mkdirLock.Unlock(dirPath) 957 if dirPath == "." || dirPath == "/" { 958 return nil 959 } 960 ok, err := f.dirExists(ctx, dirPath) 961 if err != nil { 962 return errors.Wrap(err, "mkdir dirExists failed") 963 } 964 if ok { 965 return nil 966 } 967 parent := path.Dir(dirPath) 968 err = f.mkdir(ctx, parent) 969 if err != nil { 970 return err 971 } 972 c, err := f.getSftpConnection(ctx) 973 if err != nil { 974 return errors.Wrap(err, "mkdir") 975 } 976 err = c.sftpClient.Mkdir(dirPath) 977 f.putSftpConnection(&c, err) 978 if err != nil { 979 return errors.Wrapf(err, "mkdir %q failed", dirPath) 980 } 981 return nil 982} 983 984// Mkdir makes the root directory of the Fs object 985func (f *Fs) Mkdir(ctx context.Context, dir string) error { 986 root := path.Join(f.absRoot, dir) 987 return f.mkdir(ctx, root) 988} 989 990// Rmdir removes the root directory of the Fs object 991func (f *Fs) Rmdir(ctx context.Context, dir string) error { 992 // Check to see if directory is empty as some servers will 993 // delete recursively with RemoveDirectory 994 entries, err := f.List(ctx, dir) 995 if err != nil { 996 return errors.Wrap(err, "Rmdir") 997 } 998 if len(entries) != 0 { 999 return fs.ErrorDirectoryNotEmpty 1000 } 1001 // Remove the directory 1002 root := path.Join(f.absRoot, dir) 1003 c, err := f.getSftpConnection(ctx) 1004 if err != nil { 1005 return errors.Wrap(err, "Rmdir") 1006 } 1007 err = c.sftpClient.RemoveDirectory(root) 1008 f.putSftpConnection(&c, err) 1009 return err 1010} 1011 1012// Move renames a remote sftp file object 1013func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { 1014 srcObj, ok := src.(*Object) 1015 if !ok { 1016 fs.Debugf(src, "Can't move - not same remote type") 1017 return nil, fs.ErrorCantMove 1018 } 1019 err := f.mkParentDir(ctx, remote) 1020 if err != nil { 1021 return nil, errors.Wrap(err, "Move mkParentDir failed") 1022 } 1023 c, err := f.getSftpConnection(ctx) 1024 if err != nil { 1025 return nil, errors.Wrap(err, "Move") 1026 } 1027 err = c.sftpClient.Rename( 1028 srcObj.path(), 1029 path.Join(f.absRoot, remote), 1030 ) 1031 f.putSftpConnection(&c, err) 1032 if err != nil { 1033 return nil, errors.Wrap(err, "Move Rename failed") 1034 } 1035 dstObj, err := f.NewObject(ctx, remote) 1036 if err != nil { 1037 return nil, errors.Wrap(err, "Move NewObject failed") 1038 } 1039 return dstObj, nil 1040} 1041 1042// DirMove moves src, srcRemote to this remote at dstRemote 1043// using server-side move operations. 1044// 1045// Will only be called if src.Fs().Name() == f.Name() 1046// 1047// If it isn't possible then return fs.ErrorCantDirMove 1048// 1049// If destination exists then return fs.ErrorDirExists 1050func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { 1051 srcFs, ok := src.(*Fs) 1052 if !ok { 1053 fs.Debugf(srcFs, "Can't move directory - not same remote type") 1054 return fs.ErrorCantDirMove 1055 } 1056 srcPath := path.Join(srcFs.absRoot, srcRemote) 1057 dstPath := path.Join(f.absRoot, dstRemote) 1058 1059 // Check if destination exists 1060 ok, err := f.dirExists(ctx, dstPath) 1061 if err != nil { 1062 return errors.Wrap(err, "DirMove dirExists dst failed") 1063 } 1064 if ok { 1065 return fs.ErrorDirExists 1066 } 1067 1068 // Make sure the parent directory exists 1069 err = f.mkdir(ctx, path.Dir(dstPath)) 1070 if err != nil { 1071 return errors.Wrap(err, "DirMove mkParentDir dst failed") 1072 } 1073 1074 // Do the move 1075 c, err := f.getSftpConnection(ctx) 1076 if err != nil { 1077 return errors.Wrap(err, "DirMove") 1078 } 1079 err = c.sftpClient.Rename( 1080 srcPath, 1081 dstPath, 1082 ) 1083 f.putSftpConnection(&c, err) 1084 if err != nil { 1085 return errors.Wrapf(err, "DirMove Rename(%q,%q) failed", srcPath, dstPath) 1086 } 1087 return nil 1088} 1089 1090// run runds cmd on the remote end returning standard output 1091func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) { 1092 f.addSession() // Show session in use 1093 defer f.removeSession() 1094 1095 c, err := f.getSftpConnection(ctx) 1096 if err != nil { 1097 return nil, errors.Wrap(err, "run: get SFTP connection") 1098 } 1099 defer f.putSftpConnection(&c, err) 1100 1101 session, err := c.sshClient.NewSession() 1102 if err != nil { 1103 return nil, errors.Wrap(err, "run: get SFTP session") 1104 } 1105 defer func() { 1106 _ = session.Close() 1107 }() 1108 1109 var stdout, stderr bytes.Buffer 1110 session.Stdout = &stdout 1111 session.Stderr = &stderr 1112 1113 err = session.Run(cmd) 1114 if err != nil { 1115 return nil, errors.Wrapf(err, "failed to run %q: %s", cmd, stderr.Bytes()) 1116 } 1117 1118 return stdout.Bytes(), nil 1119} 1120 1121// Hashes returns the supported hash types of the filesystem 1122func (f *Fs) Hashes() hash.Set { 1123 ctx := context.TODO() 1124 if f.opt.DisableHashCheck { 1125 return hash.Set(hash.None) 1126 } 1127 1128 if f.cachedHashes != nil { 1129 return *f.cachedHashes 1130 } 1131 1132 // look for a hash command which works 1133 checkHash := func(commands []string, expected string, hashCommand *string, changed *bool) bool { 1134 if *hashCommand == hashCommandNotSupported { 1135 return false 1136 } 1137 if *hashCommand != "" { 1138 return true 1139 } 1140 *changed = true 1141 for _, command := range commands { 1142 output, err := f.run(ctx, command) 1143 if err != nil { 1144 continue 1145 } 1146 output = bytes.TrimSpace(output) 1147 fs.Debugf(f, "checking %q command: %q", command, output) 1148 if parseHash(output) == expected { 1149 *hashCommand = command 1150 return true 1151 } 1152 } 1153 *hashCommand = hashCommandNotSupported 1154 return false 1155 } 1156 1157 changed := false 1158 md5Works := checkHash([]string{"md5sum", "md5 -r"}, "d41d8cd98f00b204e9800998ecf8427e", &f.opt.Md5sumCommand, &changed) 1159 sha1Works := checkHash([]string{"sha1sum", "sha1 -r"}, "da39a3ee5e6b4b0d3255bfef95601890afd80709", &f.opt.Sha1sumCommand, &changed) 1160 1161 if changed { 1162 f.m.Set("md5sum_command", f.opt.Md5sumCommand) 1163 f.m.Set("sha1sum_command", f.opt.Sha1sumCommand) 1164 } 1165 1166 set := hash.NewHashSet() 1167 if sha1Works { 1168 set.Add(hash.SHA1) 1169 } 1170 if md5Works { 1171 set.Add(hash.MD5) 1172 } 1173 1174 f.cachedHashes = &set 1175 return set 1176} 1177 1178// About gets usage stats 1179func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { 1180 escapedPath := shellEscape(f.root) 1181 if f.opt.PathOverride != "" { 1182 escapedPath = shellEscape(path.Join(f.opt.PathOverride, f.root)) 1183 } 1184 if len(escapedPath) == 0 { 1185 escapedPath = "/" 1186 } 1187 stdout, err := f.run(ctx, "df -k "+escapedPath) 1188 if err != nil { 1189 return nil, errors.Wrap(err, "your remote may not support About") 1190 } 1191 1192 usageTotal, usageUsed, usageAvail := parseUsage(stdout) 1193 usage := &fs.Usage{} 1194 if usageTotal >= 0 { 1195 usage.Total = fs.NewUsageValue(usageTotal) 1196 } 1197 if usageUsed >= 0 { 1198 usage.Used = fs.NewUsageValue(usageUsed) 1199 } 1200 if usageAvail >= 0 { 1201 usage.Free = fs.NewUsageValue(usageAvail) 1202 } 1203 return usage, nil 1204} 1205 1206// Shutdown the backend, closing any background tasks and any 1207// cached connections. 1208func (f *Fs) Shutdown(ctx context.Context) error { 1209 return f.drainPool(ctx) 1210} 1211 1212// Fs is the filesystem this remote sftp file object is located within 1213func (o *Object) Fs() fs.Info { 1214 return o.fs 1215} 1216 1217// String returns the URL to the remote SFTP file 1218func (o *Object) String() string { 1219 if o == nil { 1220 return "<nil>" 1221 } 1222 return o.remote 1223} 1224 1225// Remote the name of the remote SFTP file, relative to the fs root 1226func (o *Object) Remote() string { 1227 return o.remote 1228} 1229 1230// Hash returns the selected checksum of the file 1231// If no checksum is available it returns "" 1232func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { 1233 o.fs.addSession() // Show session in use 1234 defer o.fs.removeSession() 1235 if o.fs.opt.DisableHashCheck { 1236 return "", nil 1237 } 1238 _ = o.fs.Hashes() 1239 1240 var hashCmd string 1241 if r == hash.MD5 { 1242 if o.md5sum != nil { 1243 return *o.md5sum, nil 1244 } 1245 hashCmd = o.fs.opt.Md5sumCommand 1246 } else if r == hash.SHA1 { 1247 if o.sha1sum != nil { 1248 return *o.sha1sum, nil 1249 } 1250 hashCmd = o.fs.opt.Sha1sumCommand 1251 } else { 1252 return "", hash.ErrUnsupported 1253 } 1254 if hashCmd == "" || hashCmd == hashCommandNotSupported { 1255 return "", hash.ErrUnsupported 1256 } 1257 1258 c, err := o.fs.getSftpConnection(ctx) 1259 if err != nil { 1260 return "", errors.Wrap(err, "Hash get SFTP connection") 1261 } 1262 session, err := c.sshClient.NewSession() 1263 o.fs.putSftpConnection(&c, err) 1264 if err != nil { 1265 return "", errors.Wrap(err, "Hash put SFTP connection") 1266 } 1267 1268 var stdout, stderr bytes.Buffer 1269 session.Stdout = &stdout 1270 session.Stderr = &stderr 1271 escapedPath := shellEscape(o.path()) 1272 if o.fs.opt.PathOverride != "" { 1273 escapedPath = shellEscape(path.Join(o.fs.opt.PathOverride, o.remote)) 1274 } 1275 err = session.Run(hashCmd + " " + escapedPath) 1276 fs.Debugf(nil, "sftp cmd = %s", escapedPath) 1277 if err != nil { 1278 _ = session.Close() 1279 fs.Debugf(o, "Failed to calculate %v hash: %v (%s)", r, err, bytes.TrimSpace(stderr.Bytes())) 1280 return "", nil 1281 } 1282 1283 _ = session.Close() 1284 b := stdout.Bytes() 1285 fs.Debugf(nil, "sftp output = %q", b) 1286 str := parseHash(b) 1287 fs.Debugf(nil, "sftp hash = %q", str) 1288 if r == hash.MD5 { 1289 o.md5sum = &str 1290 } else if r == hash.SHA1 { 1291 o.sha1sum = &str 1292 } 1293 return str, nil 1294} 1295 1296var shellEscapeRegex = regexp.MustCompile("[^A-Za-z0-9_.,:/\\@\u0080-\uFFFFFFFF\n-]") 1297 1298// Escape a string s.t. it cannot cause unintended behavior 1299// when sending it to a shell. 1300func shellEscape(str string) string { 1301 safe := shellEscapeRegex.ReplaceAllString(str, `\$0`) 1302 return strings.Replace(safe, "\n", "'\n'", -1) 1303} 1304 1305// Converts a byte array from the SSH session returned by 1306// an invocation of md5sum/sha1sum to a hash string 1307// as expected by the rest of this application 1308func parseHash(bytes []byte) string { 1309 // For strings with backslash *sum writes a leading \ 1310 // https://unix.stackexchange.com/q/313733/94054 1311 return strings.ToLower(strings.Split(strings.TrimLeft(string(bytes), "\\"), " ")[0]) // Split at hash / filename separator / all convert to lowercase 1312} 1313 1314// Parses the byte array output from the SSH session 1315// returned by an invocation of df into 1316// the disk size, used space, and available space on the disk, in that order. 1317// Only works when `df` has output info on only one disk 1318func parseUsage(bytes []byte) (spaceTotal int64, spaceUsed int64, spaceAvail int64) { 1319 spaceTotal, spaceUsed, spaceAvail = -1, -1, -1 1320 lines := strings.Split(string(bytes), "\n") 1321 if len(lines) < 2 { 1322 return 1323 } 1324 split := strings.Fields(lines[1]) 1325 if len(split) < 6 { 1326 return 1327 } 1328 spaceTotal, err := strconv.ParseInt(split[1], 10, 64) 1329 if err != nil { 1330 spaceTotal = -1 1331 } 1332 spaceUsed, err = strconv.ParseInt(split[2], 10, 64) 1333 if err != nil { 1334 spaceUsed = -1 1335 } 1336 spaceAvail, err = strconv.ParseInt(split[3], 10, 64) 1337 if err != nil { 1338 spaceAvail = -1 1339 } 1340 return spaceTotal * 1024, spaceUsed * 1024, spaceAvail * 1024 1341} 1342 1343// Size returns the size in bytes of the remote sftp file 1344func (o *Object) Size() int64 { 1345 return o.size 1346} 1347 1348// ModTime returns the modification time of the remote sftp file 1349func (o *Object) ModTime(ctx context.Context) time.Time { 1350 return o.modTime 1351} 1352 1353// path returns the native path of the object 1354func (o *Object) path() string { 1355 return path.Join(o.fs.absRoot, o.remote) 1356} 1357 1358// setMetadata updates the info in the object from the stat result passed in 1359func (o *Object) setMetadata(info os.FileInfo) { 1360 o.modTime = info.ModTime() 1361 o.size = info.Size() 1362 o.mode = info.Mode() 1363} 1364 1365// statRemote stats the file or directory at the remote given 1366func (f *Fs) stat(ctx context.Context, remote string) (info os.FileInfo, err error) { 1367 c, err := f.getSftpConnection(ctx) 1368 if err != nil { 1369 return nil, errors.Wrap(err, "stat") 1370 } 1371 absPath := path.Join(f.absRoot, remote) 1372 info, err = c.sftpClient.Stat(absPath) 1373 f.putSftpConnection(&c, err) 1374 return info, err 1375} 1376 1377// stat updates the info in the Object 1378func (o *Object) stat(ctx context.Context) error { 1379 info, err := o.fs.stat(ctx, o.remote) 1380 if err != nil { 1381 if os.IsNotExist(err) { 1382 return fs.ErrorObjectNotFound 1383 } 1384 return errors.Wrap(err, "stat failed") 1385 } 1386 if info.IsDir() { 1387 return fs.ErrorIsDir 1388 } 1389 o.setMetadata(info) 1390 return nil 1391} 1392 1393// SetModTime sets the modification and access time to the specified time 1394// 1395// it also updates the info field 1396func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { 1397 if !o.fs.opt.SetModTime { 1398 return nil 1399 } 1400 c, err := o.fs.getSftpConnection(ctx) 1401 if err != nil { 1402 return errors.Wrap(err, "SetModTime") 1403 } 1404 err = c.sftpClient.Chtimes(o.path(), modTime, modTime) 1405 o.fs.putSftpConnection(&c, err) 1406 if err != nil { 1407 return errors.Wrap(err, "SetModTime failed") 1408 } 1409 err = o.stat(ctx) 1410 if err != nil { 1411 return errors.Wrap(err, "SetModTime stat failed") 1412 } 1413 return nil 1414} 1415 1416// Storable returns whether the remote sftp file is a regular file (not a directory, symbolic link, block device, character device, named pipe, etc.) 1417func (o *Object) Storable() bool { 1418 return o.mode.IsRegular() 1419} 1420 1421// objectReader represents a file open for reading on the SFTP server 1422type objectReader struct { 1423 f *Fs 1424 sftpFile *sftp.File 1425 pipeReader *io.PipeReader 1426 done chan struct{} 1427} 1428 1429func (f *Fs) newObjectReader(sftpFile *sftp.File) *objectReader { 1430 pipeReader, pipeWriter := io.Pipe() 1431 file := &objectReader{ 1432 f: f, 1433 sftpFile: sftpFile, 1434 pipeReader: pipeReader, 1435 done: make(chan struct{}), 1436 } 1437 // Show connection in use 1438 f.addSession() 1439 1440 go func() { 1441 // Use sftpFile.WriteTo to pump data so that it gets a 1442 // chance to build the window up. 1443 _, err := sftpFile.WriteTo(pipeWriter) 1444 // Close the pipeWriter so the pipeReader fails with 1445 // the same error or EOF if err == nil 1446 _ = pipeWriter.CloseWithError(err) 1447 // signal that we've finished 1448 close(file.done) 1449 }() 1450 1451 return file 1452} 1453 1454// Read from a remote sftp file object reader 1455func (file *objectReader) Read(p []byte) (n int, err error) { 1456 n, err = file.pipeReader.Read(p) 1457 return n, err 1458} 1459 1460// Close a reader of a remote sftp file 1461func (file *objectReader) Close() (err error) { 1462 // Close the sftpFile - this will likely cause the WriteTo to error 1463 err = file.sftpFile.Close() 1464 // Close the pipeReader so writes to the pipeWriter fail 1465 _ = file.pipeReader.Close() 1466 // Wait for the background process to finish 1467 <-file.done 1468 // Show connection no longer in use 1469 file.f.removeSession() 1470 return err 1471} 1472 1473// Open a remote sftp file object for reading. Seek is supported 1474func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { 1475 var offset, limit int64 = 0, -1 1476 for _, option := range options { 1477 switch x := option.(type) { 1478 case *fs.SeekOption: 1479 offset = x.Offset 1480 case *fs.RangeOption: 1481 offset, limit = x.Decode(o.Size()) 1482 default: 1483 if option.Mandatory() { 1484 fs.Logf(o, "Unsupported mandatory option: %v", option) 1485 } 1486 } 1487 } 1488 c, err := o.fs.getSftpConnection(ctx) 1489 if err != nil { 1490 return nil, errors.Wrap(err, "Open") 1491 } 1492 sftpFile, err := c.sftpClient.Open(o.path()) 1493 o.fs.putSftpConnection(&c, err) 1494 if err != nil { 1495 return nil, errors.Wrap(err, "Open failed") 1496 } 1497 if offset > 0 { 1498 off, err := sftpFile.Seek(offset, io.SeekStart) 1499 if err != nil || off != offset { 1500 return nil, errors.Wrap(err, "Open Seek failed") 1501 } 1502 } 1503 in = readers.NewLimitedReadCloser(o.fs.newObjectReader(sftpFile), limit) 1504 return in, nil 1505} 1506 1507type sizeReader struct { 1508 io.Reader 1509 size int64 1510} 1511 1512// Size returns the expected size of the stream 1513// 1514// It is used in sftpFile.ReadFrom as a hint to work out the 1515// concurrency needed 1516func (sr *sizeReader) Size() int64 { 1517 return sr.size 1518} 1519 1520// Update a remote sftp file using the data <in> and ModTime from <src> 1521func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { 1522 o.fs.addSession() // Show session in use 1523 defer o.fs.removeSession() 1524 // Clear the hash cache since we are about to update the object 1525 o.md5sum = nil 1526 o.sha1sum = nil 1527 c, err := o.fs.getSftpConnection(ctx) 1528 if err != nil { 1529 return errors.Wrap(err, "Update") 1530 } 1531 file, err := c.sftpClient.OpenFile(o.path(), os.O_WRONLY|os.O_CREATE|os.O_TRUNC) 1532 o.fs.putSftpConnection(&c, err) 1533 if err != nil { 1534 return errors.Wrap(err, "Update Create failed") 1535 } 1536 // remove the file if upload failed 1537 remove := func() { 1538 c, removeErr := o.fs.getSftpConnection(ctx) 1539 if removeErr != nil { 1540 fs.Debugf(src, "Failed to open new SSH connection for delete: %v", removeErr) 1541 return 1542 } 1543 removeErr = c.sftpClient.Remove(o.path()) 1544 o.fs.putSftpConnection(&c, removeErr) 1545 if removeErr != nil { 1546 fs.Debugf(src, "Failed to remove: %v", removeErr) 1547 } else { 1548 fs.Debugf(src, "Removed after failed upload: %v", err) 1549 } 1550 } 1551 _, err = file.ReadFrom(&sizeReader{Reader: in, size: src.Size()}) 1552 if err != nil { 1553 remove() 1554 return errors.Wrap(err, "Update ReadFrom failed") 1555 } 1556 err = file.Close() 1557 if err != nil { 1558 remove() 1559 return errors.Wrap(err, "Update Close failed") 1560 } 1561 1562 // Set the mod time - this stats the object if o.fs.opt.SetModTime == true 1563 err = o.SetModTime(ctx, src.ModTime(ctx)) 1564 if err != nil { 1565 return errors.Wrap(err, "Update SetModTime failed") 1566 } 1567 1568 // Stat the file after the upload to read its stats back if o.fs.opt.SetModTime == false 1569 if !o.fs.opt.SetModTime { 1570 err = o.stat(ctx) 1571 if err == fs.ErrorObjectNotFound { 1572 // In the specific case of o.fs.opt.SetModTime == false 1573 // if the object wasn't found then don't return an error 1574 fs.Debugf(o, "Not found after upload with set_modtime=false so returning best guess") 1575 o.modTime = src.ModTime(ctx) 1576 o.size = src.Size() 1577 o.mode = os.FileMode(0666) // regular file 1578 } else if err != nil { 1579 return errors.Wrap(err, "Update stat failed") 1580 } 1581 } 1582 1583 return nil 1584} 1585 1586// Remove a remote sftp file object 1587func (o *Object) Remove(ctx context.Context) error { 1588 c, err := o.fs.getSftpConnection(ctx) 1589 if err != nil { 1590 return errors.Wrap(err, "Remove") 1591 } 1592 err = c.sftpClient.Remove(o.path()) 1593 o.fs.putSftpConnection(&c, err) 1594 return err 1595} 1596 1597// Check the interfaces are satisfied 1598var ( 1599 _ fs.Fs = &Fs{} 1600 _ fs.PutStreamer = &Fs{} 1601 _ fs.Mover = &Fs{} 1602 _ fs.DirMover = &Fs{} 1603 _ fs.Abouter = &Fs{} 1604 _ fs.Shutdowner = &Fs{} 1605 _ fs.Object = &Object{} 1606) 1607