1// Package transfer collects together adapters for uploading and downloading LFS content 2// NOTE: Subject to change, do not rely on this package from outside git-lfs source 3package tq 4 5import ( 6 "fmt" 7 "time" 8 9 "github.com/git-lfs/git-lfs/v3/errors" 10 "github.com/git-lfs/git-lfs/v3/lfsapi" 11 "github.com/git-lfs/git-lfs/v3/tools" 12) 13 14type Direction int 15 16const ( 17 Upload = Direction(iota) 18 Download = Direction(iota) 19 Checkout = Direction(iota) 20) 21 22// Verb returns a string containing the verb form of the receiving action. 23func (d Direction) Verb() string { 24 switch d { 25 case Checkout: 26 return "Checking out" 27 case Download: 28 return "Downloading" 29 case Upload: 30 return "Uploading" 31 default: 32 return "<unknown>" 33 } 34} 35 36func (d Direction) String() string { 37 switch d { 38 case Checkout: 39 return "checkout" 40 case Download: 41 return "download" 42 case Upload: 43 return "upload" 44 default: 45 return "<unknown>" 46 } 47} 48 49type Transfer struct { 50 Name string `json:"name,omitempty"` 51 Oid string `json:"oid,omitempty"` 52 Size int64 `json:"size"` 53 Authenticated bool `json:"authenticated,omitempty"` 54 Actions ActionSet `json:"actions,omitempty"` 55 Links ActionSet `json:"_links,omitempty"` 56 Error *ObjectError `json:"error,omitempty"` 57 Path string `json:"path,omitempty"` 58 Missing bool `json:"-"` 59} 60 61func (t *Transfer) Rel(name string) (*Action, error) { 62 a, err := t.Actions.Get(name) 63 if a != nil || err != nil { 64 return a, err 65 } 66 67 if t.Links != nil { 68 a, err := t.Links.Get(name) 69 if a != nil || err != nil { 70 return a, err 71 } 72 } 73 74 return nil, nil 75} 76 77type ObjectError struct { 78 Code int `json:"code"` 79 Message string `json:"message"` 80} 81 82func (e *ObjectError) Error() string { 83 return fmt.Sprintf("[%d] %s", e.Code, e.Message) 84} 85 86// newTransfer returns a copy of the given Transfer, with the name and path 87// values set. 88func newTransfer(tr *Transfer, name string, path string) *Transfer { 89 t := &Transfer{ 90 Name: name, 91 Path: path, 92 Oid: tr.Oid, 93 Size: tr.Size, 94 Authenticated: tr.Authenticated, 95 Actions: make(ActionSet), 96 } 97 98 if tr.Error != nil { 99 t.Error = &ObjectError{ 100 Code: tr.Error.Code, 101 Message: tr.Error.Message, 102 } 103 } 104 105 for rel, action := range tr.Actions { 106 t.Actions[rel] = &Action{ 107 Href: action.Href, 108 Header: action.Header, 109 ExpiresAt: action.ExpiresAt, 110 ExpiresIn: action.ExpiresIn, 111 createdAt: action.createdAt, 112 } 113 } 114 115 if tr.Links != nil { 116 t.Links = make(ActionSet) 117 118 for rel, link := range tr.Links { 119 t.Links[rel] = &Action{ 120 Href: link.Href, 121 Header: link.Header, 122 ExpiresAt: link.ExpiresAt, 123 ExpiresIn: link.ExpiresIn, 124 createdAt: link.createdAt, 125 } 126 } 127 } 128 129 return t 130} 131 132type Action struct { 133 Href string `json:"href"` 134 Header map[string]string `json:"header,omitempty"` 135 ExpiresAt time.Time `json:"expires_at,omitempty"` 136 ExpiresIn int `json:"expires_in,omitempty"` 137 Id string `json:"-"` 138 Token string `json:"-"` 139 140 createdAt time.Time 141} 142 143func (a *Action) IsExpiredWithin(d time.Duration) (time.Time, bool) { 144 return tools.IsExpiredAtOrIn(a.createdAt, d, a.ExpiresAt, time.Duration(a.ExpiresIn)*time.Second) 145} 146 147type ActionSet map[string]*Action 148 149const ( 150 // objectExpirationToTransfer is the duration we expect to have passed 151 // from the time that the object's expires_at (or expires_in) property 152 // is checked to when the transfer is executed. 153 objectExpirationToTransfer = 5 * time.Second 154) 155 156func (as ActionSet) Get(rel string) (*Action, error) { 157 a, ok := as[rel] 158 if !ok { 159 return nil, nil 160 } 161 162 if at, expired := a.IsExpiredWithin(objectExpirationToTransfer); expired { 163 return nil, errors.NewRetriableError(&ActionExpiredErr{Rel: rel, At: at}) 164 } 165 166 return a, nil 167} 168 169type ActionExpiredErr struct { 170 Rel string 171 At time.Time 172} 173 174func (e ActionExpiredErr) Error() string { 175 return fmt.Sprintf("tq: action %q expires at %s", 176 e.Rel, e.At.In(time.Local).Format(time.RFC822)) 177} 178 179func IsActionExpiredError(err error) bool { 180 if _, ok := err.(*ActionExpiredErr); ok { 181 return true 182 } 183 return false 184} 185 186// NewAdapterFunc creates new instances of Adapter. Code that wishes 187// to provide new Adapter instances should pass an implementation of this 188// function to RegisterNewTransferAdapterFunc() on a *Manifest. 189// name and dir are to provide context if one func implements many instances 190type NewAdapterFunc func(name string, dir Direction) Adapter 191 192type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error 193 194type AdapterConfig interface { 195 APIClient() *lfsapi.Client 196 ConcurrentTransfers() int 197 Remote() string 198} 199 200type adapterConfig struct { 201 apiClient *lfsapi.Client 202 concurrentTransfers int 203 remote string 204} 205 206func (c *adapterConfig) ConcurrentTransfers() int { 207 return c.concurrentTransfers 208} 209 210func (c *adapterConfig) APIClient() *lfsapi.Client { 211 return c.apiClient 212} 213 214func (c *adapterConfig) Remote() string { 215 return c.remote 216} 217 218// Adapter is implemented by types which can upload and/or download LFS 219// file content to a remote store. Each Adapter accepts one or more requests 220// which it may schedule and parallelise in whatever way it chooses, clients of 221// this interface will receive notifications of progress and completion asynchronously. 222// TransferAdapters support transfers in one direction; if an implementation 223// provides support for upload and download, it should be instantiated twice, 224// advertising support for each direction separately. 225// Note that Adapter only implements the actual upload/download of content 226// itself; organising the wider process including calling the API to get URLs, 227// handling progress reporting and retries is the job of the core TransferQueue. 228// This is so that the orchestration remains core & standard but Adapter 229// can be changed to physically transfer to different hosts with less code. 230type Adapter interface { 231 // Name returns the name of this adapter, which is the same for all instances 232 // of this type of adapter 233 Name() string 234 // Direction returns whether this instance is an upload or download instance 235 // Adapter instances can only be one or the other, although the same 236 // type may be instantiated for each direction 237 Direction() Direction 238 // Begin a new batch of uploads or downloads. Call this first, followed by one 239 // or more Add calls. The passed in callback will receive updates on progress. 240 Begin(cfg AdapterConfig, cb ProgressCallback) error 241 // Add queues a download/upload, which will complete asynchronously and 242 // notify the callbacks given to Begin() 243 Add(transfers ...*Transfer) (results <-chan TransferResult) 244 // Indicate that all transfers have been scheduled and resources can be released 245 // once the queued items have completed. 246 // This call blocks until all items have been processed 247 End() 248} 249 250// Result of a transfer returned through CompletionChannel() 251type TransferResult struct { 252 Transfer *Transfer 253 // This will be non-nil if there was an error transferring this item 254 Error error 255} 256