1package worker 2 3import ( 4 "log" 5 "sync" 6 "sync/atomic" 7 "time" 8 9 "github.com/nkanaev/yarr/src/storage" 10) 11 12const NUM_WORKERS = 4 13 14type Worker struct { 15 db *storage.Storage 16 pending *int32 17 refresh *time.Ticker 18 reflock sync.Mutex 19 stopper chan bool 20} 21 22func NewWorker(db *storage.Storage) *Worker { 23 pending := int32(0) 24 return &Worker{db: db, pending: &pending} 25} 26 27func (w *Worker) FeedsPending() int32 { 28 return *w.pending 29} 30 31func (w *Worker) StartFeedCleaner() { 32 go w.db.DeleteOldItems() 33 ticker := time.NewTicker(time.Hour * 24) 34 go func() { 35 for { 36 <-ticker.C 37 w.db.DeleteOldItems() 38 } 39 }() 40} 41 42func (w *Worker) FindFavicons() { 43 go func() { 44 for _, feed := range w.db.ListFeedsMissingIcons() { 45 w.FindFeedFavicon(feed) 46 } 47 }() 48} 49 50func (w *Worker) FindFeedFavicon(feed storage.Feed) { 51 icon, err := findFavicon(feed.Link, feed.FeedLink) 52 if err != nil { 53 log.Printf("Failed to find favicon for %s (%s): %s", feed.FeedLink, feed.Link, err) 54 } 55 if icon != nil { 56 w.db.UpdateFeedIcon(feed.Id, icon) 57 } 58} 59 60func (w *Worker) SetRefreshRate(minute int64) { 61 if w.stopper != nil { 62 w.refresh.Stop() 63 w.refresh = nil 64 w.stopper <- true 65 w.stopper = nil 66 } 67 68 if minute == 0 { 69 return 70 } 71 72 w.stopper = make(chan bool) 73 w.refresh = time.NewTicker(time.Minute * time.Duration(minute)) 74 75 go func(fire <-chan time.Time, stop <-chan bool, m int64) { 76 log.Printf("auto-refresh %dm: starting", m) 77 for { 78 select { 79 case <-fire: 80 log.Printf("auto-refresh %dm: firing", m) 81 w.RefreshFeeds() 82 case <-stop: 83 log.Printf("auto-refresh %dm: stopping", m) 84 return 85 } 86 } 87 }(w.refresh.C, w.stopper, minute) 88} 89 90func (w *Worker) RefreshFeeds() { 91 w.reflock.Lock() 92 defer w.reflock.Unlock() 93 94 if *w.pending > 0 { 95 log.Print("Refreshing already in progress") 96 return 97 } 98 99 feeds := w.db.ListFeeds() 100 if len(feeds) == 0 { 101 log.Print("Nothing to refresh") 102 return 103 } 104 105 log.Print("Refreshing feeds") 106 atomic.StoreInt32(w.pending, int32(len(feeds))) 107 go w.refresher(feeds) 108} 109 110func (w *Worker) refresher(feeds []storage.Feed) { 111 w.db.ResetFeedErrors() 112 113 srcqueue := make(chan storage.Feed, len(feeds)) 114 dstqueue := make(chan []storage.Item) 115 116 for i := 0; i < NUM_WORKERS; i++ { 117 go w.worker(srcqueue, dstqueue) 118 } 119 120 for _, feed := range feeds { 121 srcqueue <- feed 122 } 123 for i := 0; i < len(feeds); i++ { 124 w.db.CreateItems(<-dstqueue) 125 atomic.AddInt32(w.pending, -1) 126 w.db.SyncSearch() 127 } 128 close(srcqueue) 129 close(dstqueue) 130 131 log.Printf("Finished refreshing %d feeds", len(feeds)) 132} 133 134func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) { 135 for feed := range srcqueue { 136 items, err := listItems(feed, w.db) 137 if err != nil { 138 w.db.SetFeedError(feed.Id, err) 139 } 140 dstqueue <- items 141 } 142} 143