1package ftpd
2
3import (
4	"errors"
5	"io"
6	"sync/atomic"
7
8	"github.com/eikenb/pipeat"
9
10	"github.com/drakkan/sftpgo/v2/common"
11	"github.com/drakkan/sftpgo/v2/vfs"
12)
13
14// transfer contains the transfer details for an upload or a download.
15// It implements the ftpserver.FileTransfer interface to handle files downloads and uploads
16type transfer struct {
17	*common.BaseTransfer
18	writer         io.WriteCloser
19	reader         io.ReadCloser
20	isFinished     bool
21	expectedOffset int64
22}
23
24func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
25	expectedOffset int64) *transfer {
26	var writer io.WriteCloser
27	var reader io.ReadCloser
28	if baseTransfer.File != nil {
29		writer = baseTransfer.File
30		reader = baseTransfer.File
31	} else if pipeWriter != nil {
32		writer = pipeWriter
33	} else if pipeReader != nil {
34		reader = pipeReader
35	}
36	return &transfer{
37		BaseTransfer:   baseTransfer,
38		writer:         writer,
39		reader:         reader,
40		isFinished:     false,
41		expectedOffset: expectedOffset,
42	}
43}
44
45// Read reads the contents to downloads.
46func (t *transfer) Read(p []byte) (n int, err error) {
47	t.Connection.UpdateLastActivity()
48
49	n, err = t.reader.Read(p)
50	atomic.AddInt64(&t.BytesSent, int64(n))
51
52	if err != nil && err != io.EOF {
53		t.TransferError(err)
54		return
55	}
56	t.HandleThrottle()
57	return
58}
59
60// Write writes the uploaded contents.
61func (t *transfer) Write(p []byte) (n int, err error) {
62	t.Connection.UpdateLastActivity()
63
64	n, err = t.writer.Write(p)
65	atomic.AddInt64(&t.BytesReceived, int64(n))
66
67	if t.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
68		err = t.Connection.GetQuotaExceededError()
69	}
70	if err != nil {
71		t.TransferError(err)
72		return
73	}
74	t.HandleThrottle()
75	return
76}
77
78// Seek sets the offset to resume an upload or a download
79func (t *transfer) Seek(offset int64, whence int) (int64, error) {
80	t.Connection.UpdateLastActivity()
81	if t.File != nil {
82		ret, err := t.File.Seek(offset, whence)
83		if err != nil {
84			t.TransferError(err)
85		}
86		return ret, err
87	}
88	if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
89		return offset, nil
90	}
91	t.TransferError(errors.New("seek is unsupported for this transfer"))
92	return 0, common.ErrOpUnsupported
93}
94
95// Close it is called when the transfer is completed.
96func (t *transfer) Close() error {
97	if err := t.setFinished(); err != nil {
98		return err
99	}
100	err := t.closeIO()
101	errBaseClose := t.BaseTransfer.Close()
102	if errBaseClose != nil {
103		err = errBaseClose
104	}
105	return t.Connection.GetFsError(t.Fs, err)
106}
107
108func (t *transfer) closeIO() error {
109	var err error
110	if t.File != nil {
111		err = t.File.Close()
112	} else if t.writer != nil {
113		err = t.writer.Close()
114		t.Lock()
115		// we set ErrTransfer here so quota is not updated, in this case the uploads are atomic
116		if err != nil && t.ErrTransfer == nil {
117			t.ErrTransfer = err
118		}
119		t.Unlock()
120	} else if t.reader != nil {
121		err = t.reader.Close()
122	}
123	return err
124}
125
126func (t *transfer) setFinished() error {
127	t.Lock()
128	defer t.Unlock()
129	if t.isFinished {
130		return common.ErrTransferClosed
131	}
132	t.isFinished = true
133	return nil
134}
135