1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package storagenodedb 5 6import ( 7 "context" 8 "time" 9 10 "github.com/zeebo/errs" 11 12 "storj.io/common/uuid" 13 "storj.io/storj/storagenode/notifications" 14) 15 16// ensures that notificationDB implements notifications.Notifications interface. 17var _ notifications.DB = (*notificationDB)(nil) 18 19// NotificationsDBName represents the database name. 20const NotificationsDBName = "notifications" 21 22// ErrNotificationsDB represents errors from the notifications database. 23var ErrNotificationsDB = errs.Class("notificationsdb") 24 25// notificationDB is an implementation of notifications.Notifications. 26// 27// architecture: Database 28type notificationDB struct { 29 dbContainerImpl 30} 31 32// Insert puts new notification to database. 33func (db *notificationDB) Insert(ctx context.Context, notification notifications.NewNotification) (_ notifications.Notification, err error) { 34 defer mon.Task()(&ctx, notification)(&err) 35 36 id, err := uuid.New() 37 if err != nil { 38 return notifications.Notification{}, ErrNotificationsDB.Wrap(err) 39 } 40 41 createdAt := time.Now().UTC() 42 43 query := ` 44 INSERT INTO 45 notifications (id, sender_id, type, title, message, created_at) 46 VALUES 47 (?, ?, ?, ?, ?, ?); 48 ` 49 50 _, err = db.ExecContext(ctx, query, id[:], notification.SenderID[:], notification.Type, notification.Title, notification.Message, createdAt) 51 if err != nil { 52 return notifications.Notification{}, ErrNotificationsDB.Wrap(err) 53 } 54 55 return notifications.Notification{ 56 ID: id, 57 SenderID: notification.SenderID, 58 Type: notification.Type, 59 Title: notification.Title, 60 Message: notification.Message, 61 ReadAt: nil, 62 CreatedAt: createdAt, 63 }, nil 64} 65 66// List returns listed page of notifications from database. 67func (db *notificationDB) List(ctx context.Context, cursor notifications.Cursor) (_ notifications.Page, err error) { 68 defer mon.Task()(&ctx, cursor)(&err) 69 70 if cursor.Limit > 50 { 71 cursor.Limit = 50 72 } 73 74 if cursor.Page == 0 { 75 return notifications.Page{}, ErrNotificationsDB.Wrap(errs.New("page can not be 0")) 76 } 77 78 page := notifications.Page{ 79 Limit: cursor.Limit, 80 Offset: uint64((cursor.Page - 1) * cursor.Limit), 81 } 82 83 countQuery := ` 84 SELECT 85 COUNT(id) 86 FROM 87 notifications 88 ` 89 90 err = db.QueryRowContext(ctx, countQuery).Scan(&page.TotalCount) 91 if err != nil { 92 return notifications.Page{}, ErrNotificationsDB.Wrap(err) 93 } 94 if page.TotalCount == 0 { 95 return page, nil 96 } 97 if page.Offset > page.TotalCount-1 { 98 return notifications.Page{}, ErrNotificationsDB.Wrap(errs.New("page is out of range")) 99 } 100 101 query := ` 102 SELECT * FROM 103 notifications 104 ORDER BY 105 created_at DESC 106 LIMIT ? OFFSET ? 107 ` 108 109 rows, err := db.QueryContext(ctx, query, page.Limit, page.Offset) 110 if err != nil { 111 return notifications.Page{}, ErrNotificationsDB.Wrap(err) 112 } 113 114 defer func() { 115 err = errs.Combine(err, ErrNotificationsDB.Wrap(rows.Close())) 116 }() 117 118 for rows.Next() { 119 notification := notifications.Notification{} 120 121 err = rows.Scan( 122 ¬ification.ID, 123 ¬ification.SenderID, 124 ¬ification.Type, 125 ¬ification.Title, 126 ¬ification.Message, 127 ¬ification.ReadAt, 128 ¬ification.CreatedAt, 129 ) 130 if err = rows.Err(); err != nil { 131 return notifications.Page{}, ErrNotificationsDB.Wrap(err) 132 } 133 134 page.Notifications = append(page.Notifications, notification) 135 } 136 137 page.PageCount = uint(page.TotalCount / uint64(cursor.Limit)) 138 if page.TotalCount%uint64(cursor.Limit) != 0 { 139 page.PageCount++ 140 } 141 142 page.CurrentPage = cursor.Page 143 144 return page, ErrNotificationsDB.Wrap(rows.Err()) 145} 146 147// Read updates specific notification in database as read. 148func (db *notificationDB) Read(ctx context.Context, notificationID uuid.UUID) (err error) { 149 defer mon.Task()(&ctx, notificationID)(&err) 150 151 query := ` 152 UPDATE 153 notifications 154 SET 155 read_at = ? 156 WHERE 157 id = ?; 158 ` 159 result, err := db.ExecContext(ctx, query, time.Now().UTC(), notificationID[:]) 160 if err != nil { 161 return ErrNotificationsDB.Wrap(err) 162 } 163 164 rowsAffected, err := result.RowsAffected() 165 if err != nil { 166 return ErrNotificationsDB.Wrap(err) 167 } 168 if rowsAffected != 1 { 169 return ErrNotificationsDB.Wrap(ErrNoRows) 170 } 171 172 return nil 173} 174 175// ReadAll updates all notifications in database as read. 176func (db *notificationDB) ReadAll(ctx context.Context) (err error) { 177 defer mon.Task()(&ctx)(&err) 178 179 query := ` 180 UPDATE 181 notifications 182 SET 183 read_at = ? 184 WHERE 185 read_at IS NULL; 186 ` 187 188 _, err = db.ExecContext(ctx, query, time.Now().UTC()) 189 190 return ErrNotificationsDB.Wrap(err) 191} 192 193// UnreadAmount returns amount on unread notifications. 194func (db *notificationDB) UnreadAmount(ctx context.Context) (_ int, err error) { 195 defer mon.Task()(&ctx)(&err) 196 var amount int 197 198 query := ` 199 SELECT 200 COUNT(id) 201 FROM 202 notifications 203 WHERE 204 read_at IS NULL 205 ` 206 207 err = db.QueryRowContext(ctx, query).Scan(&amount) 208 if err != nil { 209 return 0, ErrNotificationsDB.Wrap(err) 210 } 211 212 return amount, nil 213} 214