1 ///
2 /// # Neolink RTSP
3 ///
4 /// This module serves the rtsp streams for the
5 /// `neolink rtsp` subcommand
6 ///
/// All cameras specified in the config.toml will be served
/// over rtsp. By default it binds to all local ip addresses
/// on the port 8554.
10 ///
/// You can view the streams with any rtsp compliant program
/// such as ffmpeg, vlc, blue-iris, home-assistant, zone-minder etc.
13 ///
/// Each camera has its own endpoint based on its name. For example
/// a camera named `"Garage"` in the config can be found at
16 ///
17 /// `rtsp://my.ip.address:8554/Garage`
18 ///
19 /// With the lower resolution stream at
20 ///
21 /// `rtsp://my.ip.address:8554/Garage/subStream`
22 ///
23 /// # Usage
24 ///
25 /// To start the subcommand use the following in a shell.
26 ///
27 /// ```bash
28 /// neolink rtsp --config=config.toml
29 /// ```
30 ///
31 use anyhow::{Context, Result};
32 use log::*;
33 use neolink_core::bc_protocol::{BcCamera, Stream};
34 use neolink_core::Never;
35 use std::collections::HashSet;
36 use std::fs;
37 use std::sync::Arc;
38 use std::time::Duration;
39 use validator::Validate;
40
41 // mod adpcm;
42 /// The command line parameters for this subcommand
43 mod cmdline;
44 mod config;
/// The RTSP server and its stream outputs (`RtspServer`, `GstOutputs`)
46 mod gst;
47
48 pub(crate) use cmdline::Opt;
49 use config::{CameraConfig, Config, UserConfig};
50 use gst::{GstOutputs, RtspServer, TlsAuthenticationMode};
51
52 /// Entry point for the rtsp subcommand
53 ///
54 /// Opt is the command line options
main(opt: Opt) -> Result<()>55 pub fn main(opt: Opt) -> Result<()> {
56 let config: Config = toml::from_str(
57 &fs::read_to_string(&opt.config)
58 .with_context(|| format!("Failed to read {:?}", &opt.config))?,
59 )
60 .with_context(|| format!("Failed to load {:?} as a config file", &opt.config))?;
61
62 config
63 .validate()
64 .with_context(|| format!("Failed to validate the {:?} config file", &opt.config))?;
65
66 let rtsp = &RtspServer::new();
67
68 set_up_tls(&config, rtsp);
69
70 set_up_users(&config.users, rtsp);
71
72 if config.certificate == None && !config.users.is_empty() {
73 warn!(
74 "Without a server certificate, usernames and passwords will be exchanged in plaintext!"
75 )
76 }
77
78 crossbeam::scope(|s| {
79 for camera in config.cameras {
80 if camera.format.is_some() {
81 warn!("The format config option of the camera has been removed in favour of auto detection.")
82 }
83 // Let subthreads share the camera object; in principle I think they could share
84 // the object as it sits in the config.cameras block, but I have not figured out the
85 // syntax for that.
86 let arc_cam = Arc::new(camera);
87
88 let permitted_users =
89 get_permitted_users(config.users.as_slice(), &arc_cam.permitted_users);
90
91 // Set up each main and substream according to all the RTSP mount paths we support
92 if ["all", "both", "mainStream"].iter().any(|&e| e == arc_cam.stream) {
93 let paths = &[
94 &*format!("/{}", arc_cam.name),
95 &*format!("/{}/mainStream", arc_cam.name),
96 ];
97 let mut outputs = rtsp
98 .add_stream(paths, &permitted_users)
99 .unwrap();
100 let main_camera = arc_cam.clone();
101 s.spawn(move |_| camera_loop(&*main_camera, Stream::Main, &mut outputs, true));
102 }
103 if ["all", "both", "subStream"].iter().any(|&e| e == arc_cam.stream) {
104 let paths = &[&*format!("/{}/subStream", arc_cam.name)];
105 let mut outputs = rtsp
106 .add_stream(paths, &permitted_users)
107 .unwrap();
108 let sub_camera = arc_cam.clone();
109 let manage = arc_cam.stream == "subStream";
110 s.spawn(move |_| camera_loop(&*sub_camera, Stream::Sub, &mut outputs, manage));
111 }
112 if ["all", "externStream"].iter().any(|&e| e == arc_cam.stream) {
113 let paths = &[&*format!("/{}/externStream", arc_cam.name)];
114 let mut outputs = rtsp
115 .add_stream(paths, &permitted_users)
116 .unwrap();
117 let sub_camera = arc_cam.clone();
118 let manage = arc_cam.stream == "externStream";
119 s.spawn(move |_| camera_loop(&*sub_camera, Stream::Extern, &mut outputs, manage));
120 }
121 }
122
123 rtsp.run(&config.bind_addr, config.bind_port);
124 })
125 .unwrap();
126
127 Ok(())
128 }
129
camera_loop( camera_config: &CameraConfig, stream_name: Stream, outputs: &mut GstOutputs, manage: bool, ) -> Result<Never>130 fn camera_loop(
131 camera_config: &CameraConfig,
132 stream_name: Stream,
133 outputs: &mut GstOutputs,
134 manage: bool,
135 ) -> Result<Never> {
136 let min_backoff = Duration::from_secs(1);
137 let max_backoff = Duration::from_secs(15);
138 let mut current_backoff = min_backoff;
139
140 loop {
141 let cam_err = camera_main(camera_config, stream_name, outputs, manage).unwrap_err();
142 outputs.vidsrc.on_stream_error();
143 outputs.audsrc.on_stream_error();
144 // Authentication failures are permanent; we retry everything else
145 if cam_err.connected {
146 current_backoff = min_backoff;
147 }
148 if cam_err.login_fail {
149 error!(
150 "Authentication failed to camera {}, not retrying",
151 camera_config.name
152 );
153 return Err(cam_err.err);
154 } else {
155 error!(
156 "Error streaming from camera {}, will retry in {}s: {:?}",
157 camera_config.name,
158 current_backoff.as_secs(),
159 cam_err.err
160 )
161 }
162
163 std::thread::sleep(current_backoff);
164 current_backoff = std::cmp::min(max_backoff, current_backoff * 2);
165 }
166 }
167
168 struct CameraErr {
169 connected: bool,
170 login_fail: bool,
171 err: anyhow::Error,
172 }
173
set_up_tls(config: &Config, rtsp: &RtspServer)174 fn set_up_tls(config: &Config, rtsp: &RtspServer) {
175 let tls_client_auth = match &config.tls_client_auth as &str {
176 "request" => TlsAuthenticationMode::Requested,
177 "require" => TlsAuthenticationMode::Required,
178 "none" => TlsAuthenticationMode::None,
179 _ => unreachable!(),
180 };
181 if let Some(cert_path) = &config.certificate {
182 rtsp.set_tls(cert_path, tls_client_auth)
183 .expect("Failed to set up TLS");
184 }
185 }
186
set_up_users(users: &[UserConfig], rtsp: &RtspServer)187 fn set_up_users(users: &[UserConfig], rtsp: &RtspServer) {
188 // Setting up users
189 let credentials: Vec<_> = users
190 .iter()
191 .map(|user| (&*user.name, &*user.pass))
192 .collect();
193 rtsp.set_credentials(&credentials)
194 .expect("Failed to set up users");
195 }
196
get_permitted_users<'a>( users: &'a [UserConfig], permitted_users: &'a Option<Vec<String>>, ) -> HashSet<&'a str>197 fn get_permitted_users<'a>(
198 users: &'a [UserConfig],
199 // not idiomatic as a function argument, but this fn translates the config struct directly:
200 permitted_users: &'a Option<Vec<String>>,
201 ) -> HashSet<&'a str> {
202 // Helper to build hashset of all users in `users`:
203 let all_users_hash = || users.iter().map(|u| u.name.as_str()).collect();
204
205 match permitted_users {
206 // If in the camera config there is the user "anyone", or if none is specified but users
207 // are defined at all, then we add all users to the camera's allowed list.
208 Some(p) if p.iter().any(|u| u == "anyone") => all_users_hash(),
209 None if !users.is_empty() => all_users_hash(),
210
211 // The user specified permitted_users
212 Some(p) => p.iter().map(String::as_str).collect(),
213
214 // The user didn't specify permitted_users, and there are none defined anyway
215 None => ["anonymous"].iter().cloned().collect(),
216 }
217 }
218
camera_main( camera_config: &CameraConfig, stream_name: Stream, outputs: &mut GstOutputs, manage: bool, ) -> Result<Never, CameraErr>219 fn camera_main(
220 camera_config: &CameraConfig,
221 stream_name: Stream,
222 outputs: &mut GstOutputs,
223 manage: bool,
224 ) -> Result<Never, CameraErr> {
225 let mut connected = false;
226 let mut login_fail = false;
227 (|| {
228 let mut camera =
229 BcCamera::new_with_addr(&camera_config.camera_addr, camera_config.channel_id)
230 .with_context(|| {
231 format!(
232 "Failed to connect to camera {} at {} on channel {}",
233 camera_config.name, camera_config.camera_addr, camera_config.channel_id
234 )
235 })?;
236
237 if camera_config.timeout.is_some() {
238 warn!("The undocumented `timeout` config option has been removed and is no longer needed.");
239 warn!("Please update your config file.");
240 }
241
242 info!(
243 "{}: Connecting to camera at {}",
244 camera_config.name, camera_config.camera_addr
245 );
246
247 info!("{}: Logging in", camera_config.name);
248 camera.login(&camera_config.username, camera_config.password.as_deref()).map_err(|e|
249 {
250 if let neolink_core::Error::AuthFailed = e {
251 login_fail = true;
252 }
253 e
254 }
255 ).with_context(|| format!("Failed to login to {}", camera_config.name))?;
256
257 connected = true;
258 info!("{}: Connected and logged in", camera_config.name);
259
260 if manage {
261 do_camera_management(&mut camera, camera_config).context("Failed to manage the camera settings")?;
262 }
263
264 let stream_display_name = match stream_name {
265 Stream::Main => "Main Stream (Clear)",
266 Stream::Sub => "Sub Stream (Fluent)",
267 Stream::Extern => "Extern Stream (Balanced)",
268 };
269
270 info!(
271 "{}: Starting video stream {}",
272 camera_config.name, stream_display_name
273 );
274 camera.start_video(outputs, stream_name).with_context(|| format!("Error while streaming {}", camera_config.name))
275 })().map_err(|e| CameraErr{
276 connected,
277 login_fail,
278 err: e,
279 })
280 }
281
do_camera_management(camera: &mut BcCamera, camera_config: &CameraConfig) -> Result<()>282 fn do_camera_management(camera: &mut BcCamera, camera_config: &CameraConfig) -> Result<()> {
283 let cam_time = camera.get_time()?;
284 if let Some(time) = cam_time {
285 info!(
286 "{}: Camera time is already set: {}",
287 camera_config.name, time
288 );
289 } else {
290 use time::OffsetDateTime;
291 // We'd like now_local() but it's deprecated - try to get the local time, but if no
292 // time zone, fall back to UTC.
293 let new_time =
294 OffsetDateTime::try_now_local().unwrap_or_else(|_| OffsetDateTime::now_utc());
295
296 warn!(
297 "{}: Camera has no time set, setting to {}",
298 camera_config.name, new_time
299 );
300 camera.set_time(new_time)?;
301 let cam_time = camera.get_time()?;
302 if let Some(time) = cam_time {
303 info!("{}: Camera time is now set: {}", camera_config.name, time);
304 } else {
305 error!(
306 "{}: Camera did not accept new time (is {} an admin?)",
307 camera_config.name, camera_config.username
308 );
309 }
310 }
311
312 use neolink_core::bc::xml::VersionInfo;
313 if let Ok(VersionInfo {
314 firmwareVersion: firmware_version,
315 ..
316 }) = camera.version()
317 {
318 info!(
319 "{}: Camera reports firmware version {}",
320 camera_config.name, firmware_version
321 );
322 } else {
323 info!(
324 "{}: Could not fetch version information",
325 camera_config.name
326 );
327 }
328
329 Ok(())
330 }
331