1//go:build !plan9 && !js
2// +build !plan9,!js
3
4package cache
5
6import (
7	"bytes"
8	"crypto/tls"
9	"encoding/json"
10	"fmt"
11	"io/ioutil"
12	"net/http"
13	"net/url"
14	"strings"
15	"sync"
16	"time"
17
18	cache "github.com/patrickmn/go-cache"
19	"github.com/rclone/rclone/fs"
20	"golang.org/x/net/websocket"
21)
22
const (
	// defPlexLoginURL is the default URL for Plex login
	defPlexLoginURL        = "https://plex.tv/users/sign_in.json"
	// defPlexNotificationURL is the format string for the notifications
	// websocket endpoint. Its two verbs are filled with the ws(s) base URL
	// of the server and the Plex token, in that order.
	defPlexNotificationURL = "%s/:/websockets/notifications?X-Plex-Token=%s"
)
28
// PlaySessionStateNotification is part of the API response of Plex.
//
// It is one play session entry carried inside a notification. The connector
// reads State to tell "playing" entries from "stopped" ones, and uses Key both
// as the state cache key and as the path of the details request sent to the
// server. The remaining fields are only decoded.
type PlaySessionStateNotification struct {
	SessionKey       string `json:"sessionKey"`
	GUID             string `json:"guid"`
	Key              string `json:"key"`
	ViewOffset       int64  `json:"viewOffset"`
	State            string `json:"state"`
	TranscodeSession string `json:"transcodeSession"`
}
38
// NotificationContainer is part of the API response of Plex.
//
// Type names the kind of notification; the websocket listener only acts on
// containers whose Type is "playing" and then walks PlaySessionState.
type NotificationContainer struct {
	Type             string                         `json:"type"`
	Size             int                            `json:"size"`
	PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"`
}
45
// PlexNotification is part of the API response of Plex.
//
// It is the top-level value each websocket message is decoded into; the
// payload sits under the "NotificationContainer" JSON key.
type PlexNotification struct {
	Container NotificationContainer `json:"NotificationContainer"`
}
50
// plexConnector is managing the cache integration with Plex
//
// It keeps the server URL and credentials, the flag recording whether the
// notification listener is running, and a cache with the details of the items
// Plex reported as playing.
type plexConnector struct {
	url        *url.URL     // parsed Plex server URL, trailing slashes trimmed
	username   string       // login sent by authenticate
	password   string       // password sent by authenticate
	token      string       // Plex token; sent as X-Plex-Token when non-empty
	insecure   bool         // skip TLS verification when dialling the websocket
	f          *Fs          // Fs the connector was created for
	mu         sync.Mutex   // held for the duration of authenticate
	running    bool         // whether the websocket listener is considered active
	runningMu  sync.Mutex   // guards running
	stateCache *cache.Cache // raw details bodies of playing items, by notification key
	saveToken  func(string) // optional; called with a newly obtained token
}
65
66// newPlexConnector connects to a Plex server and generates a token
67func newPlexConnector(f *Fs, plexURL, username, password string, insecure bool, saveToken func(string)) (*plexConnector, error) {
68	u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/"))
69	if err != nil {
70		return nil, err
71	}
72
73	pc := &plexConnector{
74		f:          f,
75		url:        u,
76		username:   username,
77		password:   password,
78		token:      "",
79		insecure:   insecure,
80		stateCache: cache.New(time.Hour, time.Minute),
81		saveToken:  saveToken,
82	}
83
84	return pc, nil
85}
86
87// newPlexConnector connects to a Plex server and generates a token
88func newPlexConnectorWithToken(f *Fs, plexURL, token string, insecure bool) (*plexConnector, error) {
89	u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/"))
90	if err != nil {
91		return nil, err
92	}
93
94	pc := &plexConnector{
95		f:          f,
96		url:        u,
97		token:      token,
98		insecure:   insecure,
99		stateCache: cache.New(time.Hour, time.Minute),
100	}
101	pc.listenWebsocket()
102
103	return pc, nil
104}
105
106func (p *plexConnector) closeWebsocket() {
107	p.runningMu.Lock()
108	defer p.runningMu.Unlock()
109	fs.Infof("plex", "stopped Plex watcher")
110	p.running = false
111}
112
113func (p *plexConnector) websocketDial() (*websocket.Conn, error) {
114	u := strings.TrimRight(strings.Replace(strings.Replace(
115		p.url.String(), "http://", "ws://", 1), "https://", "wss://", 1), "/")
116	url := fmt.Sprintf(defPlexNotificationURL, u, p.token)
117
118	config, err := websocket.NewConfig(url, "http://localhost")
119	if err != nil {
120		return nil, err
121	}
122	if p.insecure {
123		config.TlsConfig = &tls.Config{InsecureSkipVerify: true}
124	}
125	return websocket.DialConfig(config)
126}
127
128func (p *plexConnector) listenWebsocket() {
129	p.runningMu.Lock()
130	defer p.runningMu.Unlock()
131
132	conn, err := p.websocketDial()
133	if err != nil {
134		fs.Errorf("plex", "%v", err)
135		return
136	}
137
138	p.running = true
139	go func() {
140		for {
141			if !p.isConnected() {
142				break
143			}
144
145			notif := &PlexNotification{}
146			err := websocket.JSON.Receive(conn, notif)
147			if err != nil {
148				fs.Debugf("plex", "%v", err)
149				p.closeWebsocket()
150				break
151			}
152			// we're only interested in play events
153			if notif.Container.Type == "playing" {
154				// we loop through each of them
155				for _, v := range notif.Container.PlaySessionState {
156					// event type of playing
157					if v.State == "playing" {
158						// if it's not cached get the details and cache them
159						if _, found := p.stateCache.Get(v.Key); !found {
160							req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), v.Key), nil)
161							if err != nil {
162								continue
163							}
164							p.fillDefaultHeaders(req)
165							resp, err := http.DefaultClient.Do(req)
166							if err != nil {
167								continue
168							}
169							var data []byte
170							data, err = ioutil.ReadAll(resp.Body)
171							if err != nil {
172								continue
173							}
174							p.stateCache.Set(v.Key, data, cache.DefaultExpiration)
175						}
176					} else if v.State == "stopped" {
177						p.stateCache.Delete(v.Key)
178					}
179				}
180			}
181		}
182	}()
183}
184
185// fillDefaultHeaders will add common headers to requests
186func (p *plexConnector) fillDefaultHeaders(req *http.Request) {
187	req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String()))
188	req.Header.Add("X-Plex-Product", fmt.Sprintf("rclone (%v)", p.f.Name()))
189	req.Header.Add("X-Plex-Version", fs.Version)
190	req.Header.Add("Accept", "application/json")
191	if p.token != "" {
192		req.Header.Add("X-Plex-Token", p.token)
193	}
194}
195
196// authenticate will generate a token based on a username/password
197func (p *plexConnector) authenticate() error {
198	p.mu.Lock()
199	defer p.mu.Unlock()
200
201	form := url.Values{}
202	form.Set("user[login]", p.username)
203	form.Add("user[password]", p.password)
204	req, err := http.NewRequest("POST", defPlexLoginURL, strings.NewReader(form.Encode()))
205	if err != nil {
206		return err
207	}
208	p.fillDefaultHeaders(req)
209	resp, err := http.DefaultClient.Do(req)
210	if err != nil {
211		return err
212	}
213	var data map[string]interface{}
214	err = json.NewDecoder(resp.Body).Decode(&data)
215	if err != nil {
216		return fmt.Errorf("failed to obtain token: %v", err)
217	}
218	tokenGen, ok := get(data, "user", "authToken")
219	if !ok {
220		return fmt.Errorf("failed to obtain token: %v", data)
221	}
222	token, ok := tokenGen.(string)
223	if !ok {
224		return fmt.Errorf("failed to obtain token: %v", data)
225	}
226	p.token = token
227	if p.token != "" {
228		if p.saveToken != nil {
229			p.saveToken(p.token)
230		}
231		fs.Infof(p.f.Name(), "Connected to Plex server: %v", p.url.String())
232	}
233	p.listenWebsocket()
234
235	return nil
236}
237
238// isConnected checks if this rclone is authenticated to Plex
239func (p *plexConnector) isConnected() bool {
240	p.runningMu.Lock()
241	defer p.runningMu.Unlock()
242	return p.running
243}
244
245// isConfigured checks if this rclone is configured to use a Plex server
246func (p *plexConnector) isConfigured() bool {
247	return p.url != nil
248}
249
250func (p *plexConnector) isPlaying(co *Object) bool {
251	var err error
252	if !p.isConnected() {
253		p.listenWebsocket()
254	}
255
256	remote := co.Remote()
257	if cr, yes := p.f.isWrappedByCrypt(); yes {
258		remote, err = cr.DecryptFileName(co.Remote())
259		if err != nil {
260			fs.Debugf("plex", "can not decrypt wrapped file: %v", err)
261			return false
262		}
263	}
264
265	isPlaying := false
266	for _, v := range p.stateCache.Items() {
267		if bytes.Contains(v.Object.([]byte), []byte(remote)) {
268			isPlaying = true
269			break
270		}
271	}
272
273	return isPlaying
274}
275
// get walks a decoded JSON-like value along path and returns what it finds.
//
// Each path element selects one level: a string is looked up as a key in a
// map[string]interface{}, an int is used as an index into a []interface{}.
// The second result is false, and the first nil, when a key is missing, an
// index is out of range (negative included), the current value is not the
// container the element needs, or the element is neither a string nor an int.
// An empty path returns m itself and true.
//
// adapted from: https://stackoverflow.com/a/28878037 (credit)
func get(m interface{}, path ...interface{}) (interface{}, bool) {
	for _, p := range path {
		switch idx := p.(type) {
		case string:
			mm, ok := m.(map[string]interface{})
			if !ok {
				return nil, false
			}
			val, found := mm[idx]
			if !found {
				return nil, false
			}
			m = val
		case int:
			mm, ok := m.([]interface{})
			// Negative indexes are rejected too; indexing with one would panic.
			if !ok || idx < 0 || idx >= len(mm) {
				return nil, false
			}
			m = mm[idx]
		default:
			// An element that is neither a key nor an index cannot be
			// resolved; report a miss instead of silently skipping it.
			return nil, false
		}
	}
	return m, true
}
300