1package sync
2
3import (
4	"container/heap"
5	"context"
6	"sync"
7	"sync/atomic"
8	"testing"
9
10	"github.com/rclone/rclone/fs"
11	"github.com/rclone/rclone/fstest/mockobject"
12	"github.com/stretchr/testify/assert"
13	"github.com/stretchr/testify/require"
14)
15
// Compile-time check that *pipe satisfies heap.Interface, so that a
// pipe can be handed to the container/heap functions.
var _ heap.Interface = (*pipe)(nil)
18
19func TestPipe(t *testing.T) {
20	var queueLength int
21	var queueSize int64
22	stats := func(n int, size int64) {
23		queueLength, queueSize = n, size
24	}
25
26	// Make a new pipe
27	p, err := newPipe("", stats, 10)
28	require.NoError(t, err)
29
30	checkStats := func(expectedN int, expectedSize int64) {
31		n, size := p.Stats()
32		assert.Equal(t, expectedN, n)
33		assert.Equal(t, expectedSize, size)
34		assert.Equal(t, expectedN, queueLength)
35		assert.Equal(t, expectedSize, queueSize)
36	}
37
38	checkStats(0, 0)
39
40	ctx := context.Background()
41
42	obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
43
44	pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
45
46	// Put an object
47	ok := p.Put(ctx, pair1)
48	assert.Equal(t, true, ok)
49	checkStats(1, 5)
50
51	// Close the pipe showing reading on closed pipe is OK
52	p.Close()
53
54	// Read from pipe
55	pair2, ok := p.Get(ctx)
56	assert.Equal(t, pair1, pair2)
57	assert.Equal(t, true, ok)
58	checkStats(0, 0)
59
60	// Check read on closed pipe
61	pair2, ok = p.Get(ctx)
62	assert.Equal(t, fs.ObjectPair{}, pair2)
63	assert.Equal(t, false, ok)
64
65	// Check panic on write to closed pipe
66	assert.Panics(t, func() { p.Put(ctx, pair1) })
67
68	// Make a new pipe
69	p, err = newPipe("", stats, 10)
70	require.NoError(t, err)
71	ctx2, cancel := context.WithCancel(ctx)
72
73	// cancel it in the background - check read ceases
74	go cancel()
75	pair2, ok = p.Get(ctx2)
76	assert.Equal(t, fs.ObjectPair{}, pair2)
77	assert.Equal(t, false, ok)
78
79	// check we can't write
80	ok = p.Put(ctx2, pair1)
81	assert.Equal(t, false, ok)
82
83}
84
85// TestPipeConcurrent runs concurrent Get and Put to flush out any
86// race conditions and concurrency problems.
87func TestPipeConcurrent(t *testing.T) {
88	const (
89		N           = 1000
90		readWriters = 10
91	)
92
93	stats := func(n int, size int64) {}
94
95	// Make a new pipe
96	p, err := newPipe("", stats, 10)
97	require.NoError(t, err)
98
99	var wg sync.WaitGroup
100	obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
101	pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
102	ctx := context.Background()
103	var count int64
104
105	for j := 0; j < readWriters; j++ {
106		wg.Add(2)
107		go func() {
108			defer wg.Done()
109			for i := 0; i < N; i++ {
110				// Read from pipe
111				pair2, ok := p.Get(ctx)
112				assert.Equal(t, pair1, pair2)
113				assert.Equal(t, true, ok)
114				atomic.AddInt64(&count, -1)
115			}
116		}()
117		go func() {
118			defer wg.Done()
119			for i := 0; i < N; i++ {
120				// Put an object
121				ok := p.Put(ctx, pair1)
122				assert.Equal(t, true, ok)
123				atomic.AddInt64(&count, 1)
124			}
125		}()
126	}
127	wg.Wait()
128
129	assert.Equal(t, int64(0), count)
130}
131
132func TestPipeOrderBy(t *testing.T) {
133	var (
134		stats = func(n int, size int64) {}
135		ctx   = context.Background()
136		obj1  = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone)
137		obj2  = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone)
138		pair1 = fs.ObjectPair{Src: obj1}
139		pair2 = fs.ObjectPair{Src: obj2}
140	)
141
142	for _, test := range []struct {
143		orderBy  string
144		swapped1 bool
145		swapped2 bool
146		fraction int
147	}{
148		{"", false, true, -1},
149		{"size", false, false, -1},
150		{"name", true, true, -1},
151		{"modtime", false, true, -1},
152		{"size,ascending", false, false, -1},
153		{"name,asc", true, true, -1},
154		{"modtime,ascending", false, true, -1},
155		{"size,descending", true, true, -1},
156		{"name,desc", false, false, -1},
157		{"modtime,descending", true, false, -1},
158		{"size,mixed,50", false, false, 25},
159		{"size,mixed,51", true, true, 75},
160	} {
161		t.Run(test.orderBy, func(t *testing.T) {
162			p, err := newPipe(test.orderBy, stats, 10)
163			require.NoError(t, err)
164
165			readAndCheck := func(swapped bool) {
166				var readFirst, readSecond fs.ObjectPair
167				var ok1, ok2 bool
168				if test.fraction < 0 {
169					readFirst, ok1 = p.Get(ctx)
170					readSecond, ok2 = p.Get(ctx)
171				} else {
172					readFirst, ok1 = p.GetMax(ctx, test.fraction)
173					readSecond, ok2 = p.GetMax(ctx, test.fraction)
174				}
175				assert.True(t, ok1)
176				assert.True(t, ok2)
177
178				if swapped {
179					assert.True(t, readFirst == pair2 && readSecond == pair1)
180				} else {
181					assert.True(t, readFirst == pair1 && readSecond == pair2)
182				}
183			}
184
185			ok := p.Put(ctx, pair1)
186			assert.True(t, ok)
187			ok = p.Put(ctx, pair2)
188			assert.True(t, ok)
189
190			readAndCheck(test.swapped1)
191
192			// insert other way round
193
194			ok = p.Put(ctx, pair2)
195			assert.True(t, ok)
196			ok = p.Put(ctx, pair1)
197			assert.True(t, ok)
198
199			readAndCheck(test.swapped2)
200		})
201	}
202}
203
204func TestNewLess(t *testing.T) {
205	t.Run("blankOK", func(t *testing.T) {
206		less, _, err := newLess("")
207		require.NoError(t, err)
208		assert.Nil(t, less)
209	})
210
211	t.Run("tooManyParts", func(t *testing.T) {
212		_, _, err := newLess("size,asc,toomanyparts")
213		require.Error(t, err)
214		assert.Contains(t, err.Error(), "bad --order-by string")
215	})
216
217	t.Run("tooManyParts2", func(t *testing.T) {
218		_, _, err := newLess("size,mixed,50,toomanyparts")
219		require.Error(t, err)
220		assert.Contains(t, err.Error(), "bad --order-by string")
221	})
222
223	t.Run("badMixed", func(t *testing.T) {
224		_, _, err := newLess("size,mixed,32.7")
225		require.Error(t, err)
226		assert.Contains(t, err.Error(), "bad mixed fraction")
227	})
228
229	t.Run("unknownComparison", func(t *testing.T) {
230		_, _, err := newLess("potato")
231		require.Error(t, err)
232		assert.Contains(t, err.Error(), "unknown --order-by comparison")
233	})
234
235	t.Run("unknownSortDirection", func(t *testing.T) {
236		_, _, err := newLess("name,sideways")
237		require.Error(t, err)
238		assert.Contains(t, err.Error(), "unknown --order-by sort direction")
239	})
240
241	var (
242		obj1  = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone)
243		obj2  = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone)
244		pair1 = fs.ObjectPair{Src: obj1}
245		pair2 = fs.ObjectPair{Src: obj2}
246	)
247
248	for _, test := range []struct {
249		orderBy        string
250		pair1LessPair2 bool
251		pair2LessPair1 bool
252		wantFraction   int
253	}{
254		{"size", true, false, -1},
255		{"name", false, true, -1},
256		{"modtime", false, false, -1},
257		{"size,ascending", true, false, -1},
258		{"name,asc", false, true, -1},
259		{"modtime,ascending", false, false, -1},
260		{"size,descending", false, true, -1},
261		{"name,desc", true, false, -1},
262		{"modtime,descending", true, true, -1},
263		{"modtime,mixed", false, false, 50},
264		{"modtime,mixed,30", false, false, 30},
265	} {
266		t.Run(test.orderBy, func(t *testing.T) {
267			less, gotFraction, err := newLess(test.orderBy)
268			assert.Equal(t, test.wantFraction, gotFraction)
269			require.NoError(t, err)
270			require.NotNil(t, less)
271			pair1LessPair2 := less(pair1, pair2)
272			assert.Equal(t, test.pair1LessPair2, pair1LessPair2)
273			pair2LessPair1 := less(pair2, pair1)
274			assert.Equal(t, test.pair2LessPair1, pair2LessPair1)
275		})
276	}
277
278}
279