1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package orders
5
6import (
7	"context"
8	"math/rand"
9	"sync"
10	"time"
11
12	"github.com/spacemonkeygo/monkit/v3"
13	"github.com/zeebo/errs"
14	"go.uber.org/zap"
15	"golang.org/x/sync/errgroup"
16
17	"storj.io/common/pb"
18	"storj.io/common/rpc"
19	"storj.io/common/storj"
20	"storj.io/common/sync2"
21	"storj.io/storj/storagenode/orders/ordersfile"
22	"storj.io/storj/storagenode/trust"
23)
24
var (
	// OrderError represents errors with orders.
	// It is the error class used to wrap failures that occur while settling
	// orders with a satellite.
	OrderError = errs.Class("order")
	// OrderNotFoundError is the error returned when an order is not found.
	OrderNotFoundError = errs.Class("order not found")

	// mon is the monkit scope used to instrument the tasks of this package.
	mon = monkit.Package()
)
33
// ArchivedInfo contains full information about an archived order.
//
// It pairs the order limit and the order with the status and the time that
// were recorded for the archival.
type ArchivedInfo struct {
	Limit *pb.OrderLimit // order limit of the archived order
	Order *pb.Order      // the archived order

	Status     Status    // archival status of the order
	ArchivedAt time.Time // time recorded for the archival
}
42
// Status is the archival status of the order.
//
// It is represented as a single byte; its zero value is StatusUnsent.
type Status byte
45
// Statuses for satellite responses.
//
// The numeric values are assigned by iota in declaration order (0, 1, 2).
const (
	// StatusUnsent is the zero value of Status; going by its name it marks an
	// order that has not been sent yet.
	StatusUnsent Status = iota
	// StatusAccepted has the value 1; presumably the satellite accepted the order.
	StatusAccepted
	// StatusRejected has the value 2; presumably the satellite rejected the order.
	StatusRejected
)
52
// ArchiveRequest defines arguments for archiving a single order.
//
// One or more requests are passed to DB.Archive.
type ArchiveRequest struct {
	Satellite storj.NodeID       // node ID of the satellite for the order
	Serial    storj.SerialNumber // serial number of the order
	Status    Status             // status to record for the order
}
59
// DB implements storing orders for sending to the satellite.
//
// In the code visible here the Service only calls CleanArchive on it.
//
// architecture: Database
type DB interface {
	// Enqueue inserts order to the list of orders needing to be sent to the satellite.
	Enqueue(ctx context.Context, info *ordersfile.Info) error
	// ListUnsent returns orders that haven't been sent yet.
	// NOTE(review): limit presumably caps the number of returned orders — confirm against the implementation.
	ListUnsent(ctx context.Context, limit int) ([]*ordersfile.Info, error)
	// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
	// The result is keyed by the satellite's node ID.
	ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*ordersfile.Info, error)

	// Archive marks order as being handled.
	// Every request is archived with the same archivedAt time.
	Archive(ctx context.Context, archivedAt time.Time, requests ...ArchiveRequest) error
	// ListArchived returns orders that have been sent.
	// NOTE(review): limit presumably caps the number of returned entries — confirm against the implementation.
	ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error)
	// CleanArchive deletes all entries older than the before time.
	// The returned int is reported by Service.CleanArchive as the number of deleted items.
	CleanArchive(ctx context.Context, deleteBefore time.Time) (int, error)
}
78
// Config defines configuration for sending orders.
//
// MaxSleep bounds the random delay applied before every send and cleanup run;
// a value <= 0 disables the delay. SenderInterval and CleanupInterval are the
// periods of the Sender and Cleanup cycles. SenderTimeout limits a single pass
// of settling the listed orders, and ArchiveTTL is how far back archived
// orders are kept before a cleanup run deletes them. SenderDialTimeout and
// Path are not read by the code in this file; their help tags describe them.
type Config struct {
	MaxSleep          time.Duration `help:"maximum duration to wait before trying to send orders" releaseDefault:"30s" devDefault:"1s"`
	SenderInterval    time.Duration `help:"duration between sending" releaseDefault:"1h0m0s" devDefault:"30s"`
	SenderTimeout     time.Duration `help:"timeout for sending" default:"1h0m0s"`
	SenderDialTimeout time.Duration `help:"timeout for dialing satellite during sending orders" default:"1m0s"`
	CleanupInterval   time.Duration `help:"duration between archive cleanups" default:"5m0s"`
	ArchiveTTL        time.Duration `help:"length of time to archive orders before deletion" default:"168h0m0s"` // 7 days
	Path              string        `help:"path to store order limit files in" default:"$CONFDIR/orders"`
}
89
// Service sends every interval unsent orders to the satellite.
//
// It also periodically removes archived orders that are older than the
// configured ArchiveTTL.
//
// architecture: Chore
type Service struct {
	log    *zap.Logger // logger used by the service
	config Config      // sending and cleanup configuration

	dialer      rpc.Dialer  // dials satellites when settling orders
	ordersStore *FileStore  // store used to list, archive and clean up orders
	orders      DB          // orders database; used here for archive cleanup
	trust       *trust.Pool // resolves a satellite ID to its node URL

	Sender  *sync2.Cycle // cycle that triggers SendOrders
	Cleanup *sync2.Cycle // cycle that triggers CleanArchive
}
105
106// NewService creates an order service.
107func NewService(log *zap.Logger, dialer rpc.Dialer, ordersStore *FileStore, orders DB, trust *trust.Pool, config Config) *Service {
108	return &Service{
109		log:         log,
110		dialer:      dialer,
111		ordersStore: ordersStore,
112		orders:      orders,
113		config:      config,
114		trust:       trust,
115
116		Sender:  sync2.NewCycle(config.SenderInterval),
117		Cleanup: sync2.NewCycle(config.CleanupInterval),
118	}
119}
120
121// Run sends orders on every interval to the appropriate satellites.
122func (service *Service) Run(ctx context.Context) (err error) {
123	defer mon.Task()(&ctx)(&err)
124
125	var group errgroup.Group
126
127	service.Sender.Start(ctx, &group, func(ctx context.Context) error {
128		if err := service.sleep(ctx); err != nil {
129			return err
130		}
131
132		service.SendOrders(ctx, time.Now())
133
134		return nil
135	})
136	service.Cleanup.Start(ctx, &group, func(ctx context.Context) error {
137		if err := service.sleep(ctx); err != nil {
138			return err
139		}
140
141		err := service.CleanArchive(ctx, time.Now().Add(-service.config.ArchiveTTL))
142		if err != nil {
143			service.log.Error("clean archive failed", zap.Error(err))
144		}
145
146		return nil
147	})
148
149	return group.Wait()
150}
151
152// CleanArchive removes all archived orders that were archived before the deleteBefore time.
153func (service *Service) CleanArchive(ctx context.Context, deleteBefore time.Time) (err error) {
154	defer mon.Task()(&ctx)(&err)
155	service.log.Debug("cleaning")
156
157	deleted, err := service.orders.CleanArchive(ctx, deleteBefore)
158	if err != nil {
159		service.log.Error("cleaning DB archive", zap.Error(err))
160		return nil
161	}
162
163	err = service.ordersStore.CleanArchive(deleteBefore)
164	if err != nil {
165		service.log.Error("cleaning filestore archive", zap.Error(err))
166		return nil
167	}
168
169	service.log.Debug("cleanup finished", zap.Int("items deleted", deleted))
170	return nil
171}
172
173// SendOrders sends the orders using now as the current time.
174func (service *Service) SendOrders(ctx context.Context, now time.Time) {
175	defer mon.Task()(&ctx)(nil)
176	service.log.Debug("sending")
177
178	errorSatellites := make(map[storj.NodeID]struct{})
179	var errorSatellitesMu sync.Mutex
180
181	// Continue sending until there are no more windows to send, or all relevant satellites are offline.
182	for {
183		ordersBySatellite, err := service.ordersStore.ListUnsentBySatellite(ctx, now)
184		if err != nil {
185			service.log.Error("listing orders", zap.Error(err))
186		}
187		if len(ordersBySatellite) == 0 {
188			service.log.Debug("no orders to send")
189			break
190		}
191
192		var group errgroup.Group
193		attemptedSatellites := 0
194		ctx, cancel := context.WithTimeout(ctx, service.config.SenderTimeout)
195
196		for satelliteID, unsentInfo := range ordersBySatellite {
197			satelliteID, unsentInfo := satelliteID, unsentInfo
198			if _, ok := errorSatellites[satelliteID]; ok {
199				continue
200			}
201			attemptedSatellites++
202
203			group.Go(func() error {
204				log := service.log.Named(satelliteID.String())
205				status, err := service.settleWindow(ctx, log, satelliteID, unsentInfo.InfoList)
206				if err != nil {
207					// satellite returned an error, but settlement was not explicitly rejected; we want to retry later
208					errorSatellitesMu.Lock()
209					errorSatellites[satelliteID] = struct{}{}
210					errorSatellitesMu.Unlock()
211					log.Error("failed to settle orders for satellite", zap.String("satellite ID", satelliteID.String()), zap.Error(err))
212					return nil
213				}
214
215				err = service.ordersStore.Archive(satelliteID, unsentInfo, time.Now().UTC(), status)
216				if err != nil {
217					log.Error("failed to archive orders", zap.Error(err))
218					return nil
219				}
220
221				return nil
222			})
223
224		}
225		_ = group.Wait() // doesn't return errors
226		cancel()
227
228		// if all satellites that orders need to be sent to  are offline, exit and try again later.
229		if attemptedSatellites == 0 {
230			break
231		}
232	}
233}
234
235func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*ordersfile.Info) (status pb.SettlementWithWindowResponse_Status, err error) {
236	defer mon.Task()(&ctx)(&err)
237
238	log.Info("sending", zap.Int("count", len(orders)))
239	defer log.Info("finished")
240
241	nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID)
242	if err != nil {
243		return 0, OrderError.New("unable to get satellite address: %w", err)
244	}
245
246	conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
247	if err != nil {
248		return 0, OrderError.New("unable to connect to the satellite: %w", err)
249	}
250	defer func() { err = errs.Combine(err, conn.Close()) }()
251
252	stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
253	if err != nil {
254		return 0, OrderError.New("failed to start settlement: %w", err)
255	}
256
257	for _, order := range orders {
258		req := pb.SettlementRequest{
259			Limit: order.Limit,
260			Order: order.Order,
261		}
262		err := stream.Send(&req)
263		if err != nil {
264			err = OrderError.New("sending settlement agreements returned an error: %w", err)
265			log.Error("rpc client when sending new orders settlements",
266				zap.Error(err),
267				zap.Any("request", req),
268			)
269			return 0, err
270		}
271	}
272
273	res, err := stream.CloseAndRecv()
274	if err != nil {
275		err = OrderError.New("CloseAndRecv settlement agreements returned an error: %w", err)
276		log.Error("rpc client error when closing sender ", zap.Error(err))
277		return 0, err
278	}
279
280	return res.Status, nil
281}
282
283// sleep for random interval in [0;maxSleep).
284// Returns an error if context was cancelled.
285func (service *Service) sleep(ctx context.Context) error {
286	if service.config.MaxSleep <= 0 {
287		return nil
288	}
289
290	jitter := time.Duration(rand.Int63n(int64(service.config.MaxSleep)))
291	if !sync2.Sleep(ctx, jitter) {
292		return ctx.Err()
293	}
294
295	return nil
296}
297
298// Close stops the sending service.
299func (service *Service) Close() error {
300	service.Sender.Close()
301	service.Cleanup.Close()
302	return nil
303}
304