// Package tq collects together adapters for uploading and downloading LFS content.
// NOTE: Subject to change; do not rely on this package from outside the git-lfs source.
3package tq
4
5import (
6	"fmt"
7	"time"
8
9	"github.com/git-lfs/git-lfs/v3/errors"
10	"github.com/git-lfs/git-lfs/v3/lfsapi"
11	"github.com/git-lfs/git-lfs/v3/tools"
12)
13
// Direction enumerates the kinds of transfer operation known to this
// package: Upload, Download and Checkout.
type Direction int

// Known Direction values, numbered consecutively from zero in the order
// listed.
const (
	Upload Direction = iota
	Download
	Checkout
)

// Verb returns a string containing the verb form of the receiving action,
// or "<unknown>" when d is not one of the declared directions.
func (d Direction) Verb() string {
	verb := "<unknown>"
	switch d {
	case Upload:
		verb = "Uploading"
	case Download:
		verb = "Downloading"
	case Checkout:
		verb = "Checking out"
	}
	return verb
}

// String returns the lower-case name of the direction, or "<unknown>" when d
// is not one of the declared directions.
func (d Direction) String() string {
	// Indexed by Direction value; anything outside the table is unknown.
	names := [...]string{
		Upload:   "upload",
		Download: "download",
		Checkout: "checkout",
	}
	if d < 0 || int(d) >= len(names) {
		return "<unknown>"
	}
	return names[d]
}
48
// Transfer carries the data describing one object transfer. Every field
// except Missing is encoded to and decoded from JSON under the key given in
// its struct tag.
type Transfer struct {
	Name          string       `json:"name,omitempty"`
	Oid           string       `json:"oid,omitempty"`
	// Size has no omitempty option, so it is always written to JSON, even
	// when zero.
	Size          int64        `json:"size"`
	Authenticated bool         `json:"authenticated,omitempty"`
	// Actions is the first set consulted by Rel.
	Actions       ActionSet    `json:"actions,omitempty"`
	// Links is consulted by Rel only when Actions yields nothing; it is
	// stored under the JSON key "_links".
	Links         ActionSet    `json:"_links,omitempty"`
	Error         *ObjectError `json:"error,omitempty"`
	Path          string       `json:"path,omitempty"`
	// Missing is never serialized (JSON tag "-").
	Missing       bool         `json:"-"`
}
60
61func (t *Transfer) Rel(name string) (*Action, error) {
62	a, err := t.Actions.Get(name)
63	if a != nil || err != nil {
64		return a, err
65	}
66
67	if t.Links != nil {
68		a, err := t.Links.Get(name)
69		if a != nil || err != nil {
70			return a, err
71		}
72	}
73
74	return nil, nil
75}
76
// ObjectError is an error value carrying a numeric code and a message. It is
// serialized to JSON under the keys "code" and "message".
type ObjectError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// Error implements the error interface, rendering the receiver as
// "[<code>] <message>".
func (e *ObjectError) Error() string {
	prefix := fmt.Sprintf("[%d] ", e.Code)
	return prefix + e.Message
}
85
86// newTransfer returns a copy of the given Transfer, with the name and path
87// values set.
88func newTransfer(tr *Transfer, name string, path string) *Transfer {
89	t := &Transfer{
90		Name:          name,
91		Path:          path,
92		Oid:           tr.Oid,
93		Size:          tr.Size,
94		Authenticated: tr.Authenticated,
95		Actions:       make(ActionSet),
96	}
97
98	if tr.Error != nil {
99		t.Error = &ObjectError{
100			Code:    tr.Error.Code,
101			Message: tr.Error.Message,
102		}
103	}
104
105	for rel, action := range tr.Actions {
106		t.Actions[rel] = &Action{
107			Href:      action.Href,
108			Header:    action.Header,
109			ExpiresAt: action.ExpiresAt,
110			ExpiresIn: action.ExpiresIn,
111			createdAt: action.createdAt,
112		}
113	}
114
115	if tr.Links != nil {
116		t.Links = make(ActionSet)
117
118		for rel, link := range tr.Links {
119			t.Links[rel] = &Action{
120				Href:      link.Href,
121				Header:    link.Header,
122				ExpiresAt: link.ExpiresAt,
123				ExpiresIn: link.ExpiresIn,
124				createdAt: link.createdAt,
125			}
126		}
127	}
128
129	return t
130}
131
// Action is a single entry of an ActionSet. Href, Header, ExpiresAt and
// ExpiresIn are exchanged as JSON; Id and Token carry the JSON tag "-" and are
// therefore never serialized.
type Action struct {
	Href      string            `json:"href"`
	Header    map[string]string `json:"header,omitempty"`
	// ExpiresAt is the absolute expiry time handed to the expiry check in
	// IsExpiredWithin.
	ExpiresAt time.Time         `json:"expires_at,omitempty"`
	// ExpiresIn is a number of seconds; IsExpiredWithin converts it to a
	// time.Duration before use.
	ExpiresIn int               `json:"expires_in,omitempty"`
	Id        string            `json:"-"`
	Token     string            `json:"-"`

	// createdAt is the reference time passed to the expiry check in
	// IsExpiredWithin. Being unexported, it is not part of the JSON form.
	createdAt time.Time
}
142
143func (a *Action) IsExpiredWithin(d time.Duration) (time.Time, bool) {
144	return tools.IsExpiredAtOrIn(a.createdAt, d, a.ExpiresAt, time.Duration(a.ExpiresIn)*time.Second)
145}
146
// ActionSet maps a relation name (the "rel" argument of Get) to its Action.
type ActionSet map[string]*Action
148
// objectExpirationToTransfer is the duration we expect to have passed from
// the time that the object's expires_at (or expires_in) property is checked
// to when the transfer is executed.
const objectExpirationToTransfer = 5 * time.Second
155
156func (as ActionSet) Get(rel string) (*Action, error) {
157	a, ok := as[rel]
158	if !ok {
159		return nil, nil
160	}
161
162	if at, expired := a.IsExpiredWithin(objectExpirationToTransfer); expired {
163		return nil, errors.NewRetriableError(&ActionExpiredErr{Rel: rel, At: at})
164	}
165
166	return a, nil
167}
168
// ActionExpiredErr records that the action stored under Rel was found to be
// expired, with At holding the associated expiry time.
type ActionExpiredErr struct {
	Rel string
	At  time.Time
}

// Error implements the error interface. The message quotes the relation name
// and prints the expiry time in the local time zone using the RFC 822 layout.
func (e ActionExpiredErr) Error() string {
	when := e.At.In(time.Local).Format(time.RFC822)
	return fmt.Sprintf("tq: action %q expires at %s", e.Rel, when)
}
178
179func IsActionExpiredError(err error) bool {
180	if _, ok := err.(*ActionExpiredErr); ok {
181		return true
182	}
183	return false
184}
185
// NewAdapterFunc creates new instances of Adapter. Code that wishes
// to provide new Adapter instances should pass an implementation of this
// function to RegisterNewTransferAdapterFunc() on a *Manifest.
// The name and dir arguments provide context in case a single function is
// used to create many different adapter instances.
type NewAdapterFunc func(name string, dir Direction) Adapter
191
// ProgressCallback is the function type handed to Adapter.Begin in order to
// receive progress updates. It may return an error.
// NOTE(review): judging by the parameter names, totalSize and readSoFar are
// byte counts for the item called name and readSinceLast is the increment
// since the previous call; confirm against the callers.
type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error
193
// AdapterConfig is the configuration value passed to Adapter.Begin.
type AdapterConfig interface {
	// APIClient returns the *lfsapi.Client made available to the adapter.
	APIClient() *lfsapi.Client
	// ConcurrentTransfers returns the configured transfer concurrency.
	ConcurrentTransfers() int
	// Remote returns the configured remote, as a string.
	Remote() string
}
199
200type adapterConfig struct {
201	apiClient           *lfsapi.Client
202	concurrentTransfers int
203	remote              string
204}
205
206func (c *adapterConfig) ConcurrentTransfers() int {
207	return c.concurrentTransfers
208}
209
210func (c *adapterConfig) APIClient() *lfsapi.Client {
211	return c.apiClient
212}
213
214func (c *adapterConfig) Remote() string {
215	return c.remote
216}
217
// Adapter is implemented by types which can upload and/or download LFS
// file content to a remote store. Each Adapter accepts one or more requests
// which it may schedule and parallelise in whatever way it chooses; clients of
// this interface will receive notifications of progress and completion
// asynchronously.
// An Adapter instance supports transfers in one direction only; if an
// implementation provides support for both upload and download, it should be
// instantiated twice, advertising support for each direction separately.
// Note that Adapter only implements the actual upload/download of content
// itself; organising the wider process, including calling the API to get URLs,
// handling progress reporting and retries, is the job of the core TransferQueue.
// This is so that the orchestration remains core & standard but Adapter
// can be changed to physically transfer to different hosts with less code.
type Adapter interface {
	// Name returns the name of this adapter, which is the same for all instances
	// of this type of adapter.
	Name() string
	// Direction returns whether this instance is an upload or download instance.
	// Adapter instances can only be one or the other, although the same
	// type may be instantiated for each direction.
	Direction() Direction
	// Begin starts a new batch of uploads or downloads. Call this first, followed
	// by one or more Add calls. The passed-in callback will receive updates on
	// progress.
	Begin(cfg AdapterConfig, cb ProgressCallback) error
	// Add queues one or more downloads/uploads, which will complete
	// asynchronously and notify the callback given to Begin(). The outcome of
	// each transfer is delivered as a TransferResult on the returned channel.
	Add(transfers ...*Transfer) (results <-chan TransferResult)
	// End indicates that all transfers have been scheduled and resources can be
	// released once the queued items have completed.
	// This call blocks until all items have been processed.
	End()
}
249
// TransferResult is the outcome of a single transfer, as delivered on the
// channel returned by Adapter.Add.
type TransferResult struct {
	// Transfer is the item this result refers to.
	Transfer *Transfer
	// This will be non-nil if there was an error transferring this item
	Error error
}
256