1// Copyright (C) 2021 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package ultest 5 6import ( 7 "bytes" 8 "context" 9 "path/filepath" 10 "sort" 11 "strings" 12 "sync" 13 "time" 14 15 "github.com/zeebo/clingy" 16 "github.com/zeebo/errs" 17 18 "storj.io/storj/cmd/uplinkng/ulfs" 19 "storj.io/storj/cmd/uplinkng/ulloc" 20) 21 22// 23// ulfs.Filesystem 24// 25 26type testFilesystem struct { 27 stdin string 28 created int64 29 files map[ulloc.Location]memFileData 30 pending map[ulloc.Location][]*memWriteHandle 31 locals map[string]bool // true means path is a directory 32 buckets map[string]struct{} 33 34 mu sync.Mutex 35} 36 37func newTestFilesystem() *testFilesystem { 38 return &testFilesystem{ 39 files: make(map[ulloc.Location]memFileData), 40 pending: make(map[ulloc.Location][]*memWriteHandle), 41 locals: make(map[string]bool), 42 buckets: make(map[string]struct{}), 43 } 44} 45 46type memFileData struct { 47 contents string 48 created int64 49} 50 51func (tfs *testFilesystem) ensureBucket(name string) { 52 tfs.buckets[name] = struct{}{} 53} 54 55func (tfs *testFilesystem) Files() (files []File) { 56 for loc, mf := range tfs.files { 57 files = append(files, File{ 58 Loc: loc.String(), 59 Contents: mf.contents, 60 }) 61 } 62 sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) }) 63 return files 64} 65 66func (tfs *testFilesystem) Pending() (files []File) { 67 for loc, mh := range tfs.pending { 68 for _, h := range mh { 69 files = append(files, File{ 70 Loc: loc.String(), 71 Contents: h.buf.String(), 72 }) 73 } 74 } 75 sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) }) 76 return files 77} 78 79func (tfs *testFilesystem) Close() error { 80 return nil 81} 82 83func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ulfs.OpenOptions) (_ ulfs.ReadHandle, err error) { 84 tfs.mu.Lock() 85 defer tfs.mu.Unlock() 86 87 if loc.Std() { 88 return &byteReadHandle{Buffer: bytes.NewBufferString("-")}, nil 89 } 90 91 mf, ok := tfs.files[loc] 92 if !ok { 93 return nil, errs.New("file does not exist %q", loc) 94 } 95 96 if opts != nil { 97 return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents[opts.Offset:(opts.Offset + opts.Length)])}, nil 98 } 99 100 return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents)}, nil 101} 102 103func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.WriteHandle, err error) { 104 tfs.mu.Lock() 105 defer tfs.mu.Unlock() 106 107 if loc.Std() { 108 return new(discardWriteHandle), nil 109 } 110 111 if bucket, _, ok := loc.RemoteParts(); ok { 112 if _, ok := tfs.buckets[bucket]; !ok { 113 return nil, errs.New("bucket %q does not exist", bucket) 114 } 115 } 116 117 if path, ok := loc.LocalParts(); ok { 118 if loc.Directoryish() || tfs.isLocalDir(ctx, loc) { 119 return nil, errs.New("unable to open file for writing: %q", loc) 120 } 121 dir := ulloc.CleanPath(filepath.Dir(path)) 122 if err := tfs.mkdirAll(ctx, dir); err != nil { 123 return nil, err 124 } 125 } 126 127 tfs.created++ 128 wh := &memWriteHandle{ 129 buf: bytes.NewBuffer(nil), 130 loc: loc, 131 tfs: tfs, 132 cre: tfs.created, 133 } 134 135 if loc.Remote() { 136 tfs.pending[loc] = append(tfs.pending[loc], wh) 137 } 138 139 return wh, nil 140} 141 142func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error { 143 tfs.mu.Lock() 144 defer tfs.mu.Unlock() 145 146 mf, ok := tfs.files[source] 147 if !ok { 148 return errs.New("file does not exist %q", source) 149 } 150 delete(tfs.files, source) 151 tfs.files[dest] = mf 152 return nil 153} 154 155func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts *ulfs.RemoveOptions) error { 156 tfs.mu.Lock() 157 defer tfs.mu.Unlock() 158 159 if opts == nil || !opts.Pending { 160 delete(tfs.files, loc) 161 } else { 162 // TODO: Remove needs an API that understands that multiple pending files may exist 163 delete(tfs.pending, loc) 164 } 165 return nil 166} 167 168func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) { 169 tfs.mu.Lock() 170 defer tfs.mu.Unlock() 171 172 if opts != nil && opts.Pending { 173 return tfs.listPending(ctx, prefix, opts) 174 } 175 176 prefixDir := prefix.AsDirectoryish() 177 178 var infos []ulfs.ObjectInfo 179 for loc, mf := range tfs.files { 180 if loc.HasPrefix(prefixDir) || loc == prefix { 181 infos = append(infos, ulfs.ObjectInfo{ 182 Loc: loc, 183 Created: time.Unix(mf.created, 0), 184 }) 185 } 186 } 187 188 sort.Sort(objectInfos(infos)) 189 190 if opts == nil || !opts.Recursive { 191 infos = collapseObjectInfos(prefix, infos) 192 } 193 194 return &objectInfoIterator{infos: infos}, nil 195} 196 197func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) { 198 if prefix.Local() { 199 return &objectInfoIterator{}, nil 200 } 201 202 prefixDir := prefix.AsDirectoryish() 203 204 var infos []ulfs.ObjectInfo 205 for loc, whs := range tfs.pending { 206 if loc.HasPrefix(prefixDir) || loc == prefix { 207 for _, wh := range whs { 208 infos = append(infos, ulfs.ObjectInfo{ 209 Loc: loc, 210 Created: time.Unix(wh.cre, 0), 211 }) 212 } 213 } 214 } 215 216 sort.Sort(objectInfos(infos)) 217 218 if opts == nil || !opts.Recursive { 219 infos = collapseObjectInfos(prefix, infos) 220 } 221 222 return &objectInfoIterator{infos: infos}, nil 223} 224 225func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) (local bool) { 226 tfs.mu.Lock() 227 defer tfs.mu.Unlock() 228 229 return tfs.isLocalDir(ctx, loc) 230} 231 232func (tfs *testFilesystem) isLocalDir(ctx context.Context, loc ulloc.Location) (local bool) { 233 path, ok := loc.LocalParts() 234 return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path]) 235} 236 237func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.ObjectInfo, error) { 238 if loc.Std() { 239 return nil, errs.New("unable to stat loc %q", loc.Loc()) 240 } 241 242 mf, ok := tfs.files[loc] 243 if !ok { 244 return nil, errs.New("file does not exist: %q", loc.Loc()) 245 } 246 247 return &ulfs.ObjectInfo{ 248 Loc: loc, 249 Created: time.Unix(mf.created, 0), 250 ContentLength: int64(len(mf.contents)), 251 }, nil 252} 253 254func (tfs *testFilesystem) mkdirAll(ctx context.Context, dir string) error { 255 i := 0 256 for i < len(dir) { 257 slash := strings.Index(dir[i:], "/") 258 if slash == -1 { 259 break 260 } 261 if err := tfs.mkdir(ctx, dir[:i+slash]); err != nil { 262 return err 263 } 264 i += slash + 1 265 } 266 if len(dir) > 0 { 267 return tfs.mkdir(ctx, dir) 268 } 269 return nil 270} 271 272func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error { 273 if isDir, ok := tfs.locals[dir]; ok && !isDir { 274 return errs.New("cannot create directory: %q is a file", dir) 275 } 276 tfs.locals[dir] = true 277 return nil 278} 279 280// 281// ulfs.ReadHandle 282// 283 284type byteReadHandle struct { 285 *bytes.Buffer 286} 287 288func (b *byteReadHandle) Close() error { return nil } 289func (b *byteReadHandle) Info() ulfs.ObjectInfo { return ulfs.ObjectInfo{} } 290 291// 292// ulfs.WriteHandle 293// 294 295type memWriteHandle struct { 296 buf *bytes.Buffer 297 loc ulloc.Location 298 tfs *testFilesystem 299 cre int64 300 done bool 301} 302 303func (b *memWriteHandle) Write(p []byte) (int, error) { 304 return b.buf.Write(p) 305} 306 307func (b *memWriteHandle) Commit() error { 308 b.tfs.mu.Lock() 309 defer b.tfs.mu.Unlock() 310 311 if err := b.close(); err != nil { 312 return err 313 } 314 315 if path, ok := b.loc.LocalParts(); ok { 316 b.tfs.locals[path] = false 317 } 318 319 b.tfs.files[b.loc] = memFileData{ 320 contents: b.buf.String(), 321 created: b.cre, 322 } 323 return nil 324} 325 326func (b *memWriteHandle) Abort() error { 327 b.tfs.mu.Lock() 328 defer b.tfs.mu.Unlock() 329 330 if err := b.close(); err != nil { 331 return err 332 } 333 334 return nil 335} 336 337func (b *memWriteHandle) close() error { 338 if b.done { 339 return errs.New("already done") 340 } 341 b.done = true 342 343 handles := b.tfs.pending[b.loc] 344 for i, v := range handles { 345 if v == b { 346 handles = append(handles[:i], handles[i+1:]...) 347 break 348 } 349 } 350 351 if len(handles) > 0 { 352 b.tfs.pending[b.loc] = handles 353 } else { 354 delete(b.tfs.pending, b.loc) 355 } 356 357 return nil 358} 359 360type discardWriteHandle struct{} 361 362func (discardWriteHandle) Write(p []byte) (int, error) { return len(p), nil } 363func (discardWriteHandle) Commit() error { return nil } 364func (discardWriteHandle) Abort() error { return nil } 365 366// 367// ulfs.ObjectIterator 368// 369 370type objectInfoIterator struct { 371 infos []ulfs.ObjectInfo 372 current ulfs.ObjectInfo 373} 374 375func (li *objectInfoIterator) Next() bool { 376 if len(li.infos) == 0 { 377 return false 378 } 379 li.current, li.infos = li.infos[0], li.infos[1:] 380 return true 381} 382 383func (li *objectInfoIterator) Err() error { 384 return nil 385} 386 387func (li *objectInfoIterator) Item() ulfs.ObjectInfo { 388 return li.current 389} 390 391type objectInfos []ulfs.ObjectInfo 392 393func (ois objectInfos) Len() int { return len(ois) } 394func (ois objectInfos) Swap(i int, j int) { ois[i], ois[j] = ois[j], ois[i] } 395func (ois objectInfos) Less(i int, j int) bool { return ois[i].Loc.Less(ois[j].Loc) } 396 397func collapseObjectInfos(prefix ulloc.Location, infos []ulfs.ObjectInfo) []ulfs.ObjectInfo { 398 collapsing := false 399 current := "" 400 j := 0 401 402 for _, oi := range infos { 403 first, ok := oi.Loc.ListKeyName(prefix) 404 if ok { 405 if collapsing && first == current { 406 continue 407 } 408 409 collapsing = true 410 current = first 411 412 oi.IsPrefix = true 413 } 414 415 if bucket, _, ok := oi.Loc.RemoteParts(); ok { 416 oi.Loc = ulloc.NewRemote(bucket, first) 417 } else if _, ok := oi.Loc.LocalParts(); ok { 418 oi.Loc = ulloc.NewLocal(first) 419 } else { 420 panic("invalid object returned from list") 421 } 422 423 infos[j] = oi 424 j++ 425 } 426 427 return infos[:j] 428} 429