1 mod bind; 2 mod raw; 3 mod stmt; 4 mod url; 5 6 use self::raw::RawConnection; 7 use self::stmt::Statement; 8 use self::url::ConnectionOptions; 9 use super::backend::Mysql; 10 use super::bind_collector::MysqlBindCollector; 11 use connection::*; 12 use deserialize::{Queryable, QueryableByName}; 13 use query_builder::*; 14 use result::*; 15 use sql_types::HasSqlType; 16 17 #[allow(missing_debug_implementations, missing_copy_implementations)] 18 /// A connection to a MySQL database. Connection URLs should be in the form 19 /// `mysql://[user[:password]@]host/database_name` 20 pub struct MysqlConnection { 21 raw_connection: RawConnection, 22 transaction_manager: AnsiTransactionManager, 23 statement_cache: StatementCache<Mysql, Statement>, 24 } 25 26 unsafe impl Send for MysqlConnection {} 27 28 impl SimpleConnection for MysqlConnection { batch_execute(&self, query: &str) -> QueryResult<()>29 fn batch_execute(&self, query: &str) -> QueryResult<()> { 30 self.raw_connection 31 .enable_multi_statements(|| self.raw_connection.execute(query)) 32 } 33 } 34 35 impl Connection for MysqlConnection { 36 type Backend = Mysql; 37 type TransactionManager = AnsiTransactionManager; 38 establish(database_url: &str) -> ConnectionResult<Self>39 fn establish(database_url: &str) -> ConnectionResult<Self> { 40 use result::ConnectionError::CouldntSetupConfiguration; 41 42 let raw_connection = RawConnection::new(); 43 let connection_options = ConnectionOptions::parse(database_url)?; 44 raw_connection.connect(&connection_options)?; 45 let conn = MysqlConnection { 46 raw_connection: raw_connection, 47 transaction_manager: AnsiTransactionManager::new(), 48 statement_cache: StatementCache::new(), 49 }; 50 conn.set_config_options() 51 .map_err(CouldntSetupConfiguration)?; 52 Ok(conn) 53 } 54 55 #[doc(hidden)] execute(&self, query: &str) -> QueryResult<usize>56 fn execute(&self, query: &str) -> QueryResult<usize> { 57 self.raw_connection 58 .execute(query) 59 .map(|_| self.raw_connection.affected_rows()) 60 } 61 62 #[doc(hidden)] query_by_index<T, U>(&self, source: T) -> QueryResult<Vec<U>> where T: AsQuery, T::Query: QueryFragment<Self::Backend> + QueryId, Self::Backend: HasSqlType<T::SqlType>, U: Queryable<T::SqlType, Self::Backend>,63 fn query_by_index<T, U>(&self, source: T) -> QueryResult<Vec<U>> 64 where 65 T: AsQuery, 66 T::Query: QueryFragment<Self::Backend> + QueryId, 67 Self::Backend: HasSqlType<T::SqlType>, 68 U: Queryable<T::SqlType, Self::Backend>, 69 { 70 use deserialize::FromSqlRow; 71 use result::Error::DeserializationError; 72 73 let mut stmt = self.prepare_query(&source.as_query())?; 74 let mut metadata = Vec::new(); 75 Mysql::mysql_row_metadata(&mut metadata, &()); 76 let results = unsafe { stmt.results(metadata)? }; 77 results.map(|mut row| { 78 U::Row::build_from_row(&mut row) 79 .map(U::build) 80 .map_err(DeserializationError) 81 }) 82 } 83 84 #[doc(hidden)] query_by_name<T, U>(&self, source: &T) -> QueryResult<Vec<U>> where T: QueryFragment<Self::Backend> + QueryId, U: QueryableByName<Self::Backend>,85 fn query_by_name<T, U>(&self, source: &T) -> QueryResult<Vec<U>> 86 where 87 T: QueryFragment<Self::Backend> + QueryId, 88 U: QueryableByName<Self::Backend>, 89 { 90 use result::Error::DeserializationError; 91 92 let mut stmt = self.prepare_query(source)?; 93 let results = unsafe { stmt.named_results()? }; 94 results.map(|row| U::build(&row).map_err(DeserializationError)) 95 } 96 97 #[doc(hidden)] execute_returning_count<T>(&self, source: &T) -> QueryResult<usize> where T: QueryFragment<Self::Backend> + QueryId,98 fn execute_returning_count<T>(&self, source: &T) -> QueryResult<usize> 99 where 100 T: QueryFragment<Self::Backend> + QueryId, 101 { 102 let stmt = self.prepare_query(source)?; 103 unsafe { 104 stmt.execute()?; 105 } 106 Ok(stmt.affected_rows()) 107 } 108 109 #[doc(hidden)] transaction_manager(&self) -> &Self::TransactionManager110 fn transaction_manager(&self) -> &Self::TransactionManager { 111 &self.transaction_manager 112 } 113 } 114 115 impl MysqlConnection { prepare_query<T>(&self, source: &T) -> QueryResult<MaybeCached<Statement>> where T: QueryFragment<Mysql> + QueryId,116 fn prepare_query<T>(&self, source: &T) -> QueryResult<MaybeCached<Statement>> 117 where 118 T: QueryFragment<Mysql> + QueryId, 119 { 120 let mut stmt = self 121 .statement_cache 122 .cached_statement(source, &[], |sql| self.raw_connection.prepare(sql))?; 123 let mut bind_collector = MysqlBindCollector::new(); 124 source.collect_binds(&mut bind_collector, &())?; 125 stmt.bind(bind_collector.binds)?; 126 Ok(stmt) 127 } 128 set_config_options(&self) -> QueryResult<()>129 fn set_config_options(&self) -> QueryResult<()> { 130 self.execute("SET sql_mode=(SELECT CONCAT(@@sql_mode, ',PIPES_AS_CONCAT'))")?; 131 self.execute("SET time_zone = '+00:00';")?; 132 self.execute("SET character_set_client = 'utf8mb4'")?; 133 self.execute("SET character_set_connection = 'utf8mb4'")?; 134 self.execute("SET character_set_results = 'utf8mb4'")?; 135 Ok(()) 136 } 137 } 138 139 #[cfg(test)] 140 mod tests { 141 extern crate dotenv; 142 143 use super::*; 144 use std::env; 145 connection() -> MysqlConnection146 fn connection() -> MysqlConnection { 147 let _ = dotenv::dotenv(); 148 let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL") 149 .or_else(|_| env::var("MYSQL_DATABASE_URL")) 150 .or_else(|_| env::var("DATABASE_URL")) 151 .expect("DATABASE_URL must be set in order to run unit tests"); 152 MysqlConnection::establish(&database_url).unwrap() 153 } 154 155 #[test] batch_execute_handles_single_queries_with_results()156 fn batch_execute_handles_single_queries_with_results() { 157 let connection = connection(); 158 assert!(connection.batch_execute("SELECT 1").is_ok()); 159 assert!(connection.batch_execute("SELECT 1").is_ok()); 160 } 161 162 #[test] batch_execute_handles_multi_queries_with_results()163 fn batch_execute_handles_multi_queries_with_results() { 164 let connection = connection(); 165 let query = "SELECT 1; SELECT 2; SELECT 3;"; 166 assert!(connection.batch_execute(query).is_ok()); 167 assert!(connection.batch_execute(query).is_ok()); 168 } 169 170 #[test] execute_handles_queries_which_return_results()171 fn execute_handles_queries_which_return_results() { 172 let connection = connection(); 173 assert!(connection.execute("SELECT 1").is_ok()); 174 assert!(connection.execute("SELECT 1").is_ok()); 175 } 176 } 177