1 use std::cmp;
2 use std::env;
3 use std::fmt;
4 use std::future::Future;
5 #[cfg(any(feature = "ssl", feature = "tls"))]
6 use std::path::{Path, PathBuf};
7 use std::str::from_utf8;
8 use std::sync::atomic::AtomicUsize;
9 use std::sync::atomic::Ordering;
10 use std::sync::Arc;
11 use std::time::Duration;
12 
13 use arrayvec::ArrayVec;
14 use futures_core::Stream;
15 use futures_util::future::FutureExt;
16 use futures_util::future::TryFutureExt;
17 use futures_util::stream;
18 use futures_util::stream::TryStreamExt;
19 use http::header::CONTENT_TYPE;
20 use http::request::Builder;
21 use hyper::client::HttpConnector;
22 use hyper::{self, body::Bytes, Body, Client, Method, Request, Response, StatusCode};
23 #[cfg(feature = "ssl")]
24 use hyper_openssl::HttpsConnector;
25 #[cfg(feature = "tls")]
26 use hyper_tls;
27 #[cfg(unix)]
28 use hyperlocal::UnixClient as UnixConnector;
29 #[cfg(feature = "tls")]
30 use native_tls::{Certificate, Identity, TlsConnector};
31 #[cfg(feature = "ssl")]
32 use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
33 use tokio_util::codec::FramedRead;
34 
35 use crate::container::LogOutput;
36 use crate::errors::Error;
37 use crate::errors::ErrorKind::{
38     APIVersionParseError, DockerResponseBadParameterError, DockerResponseConflictError,
39     DockerResponseNotFoundError, DockerResponseNotModifiedError, DockerResponseServerError,
40     HttpClientError, HyperResponseError, JsonDataError, JsonDeserializeError, JsonSerializeError,
41     RequestTimeoutError, StrParseError,
42 };
43 #[cfg(any(feature = "ssl", feature = "tls"))]
44 use crate::errors::ErrorKind::NoCertPathError;
45 #[cfg(feature = "ssl")]
46 use crate::errors::ErrorKind::SSLError;
47 #[cfg(windows)]
48 use crate::named_pipe::NamedPipeConnector;
49 use crate::read::{JsonLineDecoder, NewlineLogOutputDecoder, StreamReader};
50 use crate::system::Version;
51 use crate::uri::Uri;
52 
53 use serde::de::DeserializeOwned;
54 use serde::ser::Serialize;
55 use serde_json;
56 
57 /// The default `DOCKER_SOCKET` address that we will try to connect to.
58 #[cfg(unix)]
59 pub const DEFAULT_SOCKET: &'static str = "unix:///var/run/docker.sock";
60 
61 /// The default `DOCKER_NAMED_PIPE` address that a windows client will try to connect to.
62 #[cfg(windows)]
63 pub const DEFAULT_NAMED_PIPE: &'static str = "npipe:////./pipe/docker_engine";
64 
65 /// The default `DOCKER_HOST` address that we will try to connect to.
66 pub const DEFAULT_DOCKER_HOST: &'static str = "tcp://localhost:2375";
67 
68 /// Default timeout for all requests is 2 minutes.
69 const DEFAULT_TIMEOUT: u64 = 120;
70 
71 /// Default Client Version to communicate with the server.
72 pub const API_DEFAULT_VERSION: &'static ClientVersion = &ClientVersion {
73     major_version: 1,
74     minor_version: 40,
75 };
76 
77 pub(crate) const TRUE_STR: &'static str = "true";
78 pub(crate) const FALSE_STR: &'static str = "false";
79 
/// Resolve the directory in which to look for our Docker certificate files.
///
/// The lookup order is:
///  1. the `DOCKER_CERT_PATH` environment variable,
///  2. the `DOCKER_CONFIG` environment variable,
///  3. the `.docker` directory inside the current user's home directory.
///
/// # Errors
///
/// Fails with `NoCertPathError` when neither variable is usable and no home
/// directory can be determined.
#[cfg(any(feature = "ssl", feature = "tls"))]
pub fn default_cert_path() -> Result<PathBuf, Error> {
    match env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG")) {
        Ok(configured) => Ok(PathBuf::from(configured)),
        Err(_) => {
            let home_dir = dirs::home_dir().ok_or(NoCertPathError)?;
            Ok(home_dir.join(".docker"))
        }
    }
}
92 
#[derive(Debug, Clone)]
/// The kind of connection a `Docker` client was constructed with.
///
/// Each variant only exists on the platforms / with the cargo features whose
/// constructors can produce it.
pub(crate) enum ClientType {
    /// Unix domain socket; set by `Docker::connect_with_unix`.
    #[cfg(unix)]
    Unix,
    /// Unsecured HTTP; set by `Docker::connect_with_http`.
    Http,
    /// Secure HTTPS; set by both the `ssl` (openssl) and the `tls` (native-tls)
    /// constructors.
    #[cfg(any(feature = "ssl", feature = "tls"))]
    SSL,
    /// Windows named pipe; set by `Docker::connect_with_named_pipe`.
    #[cfg(windows)]
    NamedPipe,
}
103 
/// Transport is the type representing the means of communication
/// with the Docker daemon.
///
/// Each transport encapsulates a hyper client parameterised with the
/// connector that matches the connection kind.
pub(crate) enum Transport {
    /// Plain HTTP over TCP.
    Http {
        client: Client<HttpConnector>,
    },
    /// HTTPS backed by the `hyper_openssl` connector (feature `ssl`).
    #[cfg(feature = "ssl")]
    Https {
        client: Client<HttpsConnector<HttpConnector>>,
    },
    /// HTTPS backed by the `hyper_tls` connector (feature `tls`).
    #[cfg(feature = "tls")]
    Tls {
        client: Client<hyper_tls::HttpsConnector<HttpConnector>>,
    },
    /// Unix domain socket, via the `hyperlocal` connector.
    #[cfg(unix)]
    Unix {
        client: Client<UnixConnector>,
    },
    /// Windows named pipe, via the crate's own `NamedPipeConnector`.
    #[cfg(windows)]
    NamedPipe {
        client: Client<NamedPipeConnector>,
    },
}
130 
131 impl fmt::Debug for Transport {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result132     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133         match self {
134             Transport::Http { .. } => write!(f, "HTTP"),
135             #[cfg(feature = "ssl")]
136             Transport::Https { .. } => write!(f, "HTTPS(openssl)"),
137             #[cfg(feature = "tls")]
138             Transport::Tls { .. } => write!(f, "HTTPS(native)"),
139             #[cfg(unix)]
140             Transport::Unix { .. } => write!(f, "Unix"),
141             #[cfg(windows)]
142             Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
143         }
144     }
145 }
146 
/// Advisory version stub to use for communicating with the Server. The docker server will error if
/// a higher client version is used than is compatible with the server. Beware also, that the
/// docker server will return stubs for a higher version than the version set when communicating.
///
/// Versions order by major version first and minor version second. The ordering is derived, so
/// the field declaration order below is significant: `major_version` must stay first.
///
/// See also [negotiate_version](struct.Docker.html#method.negotiate_version), and the `client_version` argument when instantiating the
/// [Docker](struct.Docker.html) client instance.
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
pub struct ClientVersion {
    /// The major version number.
    pub major_version: usize,
    /// The minor version number.
    pub minor_version: usize,
}

/// Outcome of parsing a version string: either a well-formed [`ClientVersion`] or nothing.
pub(crate) enum MaybeClientVersion {
    /// The string had the form `<major>.<minor>` with both parts parsing as `usize`.
    Some(ClientVersion),
    /// The string did not have exactly two numeric, dot-separated components.
    None,
}

impl From<String> for MaybeClientVersion {
    /// Parses a `<major>.<minor>` string.
    ///
    /// Anything other than exactly two dot-separated components that both parse
    /// as `usize` yields `MaybeClientVersion::None`.
    fn from(s: String) -> MaybeClientVersion {
        let mut components = s.split('.').map(str::parse::<usize>);
        // Pull at most three components: the third only exists to reject
        // strings such as `1.2.3`, so no intermediate collection is needed.
        match (components.next(), components.next(), components.next()) {
            (Some(Ok(major_version)), Some(Ok(minor_version)), None) => {
                MaybeClientVersion::Some(ClientVersion {
                    major_version,
                    minor_version,
                })
            }
            _ => MaybeClientVersion::None,
        }
    }
}

impl fmt::Display for ClientVersion {
    /// Formats the version as `<major>.<minor>`, e.g. `1.40`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.major_version, self.minor_version)
    }
}

impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
    /// Takes a snapshot of a `(major, minor)` pair of atomics.
    ///
    /// The two values are read with two independent relaxed loads.
    fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
        let (major, minor) = tpl;
        ClientVersion {
            major_version: major.load(Ordering::Relaxed),
            minor_version: minor.load(Ordering::Relaxed),
        }
    }
}
206 
207 #[derive(Debug)]
208 /// ---
209 ///
210 /// # Docker
211 ///
212 /// The main interface for calling the Docker API. Construct a new Docker instance using one of the
213 /// connect methods:
214 ///  - [`Docker::connect_with_http_defaults`](struct.Docker.html#method.connect_with_http_defaults)
215 ///  - [`Docker::connect_with_named_pipe_defaults`](struct.Docker.html#method.connect_with_pipe_defaults)
216 ///  - [`Docker::connect_with_ssl_defaults`](struct.Docker.html#method.connect_with_ssl_defaults)
217 ///  - [`Docker::connect_with_unix_defaults`](struct.Docker.html#method.connect_with_unix_defaults)
218 ///  - [`Docker::connect_with_tls_defaults`](struct.Docker.html#method.connect_with_tls_defaults)
219 ///  - [`Docker::connect_with_local_defaults`](struct.Docker.html#method.connect_with_local_defaults)
220 pub struct Docker {
221     pub(crate) transport: Arc<Transport>,
222     pub(crate) client_type: ClientType,
223     pub(crate) client_addr: String,
224     pub(crate) client_timeout: u64,
225     pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
226 }
227 
228 impl Clone for Docker {
clone(&self) -> Docker229     fn clone(&self) -> Docker {
230         Docker {
231             transport: self.transport.clone(),
232             client_type: self.client_type.clone(),
233             client_addr: self.client_addr.clone(),
234             client_timeout: self.client_timeout,
235             version: self.version.clone(),
236         }
237     }
238 }
239 
#[cfg(feature = "ssl")]
/// A Docker implementation typed to connect to a secure HTTPS connection using the `openssl`
/// library.
impl Docker {
    /// Connect using secure HTTPS using defaults that are signalled by environment variables.
    ///
    /// # Defaults
    ///
    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, and
    ///  defaults to `tcp://localhost:2375` when it is not set.
    ///  - The certificate directory is resolved by `default_cert_path`, which consults the
    ///  `DOCKER_CERT_PATH` environment variable first.
    ///  - Certificates are named `key.pem`, `cert.pem` and `ca.pem` to indicate the private key,
    ///  the server certificate and the certificate chain respectively.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Errors
    ///
    /// Fails if no certificate directory can be determined, or for any reason
    /// [`Docker::connect_with_ssl`](struct.Docker.html#method.connect_with_ssl) fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_ssl_defaults().unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
        let cert_path = default_cert_path()?;
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
        Docker::connect_with_ssl(
            &host,
            &cert_path.join("key.pem"),
            &cert_path.join("cert.pem"),
            &cert_path.join("ca.pem"),
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connect using secure HTTPS.
    ///
    /// # Arguments
    ///
    ///  - `addr`: the connection url.
    ///  - `ssl_key`: the private key path.
    ///  - `ssl_cert`: the server certificate path.
    ///  - `ssl_ca`: the certificate chain path.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # Errors
    ///
    /// Returns an `SSLError` if the OpenSSL connector cannot be created or if any of the
    /// CA file, certificate file or private key file is rejected by OpenSSL.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use std::path::Path;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_ssl(
    ///     "localhost:2375",
    ///     Path::new("/certs/key.pem"),
    ///     Path::new("/certs/cert.pem"),
    ///     Path::new("/certs/ca.pem"),
    ///     120,
    ///     API_DEFAULT_VERSION).unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_ssl(
        addr: &str,
        ssl_key: &Path,
        ssl_cert: &Path,
        ssl_ca: &Path,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        // This ensures that using docker-machine-esque addresses work with Hyper.
        let client_addr = addr.replacen("tcp://", "", 1);

        let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls())
            .map_err::<Error, _>(|e| SSLError { err: e }.into())?;

        ssl_connector_builder
            .set_ca_file(ssl_ca)
            .map_err::<Error, _>(|e| SSLError { err: e }.into())?;
        ssl_connector_builder
            .set_certificate_file(ssl_cert, SslFiletype::PEM)
            .map_err::<Error, _>(|e| SSLError { err: e }.into())?;
        ssl_connector_builder
            .set_private_key_file(ssl_key, SslFiletype::PEM)
            .map_err::<Error, _>(|e| SSLError { err: e }.into())?;

        // The TLS layer wraps this connector, so it must accept non-`http` schemes.
        let mut http_connector = HttpConnector::new();
        http_connector.enforce_http(false);

        let https_connector: HttpsConnector<HttpConnector> =
            HttpsConnector::with_connector(http_connector, ssl_connector_builder)
                .map_err::<Error, _>(|e| SSLError { err: e }.into())?;

        let client = Client::builder().build(https_connector);

        Ok(Docker {
            transport: Arc::new(Transport::Https { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
367 
368 /// A Docker implementation typed to connect to an unsecure Http connection.
369 impl Docker {
370     /// Connect using unsecured HTTP using defaults that are signalled by environment variables.
371     ///
372     /// # Defaults
373     ///
374     ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, and defaults
375     ///  to `localhost:2375`.
376     ///  - The number of threads used for the HTTP connection pool defaults to 1.
377     ///  - The request timeout defaults to 2 minutes.
378     ///
379     /// # Examples
380     ///
381     /// ```rust,no_run
382     /// use bollard::Docker;
383     ///
384     /// use futures_util::future::TryFutureExt;
385     ///
386     /// let connection = Docker::connect_with_http_defaults().unwrap();
387     /// connection.ping()
388     ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
389     /// ```
connect_with_http_defaults() -> Result<Docker, Error>390     pub fn connect_with_http_defaults() -> Result<Docker, Error> {
391         let host = env::var("DOCKER_HOST").unwrap_or(DEFAULT_DOCKER_HOST.to_string());
392         Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
393     }
394 
395     /// Connect using unsecured HTTP.
396     ///
397     /// # Arguments
398     ///
399     ///  - `addr`: connection url including scheme and port.
400     ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
401     ///  - `client_version`: the client version to communicate with the server.
402     ///
403     /// # Examples
404     ///
405     /// ```rust,no_run
406     /// use bollard::{API_DEFAULT_VERSION, Docker};
407     ///
408     /// use futures_util::future::TryFutureExt;
409     ///
410     /// let connection = Docker::connect_with_http(
411     ///                    "http://my-custom-docker-server:2735", 4, API_DEFAULT_VERSION)
412     ///                    .unwrap();
413     /// connection.ping()
414     ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
415     /// ```
connect_with_http( addr: &str, timeout: u64, client_version: &ClientVersion, ) -> Result<Docker, Error>416     pub fn connect_with_http(
417         addr: &str,
418         timeout: u64,
419         client_version: &ClientVersion,
420     ) -> Result<Docker, Error> {
421         // This ensures that using docker-machine-esque addresses work with Hyper.
422         let client_addr = addr.replacen("tcp://", "", 1);
423 
424         let http_connector = HttpConnector::new();
425 
426         let client_builder = Client::builder();
427         let client = client_builder.build(http_connector);
428         let transport = Transport::Http { client };
429         let docker = Docker {
430             transport: Arc::new(transport),
431             client_type: ClientType::Http,
432             client_addr: client_addr.to_owned(),
433             client_timeout: timeout,
434             version: Arc::new((
435                 AtomicUsize::new(client_version.major_version),
436                 AtomicUsize::new(client_version.minor_version),
437             )),
438         };
439 
440         Ok(docker)
441     }
442 }
443 
444 #[cfg(unix)]
445 /// A Docker implementation typed to connect to a Unix socket.
446 impl Docker {
447     /// Connect using a Unix socket using defaults that are signalled by environment variables.
448     ///
449     /// # Defaults
450     ///
451     ///  - The socket location defaults to `/var/run/docker.sock`.
452     ///  - The request timeout defaults to 2 minutes.
453     ///
454     /// # Examples
455     ///
456     /// ```rust,no_run
457     /// use bollard::Docker;
458     ///
459     /// use futures_util::future::TryFutureExt;
460     ///
461     /// let connection = Docker::connect_with_unix_defaults().unwrap();
462     /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
463     /// ```
connect_with_unix_defaults() -> Result<Docker, Error>464     pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
465         Docker::connect_with_unix(DEFAULT_SOCKET, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
466     }
467 
468     /// Connect using a Unix socket.
469     ///
470     /// # Arguments
471     ///
472     ///  - `addr`: connection socket path.
473     ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
474     ///  - `client_version`: the client version to communicate with the server.
475     ///
476     /// # Examples
477     ///
478     /// ```rust,no_run
479     /// use bollard::{API_DEFAULT_VERSION, Docker};
480     ///
481     /// use futures_util::future::TryFutureExt;
482     ///
483     /// let connection = Docker::connect_with_unix("/var/run/docker.sock", 120, API_DEFAULT_VERSION).unwrap();
484     /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
485     /// ```
connect_with_unix( addr: &str, timeout: u64, client_version: &ClientVersion, ) -> Result<Docker, Error>486     pub fn connect_with_unix(
487         addr: &str,
488         timeout: u64,
489         client_version: &ClientVersion,
490     ) -> Result<Docker, Error> {
491         let client_addr = addr.replacen("unix://", "", 1);
492 
493         let unix_connector = UnixConnector;
494 
495         let mut client_builder = Client::builder();
496         client_builder.pool_max_idle_per_host(0);
497 
498         let client = client_builder.build(unix_connector);
499         let transport = Transport::Unix { client };
500         let docker = Docker {
501             transport: Arc::new(transport),
502             client_type: ClientType::Unix,
503             client_addr: client_addr.to_owned(),
504             client_timeout: timeout,
505             version: Arc::new((
506                 AtomicUsize::new(client_version.major_version),
507                 AtomicUsize::new(client_version.minor_version),
508             )),
509         };
510 
511         Ok(docker)
512     }
513 }
514 
#[cfg(windows)]
/// A Docker implementation typed to connect to a Windows Named Pipe, exclusive to the windows
/// target.
impl Docker {
    /// Connect using a Windows Named Pipe with the built-in default settings.
    ///
    /// No environment variable is consulted by this constructor.
    ///
    /// # Defaults
    ///
    ///  - The socket location defaults to `//./pipe/docker_engine`.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_named_pipe_defaults().unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    ///
    /// ```
    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
        Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connect using a Windows Named Pipe.
    ///
    /// # Arguments
    ///
    ///  - `addr`: socket location; a leading `npipe://` scheme is stripped.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_named_pipe(
    ///     "//./pipe/docker_engine", 120, API_DEFAULT_VERSION).unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    ///
    /// ```
    pub fn connect_with_named_pipe(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("npipe://", "", 1);

        let mut client_builder = Client::builder();
        // Keep no idle connections in the pool for this transport.
        client_builder.pool_max_idle_per_host(0);
        // Send HTTP/1 header names in Title-Case on this transport.
        client_builder.http1_title_case_headers(true);
        let client = client_builder.build(NamedPipeConnector);

        Ok(Docker {
            transport: Arc::new(Transport::NamedPipe { client }),
            client_type: ClientType::NamedPipe,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
590 
591 /// A Docker implementation that wraps away which local implementation we are calling.
592 #[cfg(any(unix, windows))]
593 impl Docker {
594     /// Connect using the local machine connection method with default arguments.
595     ///
596     /// This is a simple wrapper over the OS specific handlers:
597     ///  * Unix: [`Docker::connect_with_unix_defaults`]
598     ///  * Windows: [`Docker::connect_with_named_pipe_defaults`]
599     ///
600     /// [`Docker::connect_with_unix_defaults`]: struct.Docker.html#method.connect_with_unix_defaults
601     /// [`Docker::connect_with_named_pipe_defaults`]: struct.Docker.html#method.connect_with_named_pipe_defaults
connect_with_local_defaults() -> Result<Docker, Error>602     pub fn connect_with_local_defaults() -> Result<Docker, Error> {
603         #[cfg(unix)]
604         return Docker::connect_with_unix_defaults();
605         #[cfg(windows)]
606         return Docker::connect_with_named_pipe_defaults();
607     }
608 
609     /// Connect using the local machine connection method with supplied arguments.
610     ///
611     /// This is a simple wrapper over the OS specific handlers:
612     ///  * Unix: [`Docker::connect_with_unix`]
613     ///  * Windows: [`Docker::connect_with_named_pipe`]
614     ///
615     /// [`Docker::connect_with_unix`]: struct.Docker.html#method.connect_with_unix
616     /// [`Docker::connect_with_named_pipe`]: struct.Docker.html#method.connect_with_named_pipe
connect_with_local( addr: &str, timeout: u64, client_version: &ClientVersion, ) -> Result<Docker, Error>617     pub fn connect_with_local(
618         addr: &str,
619         timeout: u64,
620         client_version: &ClientVersion,
621     ) -> Result<Docker, Error> {
622         #[cfg(unix)]
623         return Docker::connect_with_unix(addr, timeout, client_version);
624         #[cfg(windows)]
625         return Docker::connect_with_named_pipe(addr, timeout, client_version);
626     }
627 }
628 
/// A Docker implementation typed to connect to a secure HTTPS connection, using the native rust
/// TLS library.
#[cfg(feature = "tls")]
impl Docker {
    /// Connect using secure HTTPS using native TLS and defaults that are signalled by environment
    /// variables.
    ///
    /// # Defaults
    ///
    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, and
    ///  defaults to `tcp://localhost:2375` when it is not set.
    ///  - The certificate directory is resolved by `default_cert_path`, which consults the
    ///  `DOCKER_CERT_PATH` environment variable first.
    ///  - Certificate PKCS #12 archive is named `identity.pfx` and the certificate chain is named `ca.pem`.
    ///  - The password for the PKCS #12 archive defaults to an empty password.
    ///  - The request timeout defaults to 2 minutes.
    ///
    ///  # PKCS12
    ///
    ///  PKCS #12 archives can be created with OpenSSL:
    ///
    ///  ```bash
    ///  openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile
    ///  chain_certs.pem
    ///  ```
    ///
    /// # Errors
    ///
    /// Fails if no certificate directory can be determined, or for any reason
    /// [`Docker::connect_with_tls`](struct.Docker.html#method.connect_with_tls) fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_tls_defaults().unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_tls_defaults() -> Result<Docker, Error> {
        let cert_path = default_cert_path()?;
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
        Docker::connect_with_tls(
            &host,
            &cert_path.join("identity.pfx"),
            &cert_path.join("ca.pem"),
            "",
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connect using secure HTTPS using native TLS.
    ///
    /// # Arguments
    ///
    ///  - `addr`: the connection url.
    ///  - `pkcs12_file`: the PKCS #12 archive.
    ///  - `ca_file`: the certificate chain.
    ///  - `pkcs12_password`: the password to the PKCS #12 archive.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    ///  # PKCS12
    ///
    ///  PKCS #12 archives can be created with OpenSSL:
    ///
    ///  ```bash
    ///  openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile
    ///  chain_certs.pem
    ///  ```
    ///
    /// # Errors
    ///
    /// Returns an error if either file cannot be read, or a `TLSError` if the PKCS #12
    /// archive cannot be opened with `pkcs12_password`, the CA file is not valid PEM, or
    /// the TLS connector cannot be built.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use std::path::Path;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_tls(
    ///     "localhost:2375",
    ///     Path::new("/certs/identity.pfx"),
    ///     Path::new("/certs/ca.pem"),
    ///     "my_secret_password",
    ///     120,
    ///     API_DEFAULT_VERSION
    /// ).unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_tls(
        addr: &str,
        pkcs12_file: &Path,
        ca_file: &Path,
        pkcs12_password: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        use crate::errors::ErrorKind;
        use std::fs;

        let client_addr = addr.replacen("tcp://", "", 1);

        // Load the client identity first, then the CA, so that a problem with the
        // archive is reported before a problem with the CA file.
        let identity = Identity::from_pkcs12(&fs::read(pkcs12_file)?, pkcs12_password)
            .map_err(|err| ErrorKind::TLSError { err })?;
        let ca = Certificate::from_pem(&fs::read(ca_file)?)
            .map_err(|err| ErrorKind::TLSError { err })?;

        let mut tls_connector_builder = TlsConnector::builder();
        tls_connector_builder
            .identity(identity)
            .add_root_certificate(ca);

        // The TLS layer wraps this connector, so it must accept non-`http` schemes.
        let mut http_connector = HttpConnector::new();
        http_connector.enforce_http(false);

        let tls_connector = tls_connector_builder
            .build()
            .map_err(|err| ErrorKind::TLSError { err })?;
        let https_connector: hyper_tls::HttpsConnector<HttpConnector> =
            hyper_tls::HttpsConnector::from((http_connector, tls_connector.into()));

        let client = Client::builder().build(https_connector);

        Ok(Docker {
            transport: Arc::new(Transport::Tls { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
783 
784 // The implementation block for Docker requests
785 impl Docker {
process_into_value<T>( &self, req: Result<Request<Body>, Error>, ) -> impl Future<Output = Result<T, Error>> where T: DeserializeOwned,786     pub(crate) fn process_into_value<T>(
787         &self,
788         req: Result<Request<Body>, Error>,
789     ) -> impl Future<Output = Result<T, Error>>
790     where
791         T: DeserializeOwned,
792     {
793         let fut = self.process_request(req);
794         async move {
795             let response = fut.await?;
796             Docker::decode_response(response).await
797         }
798     }
799 
process_into_stream<T>( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<T, Error>> + Unpin where T: DeserializeOwned,800     pub(crate) fn process_into_stream<T>(
801         &self,
802         req: Result<Request<Body>, Error>,
803     ) -> impl Stream<Item = Result<T, Error>> + Unpin
804     where
805         T: DeserializeOwned,
806     {
807         Box::pin(
808             self.process_request(req)
809                 .map_ok(Docker::decode_into_stream::<T>)
810                 .into_stream()
811                 .try_flatten(),
812         )
813     }
814 
process_into_stream_string( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin815     pub(crate) fn process_into_stream_string(
816         &self,
817         req: Result<Request<Body>, Error>,
818     ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
819         Box::pin(
820             self.process_request(req)
821                 .map_ok(Docker::decode_into_stream_string)
822                 .try_flatten_stream(),
823         )
824     }
825 
process_into_unit( &self, req: Result<Request<Body>, Error>, ) -> impl Future<Output = Result<(), Error>>826     pub(crate) fn process_into_unit(
827         &self,
828         req: Result<Request<Body>, Error>,
829     ) -> impl Future<Output = Result<(), Error>> {
830         let fut = self.process_request(req);
831         async move {
832             fut.await?;
833             Ok(())
834         }
835     }
836 
process_into_body( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin837     pub(crate) fn process_into_body(
838         &self,
839         req: Result<Request<Body>, Error>,
840     ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
841         Box::pin(
842             self.process_request(req)
843                 .map_ok(|response| {
844                     response
845                         .into_body()
846                         .map_err::<Error, _>(|e: hyper::Error| HyperResponseError { err: e }.into())
847                 })
848                 .into_stream()
849                 .try_flatten(),
850         )
851     }
852 
process_upgraded_stream_string<'a>( &self, req: Result<Request<Body>, Error>, ) -> impl Stream<Item = Result<LogOutput, Error>>853     pub(crate) fn process_upgraded_stream_string<'a>(
854         &self,
855         req: Result<Request<Body>, Error>,
856     ) -> impl Stream<Item = Result<LogOutput, Error>> {
857         let fut = self.process_request(req);
858         stream::once(async move { fut.await.map(Docker::decode_into_upgraded_stream_string) })
859             .try_flatten()
860     }
861 
transpose_option<T>( option: Option<Result<T, Error>>, ) -> Result<Option<T>, Error>862     pub(crate) fn transpose_option<T>(
863         option: Option<Result<T, Error>>,
864     ) -> Result<Option<T>, Error> {
865         option.transpose()
866     }
867 
serialize_payload<S>(body: Option<S>) -> Result<Body, Error> where S: Serialize,868     pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<Body, Error>
869     where
870         S: Serialize,
871     {
872         match body.map(|inst| serde_json::to_string(&inst)) {
873             Some(Ok(res)) => Ok(Some(res)),
874             Some(Err(e)) => Err(e),
875             None => Ok(None),
876         }
877         .map_err(|e| JsonSerializeError { err: e }.into())
878         .map(|payload| {
879             debug!("{}", payload.clone().unwrap_or_else(String::new));
880             payload
881                 .map(|content| content.into())
882                 .unwrap_or(Body::empty())
883         })
884     }
885 
886     /// Return the currently set client version.
client_version(&self) -> ClientVersion887     pub fn client_version(&self) -> ClientVersion {
888         self.version.as_ref().into()
889     }
890 
891     /// Check with the server for a supported version, and downgrade the client version if
892     /// appropriate.
893     ///
894     /// # Examples:
895     ///
896     /// ```rust,norun
897     ///     use bollard::Docker;
898     ///
899     ///     let docker = Docker::connect_with_http_defaults().unwrap();
900     ///     async move {
901     ///         &docker.negotiate_version().await.unwrap().version();
902     ///     };
903     /// ```
negotiate_version(self) -> Result<Self, Error>904     pub async fn negotiate_version(self) -> Result<Self, Error> {
905         let req = self.build_request::<_, String, String>(
906             "/version",
907             Builder::new().method(Method::GET),
908             Ok(None::<ArrayVec<[(_, _); 0]>>),
909             Ok(Body::empty()),
910         );
911 
912         let res = self.process_into_value::<Version>(req).await?;
913 
914         let err_api_version = res.api_version.clone();
915         let server_version: ClientVersion = match res.api_version.into() {
916             MaybeClientVersion::Some(client_version) => client_version,
917             MaybeClientVersion::None => {
918                 return Err(APIVersionParseError {
919                     api_version: err_api_version,
920                 }
921                 .into())
922             }
923         };
924         if server_version < self.client_version() {
925             self.version
926                 .0
927                 .store(server_version.major_version, Ordering::Relaxed);
928             self.version
929                 .1
930                 .store(server_version.minor_version, Ordering::Relaxed);
931         }
932         Ok(self)
933     }
934 
process_request( &self, request: Result<Request<Body>, Error>, ) -> impl Future<Output = Result<Response<Body>, Error>>935     fn process_request(
936         &self,
937         request: Result<Request<Body>, Error>,
938     ) -> impl Future<Output = Result<Response<Body>, Error>> {
939         let transport = self.transport.clone();
940         let timeout = self.client_timeout;
941 
942         async move {
943             let request = request?;
944             let response = Docker::execute_request(transport, request, timeout).await?;
945 
946             let status = response.status();
947             match status {
948                 // Status code 200 - 299
949                 s if s.is_success() => Ok(response),
950 
951                 StatusCode::SWITCHING_PROTOCOLS => Ok(response),
952 
953                 // Status code 304: Not Modified
954                 StatusCode::NOT_MODIFIED => {
955                     let message = Docker::decode_into_string(response).await?;
956                     Err(DockerResponseNotModifiedError { message }.into())
957                 }
958 
959                 // Status code 409: Conflict
960                 StatusCode::CONFLICT => {
961                     let message = Docker::decode_into_string(response).await?;
962                     Err(DockerResponseConflictError { message }.into())
963                 }
964 
965                 // Status code 400: Bad request
966                 StatusCode::BAD_REQUEST => {
967                     let message = Docker::decode_into_string(response).await?;
968                     Err(DockerResponseBadParameterError { message }.into())
969                 }
970 
971                 // Status code 404: Not Found
972                 StatusCode::NOT_FOUND => {
973                     let message = Docker::decode_into_string(response).await?;
974                     Err(DockerResponseNotFoundError { message }.into())
975                 }
976 
977                 // All other status codes
978                 _ => {
979                     let message = Docker::decode_into_string(response).await?;
980                     Err(DockerResponseServerError {
981                         status_code: status.as_u16(),
982                         message,
983                     }
984                     .into())
985                 }
986             }
987         }
988     }
989 
build_request<O, K, V>( &self, path: &str, builder: Builder, query: Result<Option<O>, Error>, payload: Result<Body, Error>, ) -> Result<Request<Body>, Error> where O: IntoIterator, O::Item: ::std::borrow::Borrow<(K, V)>, K: AsRef<str>, V: AsRef<str>,990     pub(crate) fn build_request<O, K, V>(
991         &self,
992         path: &str,
993         builder: Builder,
994         query: Result<Option<O>, Error>,
995         payload: Result<Body, Error>,
996     ) -> Result<Request<Body>, Error>
997     where
998         O: IntoIterator,
999         O::Item: ::std::borrow::Borrow<(K, V)>,
1000         K: AsRef<str>,
1001         V: AsRef<str>,
1002     {
1003         query
1004             .and_then(|q| payload.map(|body| (q, body)))
1005             .and_then(|(q, body)| {
1006                 let uri = Uri::parse(
1007                     &self.client_addr,
1008                     &self.client_type,
1009                     path,
1010                     q,
1011                     &self.client_version(),
1012                 )?;
1013                 let request_uri: hyper::Uri = uri.into();
1014                 let builder_string = format!("{:?}", builder);
1015                 Ok(builder
1016                     .uri(request_uri)
1017                     .header(CONTENT_TYPE, "application/json")
1018                     .body(body)
1019                     .map_err::<Error, _>(|e| {
1020                         HttpClientError {
1021                             builder: builder_string,
1022                             err: e,
1023                         }
1024                         .into()
1025                     })?)
1026             })
1027     }
1028 
execute_request( transport: Arc<Transport>, req: Request<Body>, timeout: u64, ) -> Result<Response<Body>, Error>1029     async fn execute_request(
1030         transport: Arc<Transport>,
1031         req: Request<Body>,
1032         timeout: u64,
1033     ) -> Result<Response<Body>, Error> {
1034         // This is where we determine to which transport we issue the request.
1035         let request = match *transport {
1036             Transport::Http { ref client } => client.request(req),
1037             #[cfg(feature = "ssl")]
1038             Transport::Https { ref client } => client.request(req),
1039             #[cfg(feature = "tls")]
1040             Transport::Tls { ref client } => client.request(req),
1041             #[cfg(unix)]
1042             Transport::Unix { ref client } => client.request(req),
1043             #[cfg(windows)]
1044             Transport::NamedPipe { ref client } => client.request(req),
1045         };
1046 
1047         match tokio::time::timeout(Duration::from_secs(timeout), request).await {
1048             Ok(v) => v.map_err(|err| HyperResponseError { err }.into()),
1049             Err(_) => Err(RequestTimeoutError.into()),
1050         }
1051     }
1052 
decode_into_stream<T>(res: Response<Body>) -> impl Stream<Item = Result<T, Error>> where T: DeserializeOwned,1053     fn decode_into_stream<T>(res: Response<Body>) -> impl Stream<Item = Result<T, Error>>
1054     where
1055         T: DeserializeOwned,
1056     {
1057         FramedRead::new(
1058             StreamReader::new(
1059                 res.into_body()
1060                     .map_err::<Error, _>(|e: hyper::Error| HyperResponseError { err: e }.into()),
1061             ),
1062             JsonLineDecoder::new(),
1063         )
1064     }
1065 
decode_into_stream_string( res: Response<Body>, ) -> impl Stream<Item = Result<LogOutput, Error>>1066     fn decode_into_stream_string(
1067         res: Response<Body>,
1068     ) -> impl Stream<Item = Result<LogOutput, Error>> {
1069         FramedRead::new(
1070             StreamReader::new(
1071                 res.into_body()
1072                     .map_err::<Error, _>(|e: hyper::Error| HyperResponseError { err: e }.into()),
1073             ),
1074             NewlineLogOutputDecoder::new(),
1075         )
1076     }
1077 
decode_into_upgraded_stream_string( res: Response<Body>, ) -> impl Stream<Item = Result<LogOutput, Error>>1078     fn decode_into_upgraded_stream_string(
1079         res: Response<Body>,
1080     ) -> impl Stream<Item = Result<LogOutput, Error>> {
1081         res.into_body()
1082             .on_upgrade()
1083             .into_stream()
1084             .map_ok(|r| FramedRead::new(r, NewlineLogOutputDecoder::new()))
1085             .map_err::<Error, _>(|e| HyperResponseError { err: e }.into())
1086             .try_flatten()
1087     }
1088 
decode_into_string(response: Response<Body>) -> Result<String, Error>1089     async fn decode_into_string(response: Response<Body>) -> Result<String, Error> {
1090         let body = hyper::body::to_bytes(response.into_body())
1091             .await
1092             .map_err(|e| HyperResponseError { err: e })?;
1093 
1094         from_utf8(&body).map(|x| x.to_owned()).map_err(|e| {
1095             StrParseError {
1096                 content: hex::encode(body.to_owned()),
1097                 err: e,
1098             }
1099             .into()
1100         })
1101     }
1102 
decode_response<T>(response: Response<Body>) -> Result<T, Error> where T: DeserializeOwned,1103     async fn decode_response<T>(response: Response<Body>) -> Result<T, Error>
1104     where
1105         T: DeserializeOwned,
1106     {
1107         let contents = Docker::decode_into_string(response).await?;
1108 
1109         debug!("Decoded into string: {}", &contents);
1110         serde_json::from_str::<T>(&contents).map_err(|e| {
1111             if e.is_data() {
1112                 JsonDataError {
1113                     message: e.to_string(),
1114                     column: e.column(),
1115                     contents: contents.to_owned(),
1116                 }
1117                 .into()
1118             } else {
1119                 JsonDeserializeError {
1120                     content: contents.to_owned(),
1121                     err: e,
1122                 }
1123                 .into()
1124             }
1125         })
1126     }
1127 }
1128