1 use std::cmp;
2 use std::env;
3 use std::fmt;
4 use std::future::Future;
5 #[cfg(any(feature = "ssl", feature = "tls"))]
6 use std::path::{Path, PathBuf};
7 use std::str::from_utf8;
8 use std::sync::atomic::AtomicUsize;
9 use std::sync::atomic::Ordering;
10 use std::sync::Arc;
11 use std::time::Duration;
12
13 use arrayvec::ArrayVec;
14 use futures_core::Stream;
15 use futures_util::future::FutureExt;
16 use futures_util::future::TryFutureExt;
17 use futures_util::stream;
18 use futures_util::stream::TryStreamExt;
19 use http::header::CONTENT_TYPE;
20 use http::request::Builder;
21 use hyper::client::HttpConnector;
22 use hyper::{self, body::Bytes, Body, Client, Method, Request, Response, StatusCode};
23 #[cfg(feature = "ssl")]
24 use hyper_openssl::HttpsConnector;
25 #[cfg(feature = "tls")]
26 use hyper_tls;
27 #[cfg(unix)]
28 use hyperlocal::UnixClient as UnixConnector;
29 #[cfg(feature = "tls")]
30 use native_tls::{Certificate, Identity, TlsConnector};
31 #[cfg(feature = "ssl")]
32 use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
33 use tokio_util::codec::FramedRead;
34
35 use crate::container::LogOutput;
36 use crate::errors::Error;
37 use crate::errors::ErrorKind::{
38 APIVersionParseError, DockerResponseBadParameterError, DockerResponseConflictError,
39 DockerResponseNotFoundError, DockerResponseNotModifiedError, DockerResponseServerError,
40 HttpClientError, HyperResponseError, JsonDataError, JsonDeserializeError, JsonSerializeError,
41 RequestTimeoutError, StrParseError,
42 };
43 #[cfg(any(feature = "ssl", feature = "tls"))]
44 use crate::errors::ErrorKind::NoCertPathError;
45 #[cfg(feature = "ssl")]
46 use crate::errors::ErrorKind::SSLError;
47 #[cfg(windows)]
48 use crate::named_pipe::NamedPipeConnector;
49 use crate::read::{JsonLineDecoder, NewlineLogOutputDecoder, StreamReader};
50 use crate::system::Version;
51 use crate::uri::Uri;
52
53 use serde::de::DeserializeOwned;
54 use serde::ser::Serialize;
55 use serde_json;
56
57 /// The default `DOCKER_SOCKET` address that we will try to connect to.
58 #[cfg(unix)]
59 pub const DEFAULT_SOCKET: &'static str = "unix:///var/run/docker.sock";
60
61 /// The default `DOCKER_NAMED_PIPE` address that a windows client will try to connect to.
62 #[cfg(windows)]
63 pub const DEFAULT_NAMED_PIPE: &'static str = "npipe:////./pipe/docker_engine";
64
65 /// The default `DOCKER_HOST` address that we will try to connect to.
66 pub const DEFAULT_DOCKER_HOST: &'static str = "tcp://localhost:2375";
67
68 /// Default timeout for all requests is 2 minutes.
69 const DEFAULT_TIMEOUT: u64 = 120;
70
71 /// Default Client Version to communicate with the server.
72 pub const API_DEFAULT_VERSION: &'static ClientVersion = &ClientVersion {
73 major_version: 1,
74 minor_version: 40,
75 };
76
77 pub(crate) const TRUE_STR: &'static str = "true";
78 pub(crate) const FALSE_STR: &'static str = "false";
79
/// Resolve the directory in which to look for the Docker certificate files.
///
/// The `DOCKER_CERT_PATH` environment variable is consulted first, then `DOCKER_CONFIG`.
/// When neither can be read, the `.docker` directory inside the user's home directory is
/// used.
///
/// # Errors
///
/// Returns `NoCertPathError` when no environment variable is usable and the home directory
/// cannot be determined.
#[cfg(any(feature = "ssl", feature = "tls"))]
pub fn default_cert_path() -> Result<PathBuf, Error> {
    match env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG")) {
        Ok(dir) => Ok(PathBuf::from(dir)),
        Err(_) => dirs::home_dir()
            .map(|home| home.join(".docker"))
            .ok_or_else(|| NoCertPathError.into()),
    }
}
92
/// The kind of connection a `Docker` instance was created with.
///
/// Variants are only compiled in for the platforms / cargo features that provide the
/// matching constructor.
#[derive(Clone, Debug)]
pub(crate) enum ClientType {
    /// Unix domain socket.
    #[cfg(unix)]
    Unix,
    /// Plain HTTP.
    Http,
    /// HTTPS, set by both the `ssl` and the `tls` constructors.
    #[cfg(any(feature = "ssl", feature = "tls"))]
    SSL,
    /// Windows named pipe.
    #[cfg(windows)]
    NamedPipe,
}
103
104 /// Transport is the type representing the means of communication
105 /// with the Docker daemon.
106 ///
107 /// Each transport usually encapsulate a hyper client
108 /// with various Connect traits fulfilled.
109 pub(crate) enum Transport {
110 Http {
111 client: Client<HttpConnector>,
112 },
113 #[cfg(feature = "ssl")]
114 Https {
115 client: Client<HttpsConnector<HttpConnector>>,
116 },
117 #[cfg(feature = "tls")]
118 Tls {
119 client: Client<hyper_tls::HttpsConnector<HttpConnector>>,
120 },
121 #[cfg(unix)]
122 Unix {
123 client: Client<UnixConnector>,
124 },
125 #[cfg(windows)]
126 NamedPipe {
127 client: Client<NamedPipeConnector>,
128 },
129 }
130
131 impl fmt::Debug for Transport {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 match self {
134 Transport::Http { .. } => write!(f, "HTTP"),
135 #[cfg(feature = "ssl")]
136 Transport::Https { .. } => write!(f, "HTTPS(openssl)"),
137 #[cfg(feature = "tls")]
138 Transport::Tls { .. } => write!(f, "HTTPS(native)"),
139 #[cfg(unix)]
140 Transport::Unix { .. } => write!(f, "Unix"),
141 #[cfg(windows)]
142 Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
143 }
144 }
145 }
146
/// Advisory version stub to use for communicating with the Server. The docker server will
/// error if a higher client version is used than is compatible with the server. Beware also
/// that the docker server will return stubs for a higher version than the version set when
/// communicating.
///
/// See also [negotiate_version](struct.Docker.html#method.negotiate_version), and the
/// `client_version` argument when instantiating the [Docker](struct.Docker.html) client
/// instance.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ClientVersion {
    /// The major version number.
    pub major_version: usize,
    /// The minor version number.
    pub minor_version: usize,
}
160
161 pub(crate) enum MaybeClientVersion {
162 Some(ClientVersion),
163 None,
164 }
165
166 impl From<String> for MaybeClientVersion {
from(s: String) -> MaybeClientVersion167 fn from(s: String) -> MaybeClientVersion {
168 match s
169 .split(".")
170 .map(|v| v.parse::<usize>())
171 .collect::<Vec<Result<usize, std::num::ParseIntError>>>()
172 .as_slice()
173 {
174 [Ok(first), Ok(second)] => MaybeClientVersion::Some(ClientVersion {
175 major_version: first.to_owned(),
176 minor_version: second.to_owned(),
177 }),
178 _ => MaybeClientVersion::None,
179 }
180 }
181 }
182
183 impl fmt::Display for ClientVersion {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 write!(f, "{}.{}", self.major_version, self.minor_version)
186 }
187 }
188
189 impl PartialOrd for ClientVersion {
partial_cmp(&self, other: &Self) -> Option<cmp::Ordering>190 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
191 match self.major_version.partial_cmp(&other.major_version) {
192 Some(cmp::Ordering::Equal) => self.minor_version.partial_cmp(&other.minor_version),
193 res => res,
194 }
195 }
196 }
197
198 impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion199 fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
200 ClientVersion {
201 major_version: tpl.0.load(Ordering::Relaxed),
202 minor_version: tpl.1.load(Ordering::Relaxed),
203 }
204 }
205 }
206
207 #[derive(Debug)]
208 /// ---
209 ///
210 /// # Docker
211 ///
212 /// The main interface for calling the Docker API. Construct a new Docker instance using one of the
213 /// connect methods:
214 /// - [`Docker::connect_with_http_defaults`](struct.Docker.html#method.connect_with_http_defaults)
215 /// - [`Docker::connect_with_named_pipe_defaults`](struct.Docker.html#method.connect_with_pipe_defaults)
216 /// - [`Docker::connect_with_ssl_defaults`](struct.Docker.html#method.connect_with_ssl_defaults)
217 /// - [`Docker::connect_with_unix_defaults`](struct.Docker.html#method.connect_with_unix_defaults)
218 /// - [`Docker::connect_with_tls_defaults`](struct.Docker.html#method.connect_with_tls_defaults)
219 /// - [`Docker::connect_with_local_defaults`](struct.Docker.html#method.connect_with_local_defaults)
220 pub struct Docker {
221 pub(crate) transport: Arc<Transport>,
222 pub(crate) client_type: ClientType,
223 pub(crate) client_addr: String,
224 pub(crate) client_timeout: u64,
225 pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
226 }
227
228 impl Clone for Docker {
clone(&self) -> Docker229 fn clone(&self) -> Docker {
230 Docker {
231 transport: self.transport.clone(),
232 client_type: self.client_type.clone(),
233 client_addr: self.client_addr.clone(),
234 client_timeout: self.client_timeout,
235 version: self.version.clone(),
236 }
237 }
238 }
239
/// A Docker implementation typed to connect to a secure HTTPS connection using the `openssl`
/// library.
#[cfg(feature = "ssl")]
impl Docker {
    /// Connect using secure HTTPS using defaults that are signalled by environment variables.
    ///
    /// # Defaults
    ///
    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable; when it
    ///    cannot be read, `DEFAULT_DOCKER_HOST` is used.
    ///  - The certificate directory is resolved by `default_cert_path`.
    ///  - Certificates are named `key.pem`, `cert.pem` and `ca.pem` to indicate the private key,
    ///    the server certificate and the certificate chain respectively.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Errors
    ///
    /// Fails when the certificate directory cannot be resolved, or with any error of
    /// [`Docker::connect_with_ssl`](struct.Docker.html#method.connect_with_ssl).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_ssl_defaults().unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
        let cert_path = default_cert_path()?;
        // Any failure to read DOCKER_HOST falls back to the default address.
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());

        Docker::connect_with_ssl(
            &host,
            &cert_path.join("key.pem"),
            &cert_path.join("cert.pem"),
            &cert_path.join("ca.pem"),
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connect using secure HTTPS.
    ///
    /// # Arguments
    ///
    ///  - `addr`: the connection url.
    ///  - `ssl_key`: the private key path.
    ///  - `ssl_cert`: the server certificate path.
    ///  - `ssl_ca`: the certificate chain path.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # Errors
    ///
    /// Returns `SSLError` when the openssl connector cannot be created or any of the three
    /// files cannot be loaded into it.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use std::path::Path;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_ssl(
    ///     "localhost:2375",
    ///     Path::new("/certs/key.pem"),
    ///     Path::new("/certs/cert.pem"),
    ///     Path::new("/certs/ca.pem"),
    ///     120,
    ///     API_DEFAULT_VERSION).unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_ssl(
        addr: &str,
        ssl_key: &Path,
        ssl_cert: &Path,
        ssl_ca: &Path,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        // This ensures that using docker-machine-esque addresses work with Hyper.
        let client_addr = addr.replacen("tcp://", "", 1);

        let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls())
            .map_err::<Error, _>(|err| SSLError { err }.into())?;

        ssl_connector_builder
            .set_ca_file(ssl_ca)
            .map_err::<Error, _>(|err| SSLError { err }.into())?;
        ssl_connector_builder
            .set_certificate_file(ssl_cert, SslFiletype::PEM)
            .map_err::<Error, _>(|err| SSLError { err }.into())?;
        ssl_connector_builder
            .set_private_key_file(ssl_key, SslFiletype::PEM)
            .map_err::<Error, _>(|err| SSLError { err }.into())?;

        // The inner connector must not reject `https` URIs; TLS is layered on top of it.
        let mut http_connector = HttpConnector::new();
        http_connector.enforce_http(false);

        let https_connector: HttpsConnector<HttpConnector> =
            HttpsConnector::with_connector(http_connector, ssl_connector_builder)
                .map_err::<Error, _>(|err| SSLError { err }.into())?;

        let client = Client::builder().build(https_connector);

        Ok(Docker {
            transport: Arc::new(Transport::Https { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
367
368 /// A Docker implementation typed to connect to an unsecure Http connection.
369 impl Docker {
370 /// Connect using unsecured HTTP using defaults that are signalled by environment variables.
371 ///
372 /// # Defaults
373 ///
374 /// - The connection url is sourced from the `DOCKER_HOST` environment variable, and defaults
375 /// to `localhost:2375`.
376 /// - The number of threads used for the HTTP connection pool defaults to 1.
377 /// - The request timeout defaults to 2 minutes.
378 ///
379 /// # Examples
380 ///
381 /// ```rust,no_run
382 /// use bollard::Docker;
383 ///
384 /// use futures_util::future::TryFutureExt;
385 ///
386 /// let connection = Docker::connect_with_http_defaults().unwrap();
387 /// connection.ping()
388 /// .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
389 /// ```
connect_with_http_defaults() -> Result<Docker, Error>390 pub fn connect_with_http_defaults() -> Result<Docker, Error> {
391 let host = env::var("DOCKER_HOST").unwrap_or(DEFAULT_DOCKER_HOST.to_string());
392 Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
393 }
394
395 /// Connect using unsecured HTTP.
396 ///
397 /// # Arguments
398 ///
399 /// - `addr`: connection url including scheme and port.
400 /// - `timeout`: the read/write timeout (seconds) to use for every hyper connection
401 /// - `client_version`: the client version to communicate with the server.
402 ///
403 /// # Examples
404 ///
405 /// ```rust,no_run
406 /// use bollard::{API_DEFAULT_VERSION, Docker};
407 ///
408 /// use futures_util::future::TryFutureExt;
409 ///
410 /// let connection = Docker::connect_with_http(
411 /// "http://my-custom-docker-server:2735", 4, API_DEFAULT_VERSION)
412 /// .unwrap();
413 /// connection.ping()
414 /// .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
415 /// ```
connect_with_http( addr: &str, timeout: u64, client_version: &ClientVersion, ) -> Result<Docker, Error>416 pub fn connect_with_http(
417 addr: &str,
418 timeout: u64,
419 client_version: &ClientVersion,
420 ) -> Result<Docker, Error> {
421 // This ensures that using docker-machine-esque addresses work with Hyper.
422 let client_addr = addr.replacen("tcp://", "", 1);
423
424 let http_connector = HttpConnector::new();
425
426 let client_builder = Client::builder();
427 let client = client_builder.build(http_connector);
428 let transport = Transport::Http { client };
429 let docker = Docker {
430 transport: Arc::new(transport),
431 client_type: ClientType::Http,
432 client_addr: client_addr.to_owned(),
433 client_timeout: timeout,
434 version: Arc::new((
435 AtomicUsize::new(client_version.major_version),
436 AtomicUsize::new(client_version.minor_version),
437 )),
438 };
439
440 Ok(docker)
441 }
442 }
443
444 #[cfg(unix)]
445 /// A Docker implementation typed to connect to a Unix socket.
446 impl Docker {
447 /// Connect using a Unix socket using defaults that are signalled by environment variables.
448 ///
449 /// # Defaults
450 ///
451 /// - The socket location defaults to `/var/run/docker.sock`.
452 /// - The request timeout defaults to 2 minutes.
453 ///
454 /// # Examples
455 ///
456 /// ```rust,no_run
457 /// use bollard::Docker;
458 ///
459 /// use futures_util::future::TryFutureExt;
460 ///
461 /// let connection = Docker::connect_with_unix_defaults().unwrap();
462 /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
463 /// ```
connect_with_unix_defaults() -> Result<Docker, Error>464 pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
465 Docker::connect_with_unix(DEFAULT_SOCKET, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
466 }
467
468 /// Connect using a Unix socket.
469 ///
470 /// # Arguments
471 ///
472 /// - `addr`: connection socket path.
473 /// - `timeout`: the read/write timeout (seconds) to use for every hyper connection
474 /// - `client_version`: the client version to communicate with the server.
475 ///
476 /// # Examples
477 ///
478 /// ```rust,no_run
479 /// use bollard::{API_DEFAULT_VERSION, Docker};
480 ///
481 /// use futures_util::future::TryFutureExt;
482 ///
483 /// let connection = Docker::connect_with_unix("/var/run/docker.sock", 120, API_DEFAULT_VERSION).unwrap();
484 /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
485 /// ```
connect_with_unix( addr: &str, timeout: u64, client_version: &ClientVersion, ) -> Result<Docker, Error>486 pub fn connect_with_unix(
487 addr: &str,
488 timeout: u64,
489 client_version: &ClientVersion,
490 ) -> Result<Docker, Error> {
491 let client_addr = addr.replacen("unix://", "", 1);
492
493 let unix_connector = UnixConnector;
494
495 let mut client_builder = Client::builder();
496 client_builder.pool_max_idle_per_host(0);
497
498 let client = client_builder.build(unix_connector);
499 let transport = Transport::Unix { client };
500 let docker = Docker {
501 transport: Arc::new(transport),
502 client_type: ClientType::Unix,
503 client_addr: client_addr.to_owned(),
504 client_timeout: timeout,
505 version: Arc::new((
506 AtomicUsize::new(client_version.major_version),
507 AtomicUsize::new(client_version.minor_version),
508 )),
509 };
510
511 Ok(docker)
512 }
513 }
514
/// A Docker implementation typed to connect to a Windows Named Pipe, exclusive to the windows
/// target.
#[cfg(windows)]
impl Docker {
    /// Connect using a Windows Named Pipe using defaults that are signalled by environment
    /// variables.
    ///
    /// # Defaults
    ///
    ///  - The socket location defaults to `//./pipe/docker_engine`.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_named_pipe_defaults().unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    ///
    /// ```
    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
        Self::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connect using a Windows Named Pipe.
    ///
    /// # Arguments
    ///
    ///  - `addr`: socket location; a leading `npipe://` is stripped.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_named_pipe(
    ///     "//./pipe/docker_engine", 120, API_DEFAULT_VERSION).unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    ///
    /// ```
    pub fn connect_with_named_pipe(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("npipe://", "", 1);

        // No idle pooled connections, and HTTP/1 header names sent in Title-Case.
        let client = Client::builder()
            .pool_max_idle_per_host(0)
            .http1_title_case_headers(true)
            .build(NamedPipeConnector);
        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::NamedPipe { client }),
            client_type: ClientType::NamedPipe,
            client_addr,
            client_timeout: timeout,
            version: Arc::new(version),
        })
    }
}
590
591 /// A Docker implementation that wraps away which local implementation we are calling.
592 #[cfg(any(unix, windows))]
593 impl Docker {
594 /// Connect using the local machine connection method with default arguments.
595 ///
596 /// This is a simple wrapper over the OS specific handlers:
597 /// * Unix: [`Docker::connect_with_unix_defaults`]
598 /// * Windows: [`Docker::connect_with_named_pipe_defaults`]
599 ///
600 /// [`Docker::connect_with_unix_defaults`]: struct.Docker.html#method.connect_with_unix_defaults
601 /// [`Docker::connect_with_named_pipe_defaults`]: struct.Docker.html#method.connect_with_named_pipe_defaults
connect_with_local_defaults() -> Result<Docker, Error>602 pub fn connect_with_local_defaults() -> Result<Docker, Error> {
603 #[cfg(unix)]
604 return Docker::connect_with_unix_defaults();
605 #[cfg(windows)]
606 return Docker::connect_with_named_pipe_defaults();
607 }
608
609 /// Connect using the local machine connection method with supplied arguments.
610 ///
611 /// This is a simple wrapper over the OS specific handlers:
612 /// * Unix: [`Docker::connect_with_unix`]
613 /// * Windows: [`Docker::connect_with_named_pipe`]
614 ///
615 /// [`Docker::connect_with_unix`]: struct.Docker.html#method.connect_with_unix
616 /// [`Docker::connect_with_named_pipe`]: struct.Docker.html#method.connect_with_named_pipe
connect_with_local( addr: &str, timeout: u64, client_version: &ClientVersion, ) -> Result<Docker, Error>617 pub fn connect_with_local(
618 addr: &str,
619 timeout: u64,
620 client_version: &ClientVersion,
621 ) -> Result<Docker, Error> {
622 #[cfg(unix)]
623 return Docker::connect_with_unix(addr, timeout, client_version);
624 #[cfg(windows)]
625 return Docker::connect_with_named_pipe(addr, timeout, client_version);
626 }
627 }
628
/// A Docker implementation typed to connect to a secure HTTPS connection, using the native rust
/// TLS library.
#[cfg(feature = "tls")]
impl Docker {
    /// Connect using secure HTTPS using native TLS and defaults that are signalled by environment
    /// variables.
    ///
    /// # Defaults
    ///
    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable; when it
    ///    cannot be read, `DEFAULT_DOCKER_HOST` is used.
    ///  - The certificate directory is resolved by `default_cert_path`.
    ///  - Certificate PKCS #12 archive is named `identity.pfx` and the certificate chain is named `ca.pem`.
    ///  - The password for the PKCS #12 archive defaults to an empty password.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # PKCS12
    ///
    /// PKCS #12 archives can be created with OpenSSL:
    ///
    /// ```bash
    /// openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile
    /// chain_certs.pem
    /// ```
    ///
    /// # Errors
    ///
    /// Fails when the certificate directory cannot be resolved, or with any error of
    /// [`Docker::connect_with_tls`](struct.Docker.html#method.connect_with_tls).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_tls_defaults().unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_tls_defaults() -> Result<Docker, Error> {
        let cert_path = default_cert_path()?;
        // Any failure to read DOCKER_HOST falls back to the default address.
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());

        Docker::connect_with_tls(
            &host,
            &cert_path.join("identity.pfx"),
            &cert_path.join("ca.pem"),
            "",
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connect using secure HTTPS using native TLS.
    ///
    /// # Arguments
    ///
    ///  - `addr`: the connection url.
    ///  - `pkcs12_file`: the PKCS #12 archive.
    ///  - `ca_file`: the certificate chain.
    ///  - `pkcs12_password`: the password to the PKCS #12 archive.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # PKCS12
    ///
    /// PKCS #12 archives can be created with OpenSSL:
    ///
    /// ```bash
    /// openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile
    /// chain_certs.pem
    /// ```
    ///
    /// # Errors
    ///
    /// Fails with the underlying I/O error when either file cannot be read, and with
    /// `TLSError` when the archive or the certificate cannot be parsed or the TLS connector
    /// cannot be built.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use std::path::Path;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_tls(
    ///     "localhost:2375",
    ///     Path::new("/certs/identity.pfx"),
    ///     Path::new("/certs/ca.pem"),
    ///     "my_secret_password",
    ///     120,
    ///     API_DEFAULT_VERSION
    /// ).unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_tls(
        addr: &str,
        pkcs12_file: &Path,
        ca_file: &Path,
        pkcs12_password: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        use crate::errors::ErrorKind;
        use std::fs;

        let client_addr = addr.replacen("tcp://", "", 1);

        // Client identity: the PKCS #12 archive, unlocked with the given password.
        let pkcs12_bytes = fs::read(pkcs12_file)?;
        let identity = Identity::from_pkcs12(&pkcs12_bytes, pkcs12_password)
            .map_err(|err| ErrorKind::TLSError { err })?;

        // Additional trusted root: the PEM encoded certificate chain.
        let ca_bytes = fs::read(ca_file)?;
        let ca = Certificate::from_pem(&ca_bytes).map_err(|err| ErrorKind::TLSError { err })?;

        let mut tls_connector_builder = TlsConnector::builder();
        tls_connector_builder
            .identity(identity)
            .add_root_certificate(ca);

        // The inner connector must not reject `https` URIs; TLS is layered on top of it.
        let mut http_connector = HttpConnector::new();
        http_connector.enforce_http(false);

        let tls_connector = tls_connector_builder
            .build()
            .map_err(|err| ErrorKind::TLSError { err })?;
        let https_connector: hyper_tls::HttpsConnector<HttpConnector> =
            hyper_tls::HttpsConnector::from((http_connector, tls_connector.into()));

        let client = Client::builder().build(https_connector);

        Ok(Docker {
            transport: Arc::new(Transport::Tls { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
783
784 // The implementation block for Docker requests
785 impl Docker {
process_into_value<T>( &self, req: Result<Request<Body>, Error>, ) -> impl Future<Output = Result<T, Error>> where T: DeserializeOwned,786 pub(crate) fn process_into_value<T>(
787 &self,
788 req: Result<Request<Body>, Error>,
789 ) -> impl Future<Output = Result<T, Error>>
790 where
791 T: DeserializeOwned,
792 {
793 let fut = self.process_request(req);
794 async move {
795 let response = fut.await?;
796 Docker::decode_response(response).await
797 }
798 }
799
process_into_stream<T>( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<T, Error>> + Unpin where T: DeserializeOwned,800 pub(crate) fn process_into_stream<T>(
801 &self,
802 req: Result<Request<Body>, Error>,
803 ) -> impl Stream<Item = Result<T, Error>> + Unpin
804 where
805 T: DeserializeOwned,
806 {
807 Box::pin(
808 self.process_request(req)
809 .map_ok(Docker::decode_into_stream::<T>)
810 .into_stream()
811 .try_flatten(),
812 )
813 }
814
process_into_stream_string( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin815 pub(crate) fn process_into_stream_string(
816 &self,
817 req: Result<Request<Body>, Error>,
818 ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
819 Box::pin(
820 self.process_request(req)
821 .map_ok(Docker::decode_into_stream_string)
822 .try_flatten_stream(),
823 )
824 }
825
process_into_unit( &self, req: Result<Request<Body>, Error>, ) -> impl Future<Output = Result<(), Error>>826 pub(crate) fn process_into_unit(
827 &self,
828 req: Result<Request<Body>, Error>,
829 ) -> impl Future<Output = Result<(), Error>> {
830 let fut = self.process_request(req);
831 async move {
832 fut.await?;
833 Ok(())
834 }
835 }
836
process_into_body( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin837 pub(crate) fn process_into_body(
838 &self,
839 req: Result<Request<Body>, Error>,
840 ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
841 Box::pin(
842 self.process_request(req)
843 .map_ok(|response| {
844 response
845 .into_body()
846 .map_err::<Error, _>(|e: hyper::Error| HyperResponseError { err: e }.into())
847 })
848 .into_stream()
849 .try_flatten(),
850 )
851 }
852
process_upgraded_stream_string<'a>( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<LogOutput, Error>>853 pub(crate) fn process_upgraded_stream_string<'a>(
854 &self,
855 req: Result<Request<Body>, Error>,
856 ) -> impl Stream<Item = Result<LogOutput, Error>> {
857 let fut = self.process_request(req);
858 stream::once(async move { fut.await.map(Docker::decode_into_upgraded_stream_string) })
859 .try_flatten()
860 }
861
transpose_option<T>( option: Option<Result<T, Error>>, ) -> Result<Option<T>, Error>862 pub(crate) fn transpose_option<T>(
863 option: Option<Result<T, Error>>,
864 ) -> Result<Option<T>, Error> {
865 option.transpose()
866 }
867
serialize_payload<S>(body: Option<S>) -> Result<Body, Error> where S: Serialize,868 pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<Body, Error>
869 where
870 S: Serialize,
871 {
872 match body.map(|inst| serde_json::to_string(&inst)) {
873 Some(Ok(res)) => Ok(Some(res)),
874 Some(Err(e)) => Err(e),
875 None => Ok(None),
876 }
877 .map_err(|e| JsonSerializeError { err: e }.into())
878 .map(|payload| {
879 debug!("{}", payload.clone().unwrap_or_else(String::new));
880 payload
881 .map(|content| content.into())
882 .unwrap_or(Body::empty())
883 })
884 }
885
886 /// Return the currently set client version.
client_version(&self) -> ClientVersion887 pub fn client_version(&self) -> ClientVersion {
888 self.version.as_ref().into()
889 }
890
891 /// Check with the server for a supported version, and downgrade the client version if
892 /// appropriate.
893 ///
894 /// # Examples:
895 ///
896 /// ```rust,norun
897 /// use bollard::Docker;
898 ///
899 /// let docker = Docker::connect_with_http_defaults().unwrap();
900 /// async move {
901 /// &docker.negotiate_version().await.unwrap().version();
902 /// };
903 /// ```
negotiate_version(self) -> Result<Self, Error>904 pub async fn negotiate_version(self) -> Result<Self, Error> {
905 let req = self.build_request::<_, String, String>(
906 "/version",
907 Builder::new().method(Method::GET),
908 Ok(None::<ArrayVec<[(_, _); 0]>>),
909 Ok(Body::empty()),
910 );
911
912 let res = self.process_into_value::<Version>(req).await?;
913
914 let err_api_version = res.api_version.clone();
915 let server_version: ClientVersion = match res.api_version.into() {
916 MaybeClientVersion::Some(client_version) => client_version,
917 MaybeClientVersion::None => {
918 return Err(APIVersionParseError {
919 api_version: err_api_version,
920 }
921 .into())
922 }
923 };
924 if server_version < self.client_version() {
925 self.version
926 .0
927 .store(server_version.major_version, Ordering::Relaxed);
928 self.version
929 .1
930 .store(server_version.minor_version, Ordering::Relaxed);
931 }
932 Ok(self)
933 }
934
process_request( &self, request: Result<Request<Body>, Error>, ) -> impl Future<Output = Result<Response<Body>, Error>>935 fn process_request(
936 &self,
937 request: Result<Request<Body>, Error>,
938 ) -> impl Future<Output = Result<Response<Body>, Error>> {
939 let transport = self.transport.clone();
940 let timeout = self.client_timeout;
941
942 async move {
943 let request = request?;
944 let response = Docker::execute_request(transport, request, timeout).await?;
945
946 let status = response.status();
947 match status {
948 // Status code 200 - 299
949 s if s.is_success() => Ok(response),
950
951 StatusCode::SWITCHING_PROTOCOLS => Ok(response),
952
953 // Status code 304: Not Modified
954 StatusCode::NOT_MODIFIED => {
955 let message = Docker::decode_into_string(response).await?;
956 Err(DockerResponseNotModifiedError { message }.into())
957 }
958
959 // Status code 409: Conflict
960 StatusCode::CONFLICT => {
961 let message = Docker::decode_into_string(response).await?;
962 Err(DockerResponseConflictError { message }.into())
963 }
964
965 // Status code 400: Bad request
966 StatusCode::BAD_REQUEST => {
967 let message = Docker::decode_into_string(response).await?;
968 Err(DockerResponseBadParameterError { message }.into())
969 }
970
971 // Status code 404: Not Found
972 StatusCode::NOT_FOUND => {
973 let message = Docker::decode_into_string(response).await?;
974 Err(DockerResponseNotFoundError { message }.into())
975 }
976
977 // All other status codes
978 _ => {
979 let message = Docker::decode_into_string(response).await?;
980 Err(DockerResponseServerError {
981 status_code: status.as_u16(),
982 message,
983 }
984 .into())
985 }
986 }
987 }
988 }
989
build_request<O, K, V>( &self, path: &str, builder: Builder, query: Result<Option<O>, Error>, payload: Result<Body, Error>, ) -> Result<Request<Body>, Error> where O: IntoIterator, O::Item: ::std::borrow::Borrow<(K, V)>, K: AsRef<str>, V: AsRef<str>,990 pub(crate) fn build_request<O, K, V>(
991 &self,
992 path: &str,
993 builder: Builder,
994 query: Result<Option<O>, Error>,
995 payload: Result<Body, Error>,
996 ) -> Result<Request<Body>, Error>
997 where
998 O: IntoIterator,
999 O::Item: ::std::borrow::Borrow<(K, V)>,
1000 K: AsRef<str>,
1001 V: AsRef<str>,
1002 {
1003 query
1004 .and_then(|q| payload.map(|body| (q, body)))
1005 .and_then(|(q, body)| {
1006 let uri = Uri::parse(
1007 &self.client_addr,
1008 &self.client_type,
1009 path,
1010 q,
1011 &self.client_version(),
1012 )?;
1013 let request_uri: hyper::Uri = uri.into();
1014 let builder_string = format!("{:?}", builder);
1015 Ok(builder
1016 .uri(request_uri)
1017 .header(CONTENT_TYPE, "application/json")
1018 .body(body)
1019 .map_err::<Error, _>(|e| {
1020 HttpClientError {
1021 builder: builder_string,
1022 err: e,
1023 }
1024 .into()
1025 })?)
1026 })
1027 }
1028
execute_request( transport: Arc<Transport>, req: Request<Body>, timeout: u64, ) -> Result<Response<Body>, Error>1029 async fn execute_request(
1030 transport: Arc<Transport>,
1031 req: Request<Body>,
1032 timeout: u64,
1033 ) -> Result<Response<Body>, Error> {
1034 // This is where we determine to which transport we issue the request.
1035 let request = match *transport {
1036 Transport::Http { ref client } => client.request(req),
1037 #[cfg(feature = "ssl")]
1038 Transport::Https { ref client } => client.request(req),
1039 #[cfg(feature = "tls")]
1040 Transport::Tls { ref client } => client.request(req),
1041 #[cfg(unix)]
1042 Transport::Unix { ref client } => client.request(req),
1043 #[cfg(windows)]
1044 Transport::NamedPipe { ref client } => client.request(req),
1045 };
1046
1047 match tokio::time::timeout(Duration::from_secs(timeout), request).await {
1048 Ok(v) => v.map_err(|err| HyperResponseError { err }.into()),
1049 Err(_) => Err(RequestTimeoutError.into()),
1050 }
1051 }
1052
decode_into_stream<T>(res: Response<Body>) -> impl Stream<Item = Result<T, Error>> where T: DeserializeOwned,1053 fn decode_into_stream<T>(res: Response<Body>) -> impl Stream<Item = Result<T, Error>>
1054 where
1055 T: DeserializeOwned,
1056 {
1057 FramedRead::new(
1058 StreamReader::new(
1059 res.into_body()
1060 .map_err::<Error, _>(|e: hyper::Error| HyperResponseError { err: e }.into()),
1061 ),
1062 JsonLineDecoder::new(),
1063 )
1064 }
1065
decode_into_stream_string( res: Response<Body>, ) -> impl Stream<Item = Result<LogOutput, Error>>1066 fn decode_into_stream_string(
1067 res: Response<Body>,
1068 ) -> impl Stream<Item = Result<LogOutput, Error>> {
1069 FramedRead::new(
1070 StreamReader::new(
1071 res.into_body()
1072 .map_err::<Error, _>(|e: hyper::Error| HyperResponseError { err: e }.into()),
1073 ),
1074 NewlineLogOutputDecoder::new(),
1075 )
1076 }
1077
decode_into_upgraded_stream_string( res: Response<Body>, ) -> impl Stream<Item = Result<LogOutput, Error>>1078 fn decode_into_upgraded_stream_string(
1079 res: Response<Body>,
1080 ) -> impl Stream<Item = Result<LogOutput, Error>> {
1081 res.into_body()
1082 .on_upgrade()
1083 .into_stream()
1084 .map_ok(|r| FramedRead::new(r, NewlineLogOutputDecoder::new()))
1085 .map_err::<Error, _>(|e| HyperResponseError { err: e }.into())
1086 .try_flatten()
1087 }
1088
decode_into_string(response: Response<Body>) -> Result<String, Error>1089 async fn decode_into_string(response: Response<Body>) -> Result<String, Error> {
1090 let body = hyper::body::to_bytes(response.into_body())
1091 .await
1092 .map_err(|e| HyperResponseError { err: e })?;
1093
1094 from_utf8(&body).map(|x| x.to_owned()).map_err(|e| {
1095 StrParseError {
1096 content: hex::encode(body.to_owned()),
1097 err: e,
1098 }
1099 .into()
1100 })
1101 }
1102
decode_response<T>(response: Response<Body>) -> Result<T, Error> where T: DeserializeOwned,1103 async fn decode_response<T>(response: Response<Body>) -> Result<T, Error>
1104 where
1105 T: DeserializeOwned,
1106 {
1107 let contents = Docker::decode_into_string(response).await?;
1108
1109 debug!("Decoded into string: {}", &contents);
1110 serde_json::from_str::<T>(&contents).map_err(|e| {
1111 if e.is_data() {
1112 JsonDataError {
1113 message: e.to_string(),
1114 column: e.column(),
1115 contents: contents.to_owned(),
1116 }
1117 .into()
1118 } else {
1119 JsonDeserializeError {
1120 content: contents.to_owned(),
1121 err: e,
1122 }
1123 .into()
1124 }
1125 })
1126 }
1127 }
1128