1package worker
2
3import (
4	"log"
5	"sync"
6	"sync/atomic"
7	"time"
8
9	"github.com/nkanaev/yarr/src/storage"
10)
11
// NUM_WORKERS is the number of worker goroutines started for each refresh
// run to fetch feeds concurrently.
const (
	NUM_WORKERS = 4
)
13
14type Worker struct {
15	db      *storage.Storage
16	pending *int32
17	refresh *time.Ticker
18	reflock sync.Mutex
19	stopper chan bool
20}
21
22func NewWorker(db *storage.Storage) *Worker {
23	pending := int32(0)
24	return &Worker{db: db, pending: &pending}
25}
26
27func (w *Worker) FeedsPending() int32 {
28	return *w.pending
29}
30
31func (w *Worker) StartFeedCleaner() {
32	go w.db.DeleteOldItems()
33	ticker := time.NewTicker(time.Hour * 24)
34	go func() {
35		for {
36			<-ticker.C
37			w.db.DeleteOldItems()
38		}
39	}()
40}
41
42func (w *Worker) FindFavicons() {
43	go func() {
44		for _, feed := range w.db.ListFeedsMissingIcons() {
45			w.FindFeedFavicon(feed)
46		}
47	}()
48}
49
50func (w *Worker) FindFeedFavicon(feed storage.Feed) {
51	icon, err := findFavicon(feed.Link, feed.FeedLink)
52	if err != nil {
53		log.Printf("Failed to find favicon for %s (%s): %s", feed.FeedLink, feed.Link, err)
54	}
55	if icon != nil {
56		w.db.UpdateFeedIcon(feed.Id, icon)
57	}
58}
59
60func (w *Worker) SetRefreshRate(minute int64) {
61	if w.stopper != nil {
62		w.refresh.Stop()
63		w.refresh = nil
64		w.stopper <- true
65		w.stopper = nil
66	}
67
68	if minute == 0 {
69		return
70	}
71
72	w.stopper = make(chan bool)
73	w.refresh = time.NewTicker(time.Minute * time.Duration(minute))
74
75	go func(fire <-chan time.Time, stop <-chan bool, m int64) {
76		log.Printf("auto-refresh %dm: starting", m)
77		for {
78			select {
79			case <-fire:
80				log.Printf("auto-refresh %dm: firing", m)
81				w.RefreshFeeds()
82			case <-stop:
83				log.Printf("auto-refresh %dm: stopping", m)
84				return
85			}
86		}
87	}(w.refresh.C, w.stopper, minute)
88}
89
90func (w *Worker) RefreshFeeds() {
91	w.reflock.Lock()
92	defer w.reflock.Unlock()
93
94	if *w.pending > 0 {
95		log.Print("Refreshing already in progress")
96		return
97	}
98
99	feeds := w.db.ListFeeds()
100	if len(feeds) == 0 {
101		log.Print("Nothing to refresh")
102		return
103	}
104
105	log.Print("Refreshing feeds")
106	atomic.StoreInt32(w.pending, int32(len(feeds)))
107	go w.refresher(feeds)
108}
109
110func (w *Worker) refresher(feeds []storage.Feed) {
111	w.db.ResetFeedErrors()
112
113	srcqueue := make(chan storage.Feed, len(feeds))
114	dstqueue := make(chan []storage.Item)
115
116	for i := 0; i < NUM_WORKERS; i++ {
117		go w.worker(srcqueue, dstqueue)
118	}
119
120	for _, feed := range feeds {
121		srcqueue <- feed
122	}
123	for i := 0; i < len(feeds); i++ {
124		w.db.CreateItems(<-dstqueue)
125		atomic.AddInt32(w.pending, -1)
126		w.db.SyncSearch()
127	}
128	close(srcqueue)
129	close(dstqueue)
130
131	log.Printf("Finished refreshing %d feeds", len(feeds))
132}
133
134func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) {
135	for feed := range srcqueue {
136		items, err := listItems(feed, w.db)
137		if err != nil {
138			w.db.SetFeedError(feed.Id, err)
139		}
140		dstqueue <- items
141	}
142}
143