1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package storagenodedb
5
6import (
7	"context"
8	"time"
9
10	"github.com/zeebo/errs"
11
12	"storj.io/common/uuid"
13	"storj.io/storj/storagenode/notifications"
14)
15
16// ensures that notificationDB implements notifications.Notifications interface.
17var _ notifications.DB = (*notificationDB)(nil)
18
19// NotificationsDBName represents the database name.
20const NotificationsDBName = "notifications"
21
22// ErrNotificationsDB represents errors from the notifications database.
23var ErrNotificationsDB = errs.Class("notificationsdb")
24
25// notificationDB is an implementation of notifications.Notifications.
26//
27// architecture: Database
28type notificationDB struct {
29	dbContainerImpl
30}
31
32// Insert puts new notification to database.
33func (db *notificationDB) Insert(ctx context.Context, notification notifications.NewNotification) (_ notifications.Notification, err error) {
34	defer mon.Task()(&ctx, notification)(&err)
35
36	id, err := uuid.New()
37	if err != nil {
38		return notifications.Notification{}, ErrNotificationsDB.Wrap(err)
39	}
40
41	createdAt := time.Now().UTC()
42
43	query := `
44		INSERT INTO
45			notifications (id, sender_id, type, title, message, created_at)
46		VALUES
47			(?, ?, ?, ?, ?, ?);
48	`
49
50	_, err = db.ExecContext(ctx, query, id[:], notification.SenderID[:], notification.Type, notification.Title, notification.Message, createdAt)
51	if err != nil {
52		return notifications.Notification{}, ErrNotificationsDB.Wrap(err)
53	}
54
55	return notifications.Notification{
56		ID:        id,
57		SenderID:  notification.SenderID,
58		Type:      notification.Type,
59		Title:     notification.Title,
60		Message:   notification.Message,
61		ReadAt:    nil,
62		CreatedAt: createdAt,
63	}, nil
64}
65
66// List returns listed page of notifications from database.
67func (db *notificationDB) List(ctx context.Context, cursor notifications.Cursor) (_ notifications.Page, err error) {
68	defer mon.Task()(&ctx, cursor)(&err)
69
70	if cursor.Limit > 50 {
71		cursor.Limit = 50
72	}
73
74	if cursor.Page == 0 {
75		return notifications.Page{}, ErrNotificationsDB.Wrap(errs.New("page can not be 0"))
76	}
77
78	page := notifications.Page{
79		Limit:  cursor.Limit,
80		Offset: uint64((cursor.Page - 1) * cursor.Limit),
81	}
82
83	countQuery := `
84		SELECT
85			COUNT(id)
86		FROM
87			notifications
88	`
89
90	err = db.QueryRowContext(ctx, countQuery).Scan(&page.TotalCount)
91	if err != nil {
92		return notifications.Page{}, ErrNotificationsDB.Wrap(err)
93	}
94	if page.TotalCount == 0 {
95		return page, nil
96	}
97	if page.Offset > page.TotalCount-1 {
98		return notifications.Page{}, ErrNotificationsDB.Wrap(errs.New("page is out of range"))
99	}
100
101	query := `
102		SELECT * FROM
103			notifications
104		ORDER BY
105			created_at DESC
106		LIMIT ? OFFSET ?
107	`
108
109	rows, err := db.QueryContext(ctx, query, page.Limit, page.Offset)
110	if err != nil {
111		return notifications.Page{}, ErrNotificationsDB.Wrap(err)
112	}
113
114	defer func() {
115		err = errs.Combine(err, ErrNotificationsDB.Wrap(rows.Close()))
116	}()
117
118	for rows.Next() {
119		notification := notifications.Notification{}
120
121		err = rows.Scan(
122			&notification.ID,
123			&notification.SenderID,
124			&notification.Type,
125			&notification.Title,
126			&notification.Message,
127			&notification.ReadAt,
128			&notification.CreatedAt,
129		)
130		if err = rows.Err(); err != nil {
131			return notifications.Page{}, ErrNotificationsDB.Wrap(err)
132		}
133
134		page.Notifications = append(page.Notifications, notification)
135	}
136
137	page.PageCount = uint(page.TotalCount / uint64(cursor.Limit))
138	if page.TotalCount%uint64(cursor.Limit) != 0 {
139		page.PageCount++
140	}
141
142	page.CurrentPage = cursor.Page
143
144	return page, ErrNotificationsDB.Wrap(rows.Err())
145}
146
147// Read updates specific notification in database as read.
148func (db *notificationDB) Read(ctx context.Context, notificationID uuid.UUID) (err error) {
149	defer mon.Task()(&ctx, notificationID)(&err)
150
151	query := `
152		UPDATE
153			notifications
154		SET
155			read_at = ?
156		WHERE
157			id = ?;
158	`
159	result, err := db.ExecContext(ctx, query, time.Now().UTC(), notificationID[:])
160	if err != nil {
161		return ErrNotificationsDB.Wrap(err)
162	}
163
164	rowsAffected, err := result.RowsAffected()
165	if err != nil {
166		return ErrNotificationsDB.Wrap(err)
167	}
168	if rowsAffected != 1 {
169		return ErrNotificationsDB.Wrap(ErrNoRows)
170	}
171
172	return nil
173}
174
175// ReadAll updates all notifications in database as read.
176func (db *notificationDB) ReadAll(ctx context.Context) (err error) {
177	defer mon.Task()(&ctx)(&err)
178
179	query := `
180		UPDATE
181			notifications
182		SET
183			read_at = ?
184		WHERE
185			read_at IS NULL;
186	`
187
188	_, err = db.ExecContext(ctx, query, time.Now().UTC())
189
190	return ErrNotificationsDB.Wrap(err)
191}
192
193// UnreadAmount returns amount on unread notifications.
194func (db *notificationDB) UnreadAmount(ctx context.Context) (_ int, err error) {
195	defer mon.Task()(&ctx)(&err)
196	var amount int
197
198	query := `
199		SELECT
200			COUNT(id)
201		FROM
202			notifications
203		WHERE
204			read_at IS NULL
205	`
206
207	err = db.QueryRowContext(ctx, query).Scan(&amount)
208	if err != nil {
209		return 0, ErrNotificationsDB.Wrap(err)
210	}
211
212	return amount, nil
213}
214