1package ftpd 2 3import ( 4 "errors" 5 "io" 6 "sync/atomic" 7 8 "github.com/eikenb/pipeat" 9 10 "github.com/drakkan/sftpgo/v2/common" 11 "github.com/drakkan/sftpgo/v2/vfs" 12) 13 14// transfer contains the transfer details for an upload or a download. 15// It implements the ftpserver.FileTransfer interface to handle files downloads and uploads 16type transfer struct { 17 *common.BaseTransfer 18 writer io.WriteCloser 19 reader io.ReadCloser 20 isFinished bool 21 expectedOffset int64 22} 23 24func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt, 25 expectedOffset int64) *transfer { 26 var writer io.WriteCloser 27 var reader io.ReadCloser 28 if baseTransfer.File != nil { 29 writer = baseTransfer.File 30 reader = baseTransfer.File 31 } else if pipeWriter != nil { 32 writer = pipeWriter 33 } else if pipeReader != nil { 34 reader = pipeReader 35 } 36 return &transfer{ 37 BaseTransfer: baseTransfer, 38 writer: writer, 39 reader: reader, 40 isFinished: false, 41 expectedOffset: expectedOffset, 42 } 43} 44 45// Read reads the contents to downloads. 46func (t *transfer) Read(p []byte) (n int, err error) { 47 t.Connection.UpdateLastActivity() 48 49 n, err = t.reader.Read(p) 50 atomic.AddInt64(&t.BytesSent, int64(n)) 51 52 if err != nil && err != io.EOF { 53 t.TransferError(err) 54 return 55 } 56 t.HandleThrottle() 57 return 58} 59 60// Write writes the uploaded contents. 61func (t *transfer) Write(p []byte) (n int, err error) { 62 t.Connection.UpdateLastActivity() 63 64 n, err = t.writer.Write(p) 65 atomic.AddInt64(&t.BytesReceived, int64(n)) 66 67 if t.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize { 68 err = t.Connection.GetQuotaExceededError() 69 } 70 if err != nil { 71 t.TransferError(err) 72 return 73 } 74 t.HandleThrottle() 75 return 76} 77 78// Seek sets the offset to resume an upload or a download 79func (t *transfer) Seek(offset int64, whence int) (int64, error) { 80 t.Connection.UpdateLastActivity() 81 if t.File != nil { 82 ret, err := t.File.Seek(offset, whence) 83 if err != nil { 84 t.TransferError(err) 85 } 86 return ret, err 87 } 88 if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart { 89 return offset, nil 90 } 91 t.TransferError(errors.New("seek is unsupported for this transfer")) 92 return 0, common.ErrOpUnsupported 93} 94 95// Close it is called when the transfer is completed. 96func (t *transfer) Close() error { 97 if err := t.setFinished(); err != nil { 98 return err 99 } 100 err := t.closeIO() 101 errBaseClose := t.BaseTransfer.Close() 102 if errBaseClose != nil { 103 err = errBaseClose 104 } 105 return t.Connection.GetFsError(t.Fs, err) 106} 107 108func (t *transfer) closeIO() error { 109 var err error 110 if t.File != nil { 111 err = t.File.Close() 112 } else if t.writer != nil { 113 err = t.writer.Close() 114 t.Lock() 115 // we set ErrTransfer here so quota is not updated, in this case the uploads are atomic 116 if err != nil && t.ErrTransfer == nil { 117 t.ErrTransfer = err 118 } 119 t.Unlock() 120 } else if t.reader != nil { 121 err = t.reader.Close() 122 } 123 return err 124} 125 126func (t *transfer) setFinished() error { 127 t.Lock() 128 defer t.Unlock() 129 if t.isFinished { 130 return common.ErrTransferClosed 131 } 132 t.isFinished = true 133 return nil 134} 135