1package sync 2 3import ( 4 "container/heap" 5 "context" 6 "sync" 7 "sync/atomic" 8 "testing" 9 10 "github.com/rclone/rclone/fs" 11 "github.com/rclone/rclone/fstest/mockobject" 12 "github.com/stretchr/testify/assert" 13 "github.com/stretchr/testify/require" 14) 15 16// Check interface satisfied 17var _ heap.Interface = (*pipe)(nil) 18 19func TestPipe(t *testing.T) { 20 var queueLength int 21 var queueSize int64 22 stats := func(n int, size int64) { 23 queueLength, queueSize = n, size 24 } 25 26 // Make a new pipe 27 p, err := newPipe("", stats, 10) 28 require.NoError(t, err) 29 30 checkStats := func(expectedN int, expectedSize int64) { 31 n, size := p.Stats() 32 assert.Equal(t, expectedN, n) 33 assert.Equal(t, expectedSize, size) 34 assert.Equal(t, expectedN, queueLength) 35 assert.Equal(t, expectedSize, queueSize) 36 } 37 38 checkStats(0, 0) 39 40 ctx := context.Background() 41 42 obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) 43 44 pair1 := fs.ObjectPair{Src: obj1, Dst: nil} 45 46 // Put an object 47 ok := p.Put(ctx, pair1) 48 assert.Equal(t, true, ok) 49 checkStats(1, 5) 50 51 // Close the pipe showing reading on closed pipe is OK 52 p.Close() 53 54 // Read from pipe 55 pair2, ok := p.Get(ctx) 56 assert.Equal(t, pair1, pair2) 57 assert.Equal(t, true, ok) 58 checkStats(0, 0) 59 60 // Check read on closed pipe 61 pair2, ok = p.Get(ctx) 62 assert.Equal(t, fs.ObjectPair{}, pair2) 63 assert.Equal(t, false, ok) 64 65 // Check panic on write to closed pipe 66 assert.Panics(t, func() { p.Put(ctx, pair1) }) 67 68 // Make a new pipe 69 p, err = newPipe("", stats, 10) 70 require.NoError(t, err) 71 ctx2, cancel := context.WithCancel(ctx) 72 73 // cancel it in the background - check read ceases 74 go cancel() 75 pair2, ok = p.Get(ctx2) 76 assert.Equal(t, fs.ObjectPair{}, pair2) 77 assert.Equal(t, false, ok) 78 79 // check we can't write 80 ok = p.Put(ctx2, pair1) 81 assert.Equal(t, false, ok) 82 83} 84 85// TestPipeConcurrent runs concurrent Get and Put to flush out any 86// race conditions and concurrency problems. 87func TestPipeConcurrent(t *testing.T) { 88 const ( 89 N = 1000 90 readWriters = 10 91 ) 92 93 stats := func(n int, size int64) {} 94 95 // Make a new pipe 96 p, err := newPipe("", stats, 10) 97 require.NoError(t, err) 98 99 var wg sync.WaitGroup 100 obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) 101 pair1 := fs.ObjectPair{Src: obj1, Dst: nil} 102 ctx := context.Background() 103 var count int64 104 105 for j := 0; j < readWriters; j++ { 106 wg.Add(2) 107 go func() { 108 defer wg.Done() 109 for i := 0; i < N; i++ { 110 // Read from pipe 111 pair2, ok := p.Get(ctx) 112 assert.Equal(t, pair1, pair2) 113 assert.Equal(t, true, ok) 114 atomic.AddInt64(&count, -1) 115 } 116 }() 117 go func() { 118 defer wg.Done() 119 for i := 0; i < N; i++ { 120 // Put an object 121 ok := p.Put(ctx, pair1) 122 assert.Equal(t, true, ok) 123 atomic.AddInt64(&count, 1) 124 } 125 }() 126 } 127 wg.Wait() 128 129 assert.Equal(t, int64(0), count) 130} 131 132func TestPipeOrderBy(t *testing.T) { 133 var ( 134 stats = func(n int, size int64) {} 135 ctx = context.Background() 136 obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone) 137 obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone) 138 pair1 = fs.ObjectPair{Src: obj1} 139 pair2 = fs.ObjectPair{Src: obj2} 140 ) 141 142 for _, test := range []struct { 143 orderBy string 144 swapped1 bool 145 swapped2 bool 146 fraction int 147 }{ 148 {"", false, true, -1}, 149 {"size", false, false, -1}, 150 {"name", true, true, -1}, 151 {"modtime", false, true, -1}, 152 {"size,ascending", false, false, -1}, 153 {"name,asc", true, true, -1}, 154 {"modtime,ascending", false, true, -1}, 155 {"size,descending", true, true, -1}, 156 {"name,desc", false, false, -1}, 157 {"modtime,descending", true, false, -1}, 158 {"size,mixed,50", false, false, 25}, 159 {"size,mixed,51", true, true, 75}, 160 } { 161 t.Run(test.orderBy, func(t *testing.T) { 162 p, err := newPipe(test.orderBy, stats, 10) 163 require.NoError(t, err) 164 165 readAndCheck := func(swapped bool) { 166 var readFirst, readSecond fs.ObjectPair 167 var ok1, ok2 bool 168 if test.fraction < 0 { 169 readFirst, ok1 = p.Get(ctx) 170 readSecond, ok2 = p.Get(ctx) 171 } else { 172 readFirst, ok1 = p.GetMax(ctx, test.fraction) 173 readSecond, ok2 = p.GetMax(ctx, test.fraction) 174 } 175 assert.True(t, ok1) 176 assert.True(t, ok2) 177 178 if swapped { 179 assert.True(t, readFirst == pair2 && readSecond == pair1) 180 } else { 181 assert.True(t, readFirst == pair1 && readSecond == pair2) 182 } 183 } 184 185 ok := p.Put(ctx, pair1) 186 assert.True(t, ok) 187 ok = p.Put(ctx, pair2) 188 assert.True(t, ok) 189 190 readAndCheck(test.swapped1) 191 192 // insert other way round 193 194 ok = p.Put(ctx, pair2) 195 assert.True(t, ok) 196 ok = p.Put(ctx, pair1) 197 assert.True(t, ok) 198 199 readAndCheck(test.swapped2) 200 }) 201 } 202} 203 204func TestNewLess(t *testing.T) { 205 t.Run("blankOK", func(t *testing.T) { 206 less, _, err := newLess("") 207 require.NoError(t, err) 208 assert.Nil(t, less) 209 }) 210 211 t.Run("tooManyParts", func(t *testing.T) { 212 _, _, err := newLess("size,asc,toomanyparts") 213 require.Error(t, err) 214 assert.Contains(t, err.Error(), "bad --order-by string") 215 }) 216 217 t.Run("tooManyParts2", func(t *testing.T) { 218 _, _, err := newLess("size,mixed,50,toomanyparts") 219 require.Error(t, err) 220 assert.Contains(t, err.Error(), "bad --order-by string") 221 }) 222 223 t.Run("badMixed", func(t *testing.T) { 224 _, _, err := newLess("size,mixed,32.7") 225 require.Error(t, err) 226 assert.Contains(t, err.Error(), "bad mixed fraction") 227 }) 228 229 t.Run("unknownComparison", func(t *testing.T) { 230 _, _, err := newLess("potato") 231 require.Error(t, err) 232 assert.Contains(t, err.Error(), "unknown --order-by comparison") 233 }) 234 235 t.Run("unknownSortDirection", func(t *testing.T) { 236 _, _, err := newLess("name,sideways") 237 require.Error(t, err) 238 assert.Contains(t, err.Error(), "unknown --order-by sort direction") 239 }) 240 241 var ( 242 obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone) 243 obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone) 244 pair1 = fs.ObjectPair{Src: obj1} 245 pair2 = fs.ObjectPair{Src: obj2} 246 ) 247 248 for _, test := range []struct { 249 orderBy string 250 pair1LessPair2 bool 251 pair2LessPair1 bool 252 wantFraction int 253 }{ 254 {"size", true, false, -1}, 255 {"name", false, true, -1}, 256 {"modtime", false, false, -1}, 257 {"size,ascending", true, false, -1}, 258 {"name,asc", false, true, -1}, 259 {"modtime,ascending", false, false, -1}, 260 {"size,descending", false, true, -1}, 261 {"name,desc", true, false, -1}, 262 {"modtime,descending", true, true, -1}, 263 {"modtime,mixed", false, false, 50}, 264 {"modtime,mixed,30", false, false, 30}, 265 } { 266 t.Run(test.orderBy, func(t *testing.T) { 267 less, gotFraction, err := newLess(test.orderBy) 268 assert.Equal(t, test.wantFraction, gotFraction) 269 require.NoError(t, err) 270 require.NotNil(t, less) 271 pair1LessPair2 := less(pair1, pair2) 272 assert.Equal(t, test.pair1LessPair2, pair1LessPair2) 273 pair2LessPair1 := less(pair2, pair1) 274 assert.Equal(t, test.pair2LessPair1, pair2LessPair1) 275 }) 276 } 277 278} 279