1 ///
2 /// # Neolink RTSP
3 ///
4 /// This module serves the rtsp streams for the
5 /// `neolink rtsp` subcommand
6 ///
/// All cameras specified in the config.toml will be served
/// over rtsp. By default it binds to all local ip addresses
/// on port 8554.
10 ///
/// You can view the streams with any rtsp compliant program
/// such as ffmpeg, vlc, blue-iris, home-assistant, zone-minder etc.
13 ///
/// Each camera has its own endpoint based on its name. For example,
/// a camera named `"Garage"` in the config can be found at
16 ///
17 /// `rtsp://my.ip.address:8554/Garage`
18 ///
19 /// With the lower resolution stream at
20 ///
21 /// `rtsp://my.ip.address:8554/Garage/subStream`
22 ///
23 /// # Usage
24 ///
25 /// To start the subcommand use the following in a shell.
26 ///
27 /// ```bash
28 /// neolink rtsp --config=config.toml
29 /// ```
30 ///
31 use anyhow::{Context, Result};
32 use log::*;
33 use neolink_core::bc_protocol::{BcCamera, Stream};
34 use neolink_core::Never;
35 use std::collections::HashSet;
36 use std::fs;
37 use std::sync::Arc;
38 use std::time::Duration;
39 use validator::Validate;
40 
41 // mod adpcm;
42 /// The command line parameters for this subcommand
43 mod cmdline;
44 mod config;
/// The RTSP server, its stream outputs and the TLS authentication modes
46 mod gst;
47 
48 pub(crate) use cmdline::Opt;
49 use config::{CameraConfig, Config, UserConfig};
50 use gst::{GstOutputs, RtspServer, TlsAuthenticationMode};
51 
52 /// Entry point for the rtsp subcommand
53 ///
54 /// Opt is the command line options
main(opt: Opt) -> Result<()>55 pub fn main(opt: Opt) -> Result<()> {
56     let config: Config = toml::from_str(
57         &fs::read_to_string(&opt.config)
58             .with_context(|| format!("Failed to read {:?}", &opt.config))?,
59     )
60     .with_context(|| format!("Failed to load {:?} as a config file", &opt.config))?;
61 
62     config
63         .validate()
64         .with_context(|| format!("Failed to validate the {:?} config file", &opt.config))?;
65 
66     let rtsp = &RtspServer::new();
67 
68     set_up_tls(&config, rtsp);
69 
70     set_up_users(&config.users, rtsp);
71 
72     if config.certificate == None && !config.users.is_empty() {
73         warn!(
74             "Without a server certificate, usernames and passwords will be exchanged in plaintext!"
75         )
76     }
77 
78     crossbeam::scope(|s| {
79         for camera in config.cameras {
80             if camera.format.is_some() {
81                 warn!("The format config option of the camera has been removed in favour of auto detection.")
82             }
83             // Let subthreads share the camera object; in principle I think they could share
84             // the object as it sits in the config.cameras block, but I have not figured out the
85             // syntax for that.
86             let arc_cam = Arc::new(camera);
87 
88             let permitted_users =
89                 get_permitted_users(config.users.as_slice(), &arc_cam.permitted_users);
90 
91             // Set up each main and substream according to all the RTSP mount paths we support
92             if ["all", "both", "mainStream"].iter().any(|&e| e == arc_cam.stream) {
93                 let paths = &[
94                     &*format!("/{}", arc_cam.name),
95                     &*format!("/{}/mainStream", arc_cam.name),
96                 ];
97                 let mut outputs = rtsp
98                     .add_stream(paths, &permitted_users)
99                     .unwrap();
100                 let main_camera = arc_cam.clone();
101                 s.spawn(move |_| camera_loop(&*main_camera, Stream::Main, &mut outputs, true));
102             }
103             if ["all", "both", "subStream"].iter().any(|&e| e == arc_cam.stream) {
104                 let paths = &[&*format!("/{}/subStream", arc_cam.name)];
105                 let mut outputs = rtsp
106                     .add_stream(paths, &permitted_users)
107                     .unwrap();
108                 let sub_camera = arc_cam.clone();
109                 let manage = arc_cam.stream == "subStream";
110                 s.spawn(move |_| camera_loop(&*sub_camera, Stream::Sub, &mut outputs, manage));
111             }
112             if ["all", "externStream"].iter().any(|&e| e == arc_cam.stream) {
113                 let paths = &[&*format!("/{}/externStream", arc_cam.name)];
114                 let mut outputs = rtsp
115                     .add_stream(paths, &permitted_users)
116                     .unwrap();
117                 let sub_camera = arc_cam.clone();
118                 let manage = arc_cam.stream == "externStream";
119                 s.spawn(move |_| camera_loop(&*sub_camera, Stream::Extern, &mut outputs, manage));
120             }
121         }
122 
123         rtsp.run(&config.bind_addr, config.bind_port);
124     })
125     .unwrap();
126 
127     Ok(())
128 }
129 
camera_loop( camera_config: &CameraConfig, stream_name: Stream, outputs: &mut GstOutputs, manage: bool, ) -> Result<Never>130 fn camera_loop(
131     camera_config: &CameraConfig,
132     stream_name: Stream,
133     outputs: &mut GstOutputs,
134     manage: bool,
135 ) -> Result<Never> {
136     let min_backoff = Duration::from_secs(1);
137     let max_backoff = Duration::from_secs(15);
138     let mut current_backoff = min_backoff;
139 
140     loop {
141         let cam_err = camera_main(camera_config, stream_name, outputs, manage).unwrap_err();
142         outputs.vidsrc.on_stream_error();
143         outputs.audsrc.on_stream_error();
144         // Authentication failures are permanent; we retry everything else
145         if cam_err.connected {
146             current_backoff = min_backoff;
147         }
148         if cam_err.login_fail {
149             error!(
150                 "Authentication failed to camera {}, not retrying",
151                 camera_config.name
152             );
153             return Err(cam_err.err);
154         } else {
155             error!(
156                 "Error streaming from camera {}, will retry in {}s: {:?}",
157                 camera_config.name,
158                 current_backoff.as_secs(),
159                 cam_err.err
160             )
161         }
162 
163         std::thread::sleep(current_backoff);
164         current_backoff = std::cmp::min(max_backoff, current_backoff * 2);
165     }
166 }
167 
168 struct CameraErr {
169     connected: bool,
170     login_fail: bool,
171     err: anyhow::Error,
172 }
173 
set_up_tls(config: &Config, rtsp: &RtspServer)174 fn set_up_tls(config: &Config, rtsp: &RtspServer) {
175     let tls_client_auth = match &config.tls_client_auth as &str {
176         "request" => TlsAuthenticationMode::Requested,
177         "require" => TlsAuthenticationMode::Required,
178         "none" => TlsAuthenticationMode::None,
179         _ => unreachable!(),
180     };
181     if let Some(cert_path) = &config.certificate {
182         rtsp.set_tls(cert_path, tls_client_auth)
183             .expect("Failed to set up TLS");
184     }
185 }
186 
set_up_users(users: &[UserConfig], rtsp: &RtspServer)187 fn set_up_users(users: &[UserConfig], rtsp: &RtspServer) {
188     // Setting up users
189     let credentials: Vec<_> = users
190         .iter()
191         .map(|user| (&*user.name, &*user.pass))
192         .collect();
193     rtsp.set_credentials(&credentials)
194         .expect("Failed to set up users");
195 }
196 
get_permitted_users<'a>( users: &'a [UserConfig], permitted_users: &'a Option<Vec<String>>, ) -> HashSet<&'a str>197 fn get_permitted_users<'a>(
198     users: &'a [UserConfig],
199     // not idiomatic as a function argument, but this fn translates the config struct directly:
200     permitted_users: &'a Option<Vec<String>>,
201 ) -> HashSet<&'a str> {
202     // Helper to build hashset of all users in `users`:
203     let all_users_hash = || users.iter().map(|u| u.name.as_str()).collect();
204 
205     match permitted_users {
206         // If in the camera config there is the user "anyone", or if none is specified but users
207         // are defined at all, then we add all users to the camera's allowed list.
208         Some(p) if p.iter().any(|u| u == "anyone") => all_users_hash(),
209         None if !users.is_empty() => all_users_hash(),
210 
211         // The user specified permitted_users
212         Some(p) => p.iter().map(String::as_str).collect(),
213 
214         // The user didn't specify permitted_users, and there are none defined anyway
215         None => ["anonymous"].iter().cloned().collect(),
216     }
217 }
218 
camera_main( camera_config: &CameraConfig, stream_name: Stream, outputs: &mut GstOutputs, manage: bool, ) -> Result<Never, CameraErr>219 fn camera_main(
220     camera_config: &CameraConfig,
221     stream_name: Stream,
222     outputs: &mut GstOutputs,
223     manage: bool,
224 ) -> Result<Never, CameraErr> {
225     let mut connected = false;
226     let mut login_fail = false;
227     (|| {
228         let mut camera =
229             BcCamera::new_with_addr(&camera_config.camera_addr, camera_config.channel_id)
230                 .with_context(|| {
231                     format!(
232                         "Failed to connect to camera {} at {} on channel {}",
233                         camera_config.name, camera_config.camera_addr, camera_config.channel_id
234                     )
235                 })?;
236 
237         if camera_config.timeout.is_some() {
238             warn!("The undocumented `timeout` config option has been removed and is no longer needed.");
239             warn!("Please update your config file.");
240         }
241 
242         info!(
243             "{}: Connecting to camera at {}",
244             camera_config.name, camera_config.camera_addr
245         );
246 
247         info!("{}: Logging in", camera_config.name);
248         camera.login(&camera_config.username, camera_config.password.as_deref()).map_err(|e|
249             {
250                 if let neolink_core::Error::AuthFailed = e {
251                     login_fail = true;
252                 }
253                 e
254             }
255         ).with_context(|| format!("Failed to login to {}", camera_config.name))?;
256 
257         connected = true;
258         info!("{}: Connected and logged in", camera_config.name);
259 
260         if manage {
261             do_camera_management(&mut camera, camera_config).context("Failed to manage the camera settings")?;
262         }
263 
264         let stream_display_name = match stream_name {
265             Stream::Main => "Main Stream (Clear)",
266             Stream::Sub => "Sub Stream (Fluent)",
267             Stream::Extern => "Extern Stream (Balanced)",
268         };
269 
270         info!(
271             "{}: Starting video stream {}",
272             camera_config.name, stream_display_name
273         );
274         camera.start_video(outputs, stream_name).with_context(|| format!("Error while streaming {}", camera_config.name))
275     })().map_err(|e| CameraErr{
276         connected,
277         login_fail,
278         err: e,
279     })
280 }
281 
do_camera_management(camera: &mut BcCamera, camera_config: &CameraConfig) -> Result<()>282 fn do_camera_management(camera: &mut BcCamera, camera_config: &CameraConfig) -> Result<()> {
283     let cam_time = camera.get_time()?;
284     if let Some(time) = cam_time {
285         info!(
286             "{}: Camera time is already set: {}",
287             camera_config.name, time
288         );
289     } else {
290         use time::OffsetDateTime;
291         // We'd like now_local() but it's deprecated - try to get the local time, but if no
292         // time zone, fall back to UTC.
293         let new_time =
294             OffsetDateTime::try_now_local().unwrap_or_else(|_| OffsetDateTime::now_utc());
295 
296         warn!(
297             "{}: Camera has no time set, setting to {}",
298             camera_config.name, new_time
299         );
300         camera.set_time(new_time)?;
301         let cam_time = camera.get_time()?;
302         if let Some(time) = cam_time {
303             info!("{}: Camera time is now set: {}", camera_config.name, time);
304         } else {
305             error!(
306                 "{}: Camera did not accept new time (is {} an admin?)",
307                 camera_config.name, camera_config.username
308             );
309         }
310     }
311 
312     use neolink_core::bc::xml::VersionInfo;
313     if let Ok(VersionInfo {
314         firmwareVersion: firmware_version,
315         ..
316     }) = camera.version()
317     {
318         info!(
319             "{}: Camera reports firmware version {}",
320             camera_config.name, firmware_version
321         );
322     } else {
323         info!(
324             "{}: Could not fetch version information",
325             camera_config.name
326         );
327     }
328 
329     Ok(())
330 }
331