1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package orders 5 6import ( 7 "context" 8 "math/rand" 9 "sync" 10 "time" 11 12 "github.com/spacemonkeygo/monkit/v3" 13 "github.com/zeebo/errs" 14 "go.uber.org/zap" 15 "golang.org/x/sync/errgroup" 16 17 "storj.io/common/pb" 18 "storj.io/common/rpc" 19 "storj.io/common/storj" 20 "storj.io/common/sync2" 21 "storj.io/storj/storagenode/orders/ordersfile" 22 "storj.io/storj/storagenode/trust" 23) 24 25var ( 26 // OrderError represents errors with orders. 27 OrderError = errs.Class("order") 28 // OrderNotFoundError is the error returned when an order is not found. 29 OrderNotFoundError = errs.Class("order not found") 30 31 mon = monkit.Package() 32) 33 34// ArchivedInfo contains full information about an archived order. 35type ArchivedInfo struct { 36 Limit *pb.OrderLimit 37 Order *pb.Order 38 39 Status Status 40 ArchivedAt time.Time 41} 42 43// Status is the archival status of the order. 44type Status byte 45 46// Statuses for satellite responses. 47const ( 48 StatusUnsent Status = iota 49 StatusAccepted 50 StatusRejected 51) 52 53// ArchiveRequest defines arguments for archiving a single order. 54type ArchiveRequest struct { 55 Satellite storj.NodeID 56 Serial storj.SerialNumber 57 Status Status 58} 59 60// DB implements storing orders for sending to the satellite. 61// 62// architecture: Database 63type DB interface { 64 // Enqueue inserts order to the list of orders needing to be sent to the satellite. 65 Enqueue(ctx context.Context, info *ordersfile.Info) error 66 // ListUnsent returns orders that haven't been sent yet. 67 ListUnsent(ctx context.Context, limit int) ([]*ordersfile.Info, error) 68 // ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite. 69 ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*ordersfile.Info, error) 70 71 // Archive marks order as being handled. 72 Archive(ctx context.Context, archivedAt time.Time, requests ...ArchiveRequest) error 73 // ListArchived returns orders that have been sent. 74 ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error) 75 // CleanArchive deletes all entries older than the before time. 76 CleanArchive(ctx context.Context, deleteBefore time.Time) (int, error) 77} 78 79// Config defines configuration for sending orders. 80type Config struct { 81 MaxSleep time.Duration `help:"maximum duration to wait before trying to send orders" releaseDefault:"30s" devDefault:"1s"` 82 SenderInterval time.Duration `help:"duration between sending" releaseDefault:"1h0m0s" devDefault:"30s"` 83 SenderTimeout time.Duration `help:"timeout for sending" default:"1h0m0s"` 84 SenderDialTimeout time.Duration `help:"timeout for dialing satellite during sending orders" default:"1m0s"` 85 CleanupInterval time.Duration `help:"duration between archive cleanups" default:"5m0s"` 86 ArchiveTTL time.Duration `help:"length of time to archive orders before deletion" default:"168h0m0s"` // 7 days 87 Path string `help:"path to store order limit files in" default:"$CONFDIR/orders"` 88} 89 90// Service sends every interval unsent orders to the satellite. 91// 92// architecture: Chore 93type Service struct { 94 log *zap.Logger 95 config Config 96 97 dialer rpc.Dialer 98 ordersStore *FileStore 99 orders DB 100 trust *trust.Pool 101 102 Sender *sync2.Cycle 103 Cleanup *sync2.Cycle 104} 105 106// NewService creates an order service. 107func NewService(log *zap.Logger, dialer rpc.Dialer, ordersStore *FileStore, orders DB, trust *trust.Pool, config Config) *Service { 108 return &Service{ 109 log: log, 110 dialer: dialer, 111 ordersStore: ordersStore, 112 orders: orders, 113 config: config, 114 trust: trust, 115 116 Sender: sync2.NewCycle(config.SenderInterval), 117 Cleanup: sync2.NewCycle(config.CleanupInterval), 118 } 119} 120 121// Run sends orders on every interval to the appropriate satellites. 122func (service *Service) Run(ctx context.Context) (err error) { 123 defer mon.Task()(&ctx)(&err) 124 125 var group errgroup.Group 126 127 service.Sender.Start(ctx, &group, func(ctx context.Context) error { 128 if err := service.sleep(ctx); err != nil { 129 return err 130 } 131 132 service.SendOrders(ctx, time.Now()) 133 134 return nil 135 }) 136 service.Cleanup.Start(ctx, &group, func(ctx context.Context) error { 137 if err := service.sleep(ctx); err != nil { 138 return err 139 } 140 141 err := service.CleanArchive(ctx, time.Now().Add(-service.config.ArchiveTTL)) 142 if err != nil { 143 service.log.Error("clean archive failed", zap.Error(err)) 144 } 145 146 return nil 147 }) 148 149 return group.Wait() 150} 151 152// CleanArchive removes all archived orders that were archived before the deleteBefore time. 153func (service *Service) CleanArchive(ctx context.Context, deleteBefore time.Time) (err error) { 154 defer mon.Task()(&ctx)(&err) 155 service.log.Debug("cleaning") 156 157 deleted, err := service.orders.CleanArchive(ctx, deleteBefore) 158 if err != nil { 159 service.log.Error("cleaning DB archive", zap.Error(err)) 160 return nil 161 } 162 163 err = service.ordersStore.CleanArchive(deleteBefore) 164 if err != nil { 165 service.log.Error("cleaning filestore archive", zap.Error(err)) 166 return nil 167 } 168 169 service.log.Debug("cleanup finished", zap.Int("items deleted", deleted)) 170 return nil 171} 172 173// SendOrders sends the orders using now as the current time. 174func (service *Service) SendOrders(ctx context.Context, now time.Time) { 175 defer mon.Task()(&ctx)(nil) 176 service.log.Debug("sending") 177 178 errorSatellites := make(map[storj.NodeID]struct{}) 179 var errorSatellitesMu sync.Mutex 180 181 // Continue sending until there are no more windows to send, or all relevant satellites are offline. 182 for { 183 ordersBySatellite, err := service.ordersStore.ListUnsentBySatellite(ctx, now) 184 if err != nil { 185 service.log.Error("listing orders", zap.Error(err)) 186 } 187 if len(ordersBySatellite) == 0 { 188 service.log.Debug("no orders to send") 189 break 190 } 191 192 var group errgroup.Group 193 attemptedSatellites := 0 194 ctx, cancel := context.WithTimeout(ctx, service.config.SenderTimeout) 195 196 for satelliteID, unsentInfo := range ordersBySatellite { 197 satelliteID, unsentInfo := satelliteID, unsentInfo 198 if _, ok := errorSatellites[satelliteID]; ok { 199 continue 200 } 201 attemptedSatellites++ 202 203 group.Go(func() error { 204 log := service.log.Named(satelliteID.String()) 205 status, err := service.settleWindow(ctx, log, satelliteID, unsentInfo.InfoList) 206 if err != nil { 207 // satellite returned an error, but settlement was not explicitly rejected; we want to retry later 208 errorSatellitesMu.Lock() 209 errorSatellites[satelliteID] = struct{}{} 210 errorSatellitesMu.Unlock() 211 log.Error("failed to settle orders for satellite", zap.String("satellite ID", satelliteID.String()), zap.Error(err)) 212 return nil 213 } 214 215 err = service.ordersStore.Archive(satelliteID, unsentInfo, time.Now().UTC(), status) 216 if err != nil { 217 log.Error("failed to archive orders", zap.Error(err)) 218 return nil 219 } 220 221 return nil 222 }) 223 224 } 225 _ = group.Wait() // doesn't return errors 226 cancel() 227 228 // if all satellites that orders need to be sent to are offline, exit and try again later. 229 if attemptedSatellites == 0 { 230 break 231 } 232 } 233} 234 235func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*ordersfile.Info) (status pb.SettlementWithWindowResponse_Status, err error) { 236 defer mon.Task()(&ctx)(&err) 237 238 log.Info("sending", zap.Int("count", len(orders))) 239 defer log.Info("finished") 240 241 nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID) 242 if err != nil { 243 return 0, OrderError.New("unable to get satellite address: %w", err) 244 } 245 246 conn, err := service.dialer.DialNodeURL(ctx, nodeurl) 247 if err != nil { 248 return 0, OrderError.New("unable to connect to the satellite: %w", err) 249 } 250 defer func() { err = errs.Combine(err, conn.Close()) }() 251 252 stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx) 253 if err != nil { 254 return 0, OrderError.New("failed to start settlement: %w", err) 255 } 256 257 for _, order := range orders { 258 req := pb.SettlementRequest{ 259 Limit: order.Limit, 260 Order: order.Order, 261 } 262 err := stream.Send(&req) 263 if err != nil { 264 err = OrderError.New("sending settlement agreements returned an error: %w", err) 265 log.Error("rpc client when sending new orders settlements", 266 zap.Error(err), 267 zap.Any("request", req), 268 ) 269 return 0, err 270 } 271 } 272 273 res, err := stream.CloseAndRecv() 274 if err != nil { 275 err = OrderError.New("CloseAndRecv settlement agreements returned an error: %w", err) 276 log.Error("rpc client error when closing sender ", zap.Error(err)) 277 return 0, err 278 } 279 280 return res.Status, nil 281} 282 283// sleep for random interval in [0;maxSleep). 284// Returns an error if context was cancelled. 285func (service *Service) sleep(ctx context.Context) error { 286 if service.config.MaxSleep <= 0 { 287 return nil 288 } 289 290 jitter := time.Duration(rand.Int63n(int64(service.config.MaxSleep))) 291 if !sync2.Sleep(ctx, jitter) { 292 return ctx.Err() 293 } 294 295 return nil 296} 297 298// Close stops the sending service. 299func (service *Service) Close() error { 300 service.Sender.Close() 301 service.Cleanup.Close() 302 return nil 303} 304