1//go:build !plan9 && !js 2// +build !plan9,!js 3 4package cache 5 6import ( 7 "bytes" 8 "crypto/tls" 9 "encoding/json" 10 "fmt" 11 "io/ioutil" 12 "net/http" 13 "net/url" 14 "strings" 15 "sync" 16 "time" 17 18 cache "github.com/patrickmn/go-cache" 19 "github.com/rclone/rclone/fs" 20 "golang.org/x/net/websocket" 21) 22 23const ( 24 // defPlexLoginURL is the default URL for Plex login 25 defPlexLoginURL = "https://plex.tv/users/sign_in.json" 26 defPlexNotificationURL = "%s/:/websockets/notifications?X-Plex-Token=%s" 27) 28 29// PlaySessionStateNotification is part of the API response of Plex 30type PlaySessionStateNotification struct { 31 SessionKey string `json:"sessionKey"` 32 GUID string `json:"guid"` 33 Key string `json:"key"` 34 ViewOffset int64 `json:"viewOffset"` 35 State string `json:"state"` 36 TranscodeSession string `json:"transcodeSession"` 37} 38 39// NotificationContainer is part of the API response of Plex 40type NotificationContainer struct { 41 Type string `json:"type"` 42 Size int `json:"size"` 43 PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"` 44} 45 46// PlexNotification is part of the API response of Plex 47type PlexNotification struct { 48 Container NotificationContainer `json:"NotificationContainer"` 49} 50 51// plexConnector is managing the cache integration with Plex 52type plexConnector struct { 53 url *url.URL 54 username string 55 password string 56 token string 57 insecure bool 58 f *Fs 59 mu sync.Mutex 60 running bool 61 runningMu sync.Mutex 62 stateCache *cache.Cache 63 saveToken func(string) 64} 65 66// newPlexConnector connects to a Plex server and generates a token 67func newPlexConnector(f *Fs, plexURL, username, password string, insecure bool, saveToken func(string)) (*plexConnector, error) { 68 u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/")) 69 if err != nil { 70 return nil, err 71 } 72 73 pc := &plexConnector{ 74 f: f, 75 url: u, 76 username: username, 77 password: password, 78 token: "", 79 insecure: insecure, 80 stateCache: cache.New(time.Hour, time.Minute), 81 saveToken: saveToken, 82 } 83 84 return pc, nil 85} 86 87// newPlexConnector connects to a Plex server and generates a token 88func newPlexConnectorWithToken(f *Fs, plexURL, token string, insecure bool) (*plexConnector, error) { 89 u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/")) 90 if err != nil { 91 return nil, err 92 } 93 94 pc := &plexConnector{ 95 f: f, 96 url: u, 97 token: token, 98 insecure: insecure, 99 stateCache: cache.New(time.Hour, time.Minute), 100 } 101 pc.listenWebsocket() 102 103 return pc, nil 104} 105 106func (p *plexConnector) closeWebsocket() { 107 p.runningMu.Lock() 108 defer p.runningMu.Unlock() 109 fs.Infof("plex", "stopped Plex watcher") 110 p.running = false 111} 112 113func (p *plexConnector) websocketDial() (*websocket.Conn, error) { 114 u := strings.TrimRight(strings.Replace(strings.Replace( 115 p.url.String(), "http://", "ws://", 1), "https://", "wss://", 1), "/") 116 url := fmt.Sprintf(defPlexNotificationURL, u, p.token) 117 118 config, err := websocket.NewConfig(url, "http://localhost") 119 if err != nil { 120 return nil, err 121 } 122 if p.insecure { 123 config.TlsConfig = &tls.Config{InsecureSkipVerify: true} 124 } 125 return websocket.DialConfig(config) 126} 127 128func (p *plexConnector) listenWebsocket() { 129 p.runningMu.Lock() 130 defer p.runningMu.Unlock() 131 132 conn, err := p.websocketDial() 133 if err != nil { 134 fs.Errorf("plex", "%v", err) 135 return 136 } 137 138 p.running = true 139 go func() { 140 for { 141 if !p.isConnected() { 142 break 143 } 144 145 notif := &PlexNotification{} 146 err := websocket.JSON.Receive(conn, notif) 147 if err != nil { 148 fs.Debugf("plex", "%v", err) 149 p.closeWebsocket() 150 break 151 } 152 // we're only interested in play events 153 if notif.Container.Type == "playing" { 154 // we loop through each of them 155 for _, v := range notif.Container.PlaySessionState { 156 // event type of playing 157 if v.State == "playing" { 158 // if it's not cached get the details and cache them 159 if _, found := p.stateCache.Get(v.Key); !found { 160 req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), v.Key), nil) 161 if err != nil { 162 continue 163 } 164 p.fillDefaultHeaders(req) 165 resp, err := http.DefaultClient.Do(req) 166 if err != nil { 167 continue 168 } 169 var data []byte 170 data, err = ioutil.ReadAll(resp.Body) 171 if err != nil { 172 continue 173 } 174 p.stateCache.Set(v.Key, data, cache.DefaultExpiration) 175 } 176 } else if v.State == "stopped" { 177 p.stateCache.Delete(v.Key) 178 } 179 } 180 } 181 } 182 }() 183} 184 185// fillDefaultHeaders will add common headers to requests 186func (p *plexConnector) fillDefaultHeaders(req *http.Request) { 187 req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String())) 188 req.Header.Add("X-Plex-Product", fmt.Sprintf("rclone (%v)", p.f.Name())) 189 req.Header.Add("X-Plex-Version", fs.Version) 190 req.Header.Add("Accept", "application/json") 191 if p.token != "" { 192 req.Header.Add("X-Plex-Token", p.token) 193 } 194} 195 196// authenticate will generate a token based on a username/password 197func (p *plexConnector) authenticate() error { 198 p.mu.Lock() 199 defer p.mu.Unlock() 200 201 form := url.Values{} 202 form.Set("user[login]", p.username) 203 form.Add("user[password]", p.password) 204 req, err := http.NewRequest("POST", defPlexLoginURL, strings.NewReader(form.Encode())) 205 if err != nil { 206 return err 207 } 208 p.fillDefaultHeaders(req) 209 resp, err := http.DefaultClient.Do(req) 210 if err != nil { 211 return err 212 } 213 var data map[string]interface{} 214 err = json.NewDecoder(resp.Body).Decode(&data) 215 if err != nil { 216 return fmt.Errorf("failed to obtain token: %v", err) 217 } 218 tokenGen, ok := get(data, "user", "authToken") 219 if !ok { 220 return fmt.Errorf("failed to obtain token: %v", data) 221 } 222 token, ok := tokenGen.(string) 223 if !ok { 224 return fmt.Errorf("failed to obtain token: %v", data) 225 } 226 p.token = token 227 if p.token != "" { 228 if p.saveToken != nil { 229 p.saveToken(p.token) 230 } 231 fs.Infof(p.f.Name(), "Connected to Plex server: %v", p.url.String()) 232 } 233 p.listenWebsocket() 234 235 return nil 236} 237 238// isConnected checks if this rclone is authenticated to Plex 239func (p *plexConnector) isConnected() bool { 240 p.runningMu.Lock() 241 defer p.runningMu.Unlock() 242 return p.running 243} 244 245// isConfigured checks if this rclone is configured to use a Plex server 246func (p *plexConnector) isConfigured() bool { 247 return p.url != nil 248} 249 250func (p *plexConnector) isPlaying(co *Object) bool { 251 var err error 252 if !p.isConnected() { 253 p.listenWebsocket() 254 } 255 256 remote := co.Remote() 257 if cr, yes := p.f.isWrappedByCrypt(); yes { 258 remote, err = cr.DecryptFileName(co.Remote()) 259 if err != nil { 260 fs.Debugf("plex", "can not decrypt wrapped file: %v", err) 261 return false 262 } 263 } 264 265 isPlaying := false 266 for _, v := range p.stateCache.Items() { 267 if bytes.Contains(v.Object.([]byte), []byte(remote)) { 268 isPlaying = true 269 break 270 } 271 } 272 273 return isPlaying 274} 275 276// adapted from: https://stackoverflow.com/a/28878037 (credit) 277func get(m interface{}, path ...interface{}) (interface{}, bool) { 278 for _, p := range path { 279 switch idx := p.(type) { 280 case string: 281 if mm, ok := m.(map[string]interface{}); ok { 282 if val, found := mm[idx]; found { 283 m = val 284 continue 285 } 286 } 287 return nil, false 288 case int: 289 if mm, ok := m.([]interface{}); ok { 290 if len(mm) > idx { 291 m = mm[idx] 292 continue 293 } 294 } 295 return nil, false 296 } 297 } 298 return m, true 299} 300