1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package gracefulexit_test 5 6import ( 7 "context" 8 "testing" 9 "time" 10 11 "github.com/stretchr/testify/require" 12 "go.uber.org/zap" 13 14 "storj.io/common/memory" 15 "storj.io/common/storj" 16 "storj.io/common/testcontext" 17 "storj.io/common/testrand" 18 "storj.io/storj/private/testplanet" 19 "storj.io/storj/satellite" 20 "storj.io/storj/satellite/gracefulexit" 21 "storj.io/storj/satellite/metabase" 22 "storj.io/storj/satellite/overlay" 23 "storj.io/storj/satellite/satellitedb/satellitedbtest" 24) 25 26func TestChore(t *testing.T) { 27 var maximumInactiveTimeFrame = time.Second * 1 28 testplanet.Run(t, testplanet.Config{ 29 SatelliteCount: 1, 30 StorageNodeCount: 8, 31 UplinkCount: 1, 32 Reconfigure: testplanet.Reconfigure{ 33 Satellite: testplanet.Combine( 34 func(log *zap.Logger, index int, config *satellite.Config) { 35 config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame 36 }, 37 testplanet.ReconfigureRS(4, 6, 8, 8), 38 ), 39 }, 40 }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { 41 uplinkPeer := planet.Uplinks[0] 42 satellite := planet.Satellites[0] 43 exitingNode := planet.StorageNodes[1] 44 45 project, err := uplinkPeer.GetProject(ctx, satellite) 46 require.NoError(t, err) 47 defer func() { require.NoError(t, project.Close()) }() 48 49 satellite.GracefulExit.Chore.Loop.Pause() 50 51 err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) 52 require.NoError(t, err) 53 54 err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path2", testrand.Bytes(5*memory.KiB)) 55 require.NoError(t, err) 56 57 info, err := project.BeginUpload(ctx, "testbucket", "test/path3", nil) 58 require.NoError(t, err) 59 60 upload, err := project.UploadPart(ctx, "testbucket", "test/path3", info.UploadID, 1) 61 require.NoError(t, err) 62 63 _, err = upload.Write(testrand.Bytes(5 * memory.KiB)) 64 require.NoError(t, err) 65 require.NoError(t, upload.Commit()) 66 67 exitStatusRequest := overlay.ExitStatusRequest{ 68 NodeID: exitingNode.ID(), 69 ExitInitiatedAt: time.Now(), 70 } 71 72 _, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusRequest) 73 require.NoError(t, err) 74 75 exitingNodes, err := satellite.Overlay.DB.GetExitingNodes(ctx) 76 require.NoError(t, err) 77 nodeIDs := make(storj.NodeIDList, 0, len(exitingNodes)) 78 for _, exitingNode := range exitingNodes { 79 if exitingNode.ExitLoopCompletedAt == nil { 80 nodeIDs = append(nodeIDs, exitingNode.NodeID) 81 } 82 } 83 require.Len(t, nodeIDs, 1) 84 85 satellite.GracefulExit.Chore.Loop.TriggerWait() 86 87 incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) 88 require.NoError(t, err) 89 require.Len(t, incompleteTransfers, 3) 90 for _, incomplete := range incompleteTransfers { 91 require.True(t, incomplete.DurabilityRatio > 0) 92 require.NotNil(t, incomplete.RootPieceID) 93 } 94 95 // test the other nodes don't have anything to transfer 96 for _, node := range planet.StorageNodes { 97 if node.ID() == exitingNode.ID() { 98 continue 99 } 100 incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0) 101 require.NoError(t, err) 102 require.Len(t, incompleteTransfers, 0) 103 } 104 105 exitingNodes, err = satellite.Overlay.DB.GetExitingNodes(ctx) 106 require.NoError(t, err) 107 nodeIDs = make(storj.NodeIDList, 0, len(exitingNodes)) 108 for _, exitingNode := range exitingNodes { 109 if exitingNode.ExitLoopCompletedAt == nil { 110 nodeIDs = append(nodeIDs, exitingNode.NodeID) 111 } 112 } 113 require.Len(t, nodeIDs, 0) 114 115 satellite.GracefulExit.Chore.Loop.Pause() 116 err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0) 117 require.NoError(t, err) 118 119 incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) 120 require.NoError(t, err) 121 require.Len(t, incompleteTransfers, 3) 122 123 // node should fail graceful exit if it has been inactive for maximum inactive time frame since last activity 124 time.Sleep(maximumInactiveTimeFrame + time.Second*1) 125 satellite.GracefulExit.Chore.Loop.TriggerWait() 126 127 exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID()) 128 require.NoError(t, err) 129 require.False(t, exitStatus.ExitSuccess) 130 require.NotNil(t, exitStatus.ExitFinishedAt) 131 132 incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) 133 require.NoError(t, err) 134 require.Len(t, incompleteTransfers, 0) 135 136 }) 137} 138 139func TestDurabilityRatio(t *testing.T) { 140 const ( 141 maximumInactiveTimeFrame = time.Second * 1 142 successThreshold = 4 143 ) 144 testplanet.Run(t, testplanet.Config{ 145 SatelliteCount: 1, 146 StorageNodeCount: 4, 147 UplinkCount: 1, 148 Reconfigure: testplanet.Reconfigure{ 149 Satellite: testplanet.Combine( 150 func(log *zap.Logger, index int, config *satellite.Config) { 151 config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame 152 }, 153 testplanet.ReconfigureRS(2, 3, successThreshold, 4), 154 ), 155 }, 156 }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { 157 uplinkPeer := planet.Uplinks[0] 158 satellite := planet.Satellites[0] 159 nodeToRemove := planet.StorageNodes[0] 160 exitingNode := planet.StorageNodes[1] 161 162 project, err := uplinkPeer.GetProject(ctx, satellite) 163 require.NoError(t, err) 164 defer func() { require.NoError(t, project.Close()) }() 165 satellite.GracefulExit.Chore.Loop.Pause() 166 167 err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) 168 require.NoError(t, err) 169 170 info, err := project.BeginUpload(ctx, "testbucket", "test/path2", nil) 171 require.NoError(t, err) 172 173 upload, err := project.UploadPart(ctx, "testbucket", "test/path2", info.UploadID, 1) 174 require.NoError(t, err) 175 176 _, err = upload.Write(testrand.Bytes(5 * memory.KiB)) 177 require.NoError(t, err) 178 require.NoError(t, upload.Commit()) 179 180 exitStatusRequest := overlay.ExitStatusRequest{ 181 NodeID: exitingNode.ID(), 182 ExitInitiatedAt: time.Now(), 183 } 184 185 _, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusRequest) 186 require.NoError(t, err) 187 188 exitingNodes, err := satellite.Overlay.DB.GetExitingNodes(ctx) 189 require.NoError(t, err) 190 nodeIDs := make(storj.NodeIDList, 0, len(exitingNodes)) 191 for _, exitingNode := range exitingNodes { 192 if exitingNode.ExitLoopCompletedAt == nil { 193 nodeIDs = append(nodeIDs, exitingNode.NodeID) 194 } 195 } 196 require.Len(t, nodeIDs, 1) 197 198 // retrieve remote segment 199 segments, err := satellite.Metabase.DB.TestingAllSegments(ctx) 200 require.NoError(t, err) 201 require.Len(t, segments, 2) 202 203 for _, segment := range segments { 204 remotePieces := segment.Pieces 205 var newPieces metabase.Pieces = make(metabase.Pieces, len(remotePieces)-1) 206 idx := 0 207 for _, p := range remotePieces { 208 if p.StorageNode != nodeToRemove.ID() { 209 newPieces[idx] = p 210 idx++ 211 } 212 } 213 err = satellite.Metabase.DB.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{ 214 StreamID: segment.StreamID, 215 Position: segment.Position, 216 217 OldPieces: segment.Pieces, 218 NewPieces: newPieces, 219 NewRedundancy: segment.Redundancy, 220 }) 221 require.NoError(t, err) 222 } 223 224 satellite.GracefulExit.Chore.Loop.TriggerWait() 225 226 incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) 227 require.NoError(t, err) 228 require.Len(t, incompleteTransfers, 2) 229 for _, incomplete := range incompleteTransfers { 230 require.Equal(t, float64(successThreshold-1)/float64(successThreshold), incomplete.DurabilityRatio) 231 require.NotNil(t, incomplete.RootPieceID) 232 } 233 }) 234} 235 236func BenchmarkChore(b *testing.B) { 237 satellitedbtest.Bench(b, func(b *testing.B, db satellite.DB) { 238 gracefulexitdb := db.GracefulExit() 239 ctx := context.Background() 240 241 b.Run("BatchUpdateStats-100", func(b *testing.B) { 242 batch(ctx, b, gracefulexitdb, 100) 243 }) 244 if !testing.Short() { 245 b.Run("BatchUpdateStats-250", func(b *testing.B) { 246 batch(ctx, b, gracefulexitdb, 250) 247 }) 248 b.Run("BatchUpdateStats-500", func(b *testing.B) { 249 batch(ctx, b, gracefulexitdb, 500) 250 }) 251 b.Run("BatchUpdateStats-1000", func(b *testing.B) { 252 batch(ctx, b, gracefulexitdb, 1000) 253 }) 254 b.Run("BatchUpdateStats-5000", func(b *testing.B) { 255 batch(ctx, b, gracefulexitdb, 5000) 256 }) 257 } 258 }) 259} 260func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) { 261 for i := 0; i < b.N; i++ { 262 var transferQueueItems []gracefulexit.TransferQueueItem 263 for j := 0; j < size; j++ { 264 item := gracefulexit.TransferQueueItem{ 265 NodeID: testrand.NodeID(), 266 StreamID: testrand.UUID(), 267 Position: metabase.SegmentPosition{}, 268 PieceNum: 0, 269 DurabilityRatio: 1.0, 270 } 271 transferQueueItems = append(transferQueueItems, item) 272 } 273 batchSize := 1000 274 err := db.Enqueue(ctx, transferQueueItems, batchSize) 275 require.NoError(b, err) 276 } 277} 278