1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package gracefulexit_test
5
6import (
7	"context"
8	"testing"
9	"time"
10
11	"github.com/stretchr/testify/require"
12	"go.uber.org/zap"
13
14	"storj.io/common/memory"
15	"storj.io/common/storj"
16	"storj.io/common/testcontext"
17	"storj.io/common/testrand"
18	"storj.io/storj/private/testplanet"
19	"storj.io/storj/satellite"
20	"storj.io/storj/satellite/gracefulexit"
21	"storj.io/storj/satellite/metabase"
22	"storj.io/storj/satellite/overlay"
23	"storj.io/storj/satellite/satellitedb/satellitedbtest"
24)
25
// TestChore verifies that the graceful exit chore builds the transfer queue
// for an exiting node, leaves other nodes' queues empty, and fails the exit
// (clearing the queue) when the node stays inactive past the configured
// maximum inactive time frame.
func TestChore(t *testing.T) {
	// Short inactivity window so the failure path can be exercised quickly.
	var maximumInactiveTimeFrame = time.Second * 1
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 8,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
				},
				// RS (4, 6, 8, 8): every one of the 8 nodes stores a piece,
				// so the exiting node participates in each segment.
				testplanet.ReconfigureRS(4, 6, 8, 8),
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		uplinkPeer := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		exitingNode := planet.StorageNodes[1]

		project, err := uplinkPeer.GetProject(ctx, satellite)
		require.NoError(t, err)
		defer func() { require.NoError(t, project.Close()) }()

		// Keep the chore from running until all test data is in place.
		satellite.GracefulExit.Chore.Loop.Pause()

		err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path2", testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		// Create a pending multipart upload: part 1 is committed but the
		// object itself is never completed. Its segment should still be
		// picked up by the chore (3 segments total).
		info, err := project.BeginUpload(ctx, "testbucket", "test/path3", nil)
		require.NoError(t, err)

		upload, err := project.UploadPart(ctx, "testbucket", "test/path3", info.UploadID, 1)
		require.NoError(t, err)

		_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
		require.NoError(t, err)
		require.NoError(t, upload.Commit())

		// Mark the node as having initiated a graceful exit.
		exitStatusRequest := overlay.ExitStatusRequest{
			NodeID:          exitingNode.ID(),
			ExitInitiatedAt: time.Now(),
		}

		_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusRequest)
		require.NoError(t, err)

		// Exactly one node should be exiting and not yet processed by the loop.
		exitingNodes, err := satellite.Overlay.DB.GetExitingNodes(ctx)
		require.NoError(t, err)
		nodeIDs := make(storj.NodeIDList, 0, len(exitingNodes))
		for _, exitingNode := range exitingNodes {
			if exitingNode.ExitLoopCompletedAt == nil {
				nodeIDs = append(nodeIDs, exitingNode.NodeID)
			}
		}
		require.Len(t, nodeIDs, 1)

		// Run the chore once; it should enqueue transfers for the exiting node.
		satellite.GracefulExit.Chore.Loop.TriggerWait()

		// All three segments should now be queued for transfer.
		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
		require.NoError(t, err)
		require.Len(t, incompleteTransfers, 3)
		for _, incomplete := range incompleteTransfers {
			require.True(t, incomplete.DurabilityRatio > 0)
			require.NotNil(t, incomplete.RootPieceID)
		}

		// test the other nodes don't have anything to transfer
		for _, node := range planet.StorageNodes {
			if node.ID() == exitingNode.ID() {
				continue
			}
			incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0)
			require.NoError(t, err)
			require.Len(t, incompleteTransfers, 0)
		}

		// After the loop ran, no exiting node should remain unprocessed.
		exitingNodes, err = satellite.Overlay.DB.GetExitingNodes(ctx)
		require.NoError(t, err)
		nodeIDs = make(storj.NodeIDList, 0, len(exitingNodes))
		for _, exitingNode := range exitingNodes {
			if exitingNode.ExitLoopCompletedAt == nil {
				nodeIDs = append(nodeIDs, exitingNode.NodeID)
			}
		}
		require.Len(t, nodeIDs, 0)

		satellite.GracefulExit.Chore.Loop.Pause()
		// Record a (zero) progress update so the node has a last-activity
		// timestamp to measure inactivity against.
		err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
		require.NoError(t, err)

		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
		require.NoError(t, err)
		require.Len(t, incompleteTransfers, 3)

		// node should fail graceful exit if it has been inactive for maximum inactive time frame since last activity
		time.Sleep(maximumInactiveTimeFrame + time.Second*1)
		satellite.GracefulExit.Chore.Loop.TriggerWait()

		// The exit must be recorded as finished and unsuccessful.
		exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
		require.NoError(t, err)
		require.False(t, exitStatus.ExitSuccess)
		require.NotNil(t, exitStatus.ExitFinishedAt)

		// A failed exit leaves no pending transfers for the node.
		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
		require.NoError(t, err)
		require.Len(t, incompleteTransfers, 0)

	})
}
138
// TestDurabilityRatio verifies that when one piece has been removed from each
// segment, the chore records a durability ratio of
// (successThreshold-1)/successThreshold on the queued transfer items.
func TestDurabilityRatio(t *testing.T) {
	const (
		maximumInactiveTimeFrame = time.Second * 1
		successThreshold         = 4
	)
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
				},
				// RS (2, 3, 4, 4) with 4 nodes: every node holds a piece of
				// each segment, including nodeToRemove and exitingNode.
				testplanet.ReconfigureRS(2, 3, successThreshold, 4),
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		uplinkPeer := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		nodeToRemove := planet.StorageNodes[0]
		exitingNode := planet.StorageNodes[1]

		project, err := uplinkPeer.GetProject(ctx, satellite)
		require.NoError(t, err)
		defer func() { require.NoError(t, project.Close()) }()
		// Keep the chore from running until the segments have been modified.
		satellite.GracefulExit.Chore.Loop.Pause()

		err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		// Second segment comes from a pending multipart upload: part 1 is
		// committed but the object itself is never completed.
		info, err := project.BeginUpload(ctx, "testbucket", "test/path2", nil)
		require.NoError(t, err)

		upload, err := project.UploadPart(ctx, "testbucket", "test/path2", info.UploadID, 1)
		require.NoError(t, err)

		_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
		require.NoError(t, err)
		require.NoError(t, upload.Commit())

		// Mark the node as having initiated a graceful exit.
		exitStatusRequest := overlay.ExitStatusRequest{
			NodeID:          exitingNode.ID(),
			ExitInitiatedAt: time.Now(),
		}

		_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusRequest)
		require.NoError(t, err)

		// Exactly one node should be exiting and not yet processed by the loop.
		exitingNodes, err := satellite.Overlay.DB.GetExitingNodes(ctx)
		require.NoError(t, err)
		nodeIDs := make(storj.NodeIDList, 0, len(exitingNodes))
		for _, exitingNode := range exitingNodes {
			if exitingNode.ExitLoopCompletedAt == nil {
				nodeIDs = append(nodeIDs, exitingNode.NodeID)
			}
		}
		require.Len(t, nodeIDs, 1)

		// retrieve remote segment
		segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, 2)

		// Drop nodeToRemove's piece from every segment, lowering durability.
		// newPieces is sized len-1, assuming nodeToRemove holds exactly one
		// piece per segment (true with the RS settings above).
		for _, segment := range segments {
			remotePieces := segment.Pieces
			var newPieces metabase.Pieces = make(metabase.Pieces, len(remotePieces)-1)
			idx := 0
			for _, p := range remotePieces {
				if p.StorageNode != nodeToRemove.ID() {
					newPieces[idx] = p
					idx++
				}
			}
			err = satellite.Metabase.DB.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
				StreamID: segment.StreamID,
				Position: segment.Position,

				OldPieces:     segment.Pieces,
				NewPieces:     newPieces,
				NewRedundancy: segment.Redundancy,
			})
			require.NoError(t, err)
		}

		// Run the chore once; it should enqueue transfers for the exiting node.
		satellite.GracefulExit.Chore.Loop.TriggerWait()

		// Both segments are queued, each reflecting the missing piece in its
		// durability ratio: (successThreshold-1)/successThreshold.
		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
		require.NoError(t, err)
		require.Len(t, incompleteTransfers, 2)
		for _, incomplete := range incompleteTransfers {
			require.Equal(t, float64(successThreshold-1)/float64(successThreshold), incomplete.DurabilityRatio)
			require.NotNil(t, incomplete.RootPieceID)
		}
	})
}
235
236func BenchmarkChore(b *testing.B) {
237	satellitedbtest.Bench(b, func(b *testing.B, db satellite.DB) {
238		gracefulexitdb := db.GracefulExit()
239		ctx := context.Background()
240
241		b.Run("BatchUpdateStats-100", func(b *testing.B) {
242			batch(ctx, b, gracefulexitdb, 100)
243		})
244		if !testing.Short() {
245			b.Run("BatchUpdateStats-250", func(b *testing.B) {
246				batch(ctx, b, gracefulexitdb, 250)
247			})
248			b.Run("BatchUpdateStats-500", func(b *testing.B) {
249				batch(ctx, b, gracefulexitdb, 500)
250			})
251			b.Run("BatchUpdateStats-1000", func(b *testing.B) {
252				batch(ctx, b, gracefulexitdb, 1000)
253			})
254			b.Run("BatchUpdateStats-5000", func(b *testing.B) {
255				batch(ctx, b, gracefulexitdb, 5000)
256			})
257		}
258	})
259}
260func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
261	for i := 0; i < b.N; i++ {
262		var transferQueueItems []gracefulexit.TransferQueueItem
263		for j := 0; j < size; j++ {
264			item := gracefulexit.TransferQueueItem{
265				NodeID:          testrand.NodeID(),
266				StreamID:        testrand.UUID(),
267				Position:        metabase.SegmentPosition{},
268				PieceNum:        0,
269				DurabilityRatio: 1.0,
270			}
271			transferQueueItems = append(transferQueueItems, item)
272		}
273		batchSize := 1000
274		err := db.Enqueue(ctx, transferQueueItems, batchSize)
275		require.NoError(b, err)
276	}
277}
278