1 /*********************************************************************/ 2 // dar - disk archive - a backup/restoration program 3 // Copyright (C) 2002-2052 Denis Corbin 4 // 5 // This program is free software; you can redistribute it and/or 6 // modify it under the terms of the GNU General Public License 7 // as published by the Free Software Foundation; either version 2 8 // of the License, or (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU General Public License for more details. 14 // 15 // You should have received a copy of the GNU General Public License 16 // along with this program; if not, write to the Free Software 17 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 18 // 19 // to contact the author : http://dar.linux.free.fr/email.html 20 /*********************************************************************/ 21 22 #include "../my_config.h" 23 24 extern "C" 25 { 26 #if HAVE_STRING_H 27 #include <string.h> 28 #endif 29 } // end extern "C" 30 31 #include "generic_thread.hpp" 32 33 using namespace std; 34 35 namespace libdar 36 { generic_thread(generic_file * x_ptr,U_I data_block_size,U_I data_num_block,U_I ctrl_block_size,U_I ctrl_num_block)37 generic_thread::generic_thread(generic_file * x_ptr, 38 U_I data_block_size, 39 U_I data_num_block, 40 U_I ctrl_block_size, 41 U_I ctrl_num_block): 42 generic_file(gf_read_only), 43 toslave_data(data_num_block, data_block_size), 44 tomaster_data(data_num_block, data_block_size), 45 toslave_ctrl(ctrl_num_block, ctrl_block_size), 46 tomaster_ctrl(ctrl_num_block, ctrl_block_size) 47 { 48 unsigned int tmp = sizeof(data_header); 49 50 if(tmp < sizeof(char)) 51 throw SRC_BUG; 52 53 if(x_ptr == nullptr) 54 throw SRC_BUG; 55 set_mode(x_ptr->get_mode()); 56 reached_eof = false; 57 running = false; 58 59 order.clear(); 60 order.set_type(msg_type::data_partial); 61 order.reset_get_block(); 62 if(!order.get_block(&data_header, tmp)) 63 throw SRC_BUG; // data header larger than one byte ! 64 65 order.set_type(msg_type::data_completed); 66 order.reset_get_block(); 67 if(!order.get_block(&data_header_eof, tmp)) 68 throw SRC_BUG; 69 70 remote = new (get_pool()) slave_thread(x_ptr, 71 &toslave_data, 72 &tomaster_data, 73 &toslave_ctrl, 74 &tomaster_ctrl); 75 if(remote == nullptr) 76 throw Ememory("generic_thread::generic_thread"); 77 try 78 { 79 my_run(); // launching the new thread 80 } 81 catch(...) 82 { 83 if(remote != nullptr) 84 delete remote; 85 throw; 86 } 87 } 88 generic_thread(const generic_thread & ref)89 generic_thread::generic_thread(const generic_thread & ref): 90 generic_file (gf_read_only), 91 toslave_data(2, 10), 92 tomaster_data(2, 10), 93 toslave_ctrl(2, 10), 94 tomaster_ctrl(2, 10) 95 { 96 throw SRC_BUG; 97 } 98 99 ~generic_thread()100 generic_thread::~generic_thread() 101 { 102 if(remote != nullptr) 103 { 104 if(!is_terminated()) 105 terminate(); 106 delete remote; 107 remote = nullptr; 108 } 109 } 110 skippable(skippability direction,const infinint & amount)111 bool generic_thread::skippable(skippability direction, const infinint & amount) 112 { 113 bool ret; 114 115 // rerun the thread if an exception has occured previously 116 my_run(); 117 118 // preparing the order message 119 120 order.clear(); 121 switch(direction) 122 { 123 case generic_file::skip_backward: 124 order.set_type(msg_type::order_skippable_fwd); 125 break; 126 case generic_file::skip_forward: 127 order.set_type(msg_type::order_skippable_bkd); 128 break; 129 default: 130 throw SRC_BUG; 131 } 132 order.set_infinint(amount); 133 134 // order completed 135 136 send_order(); 137 read_answer(); 138 check_answer(msg_type::answr_skippable); 139 ret = answer.get_bool(); 140 release_block_answer(); 141 142 return ret; 143 } 144 skip(const infinint & pos)145 bool generic_thread::skip(const infinint & pos) 146 { 147 bool ret; 148 reached_eof = false; 149 150 // rerun the thread if an exception has occured previously 151 my_run(); 152 153 // preparing the order message 154 155 order.clear(); 156 order.set_type(msg_type::order_skip); 157 order.set_infinint(pos); 158 159 // order completed 160 161 send_order(); 162 read_answer(); 163 check_answer(msg_type::answr_skip_done); // this flushes all read ahead data up before skip occured 164 ret = answer.get_bool(); 165 release_block_answer(); 166 167 purge_data_pipe(); 168 169 return ret; 170 } 171 skip_to_eof()172 bool generic_thread::skip_to_eof() 173 { 174 bool ret; 175 176 // rerun the thread if an exception has occured previously 177 my_run(); 178 179 // preparing the order message 180 181 order.clear(); 182 order.set_type(msg_type::order_skip_to_eof); 183 184 // order completed 185 186 send_order(); 187 read_answer(); 188 check_answer(msg_type::answr_skip_done); 189 ret = answer.get_bool(); 190 release_block_answer(); 191 reached_eof = ret; 192 193 purge_data_pipe(); 194 195 return ret; 196 } 197 skip_relative(S_I x)198 bool generic_thread::skip_relative(S_I x) 199 { 200 U_I val; 201 bool ret; 202 203 // rerun the thread if an exception has occured previously 204 my_run(); 205 206 // preparing the order message 207 208 order.clear(); 209 if(x >= 0) 210 { 211 val = x; 212 order.set_type(msg_type::order_skip_fwd); 213 order.set_U_I(val); 214 } 215 else // x < 0 216 { 217 reached_eof = false; 218 val = -x; 219 order.set_type(msg_type::order_skip_bkd); 220 order.set_U_I(val); 221 } 222 223 // order completed 224 225 send_order(); 226 read_answer(); 227 check_answer(msg_type::answr_skip_done); 228 ret = answer.get_bool(); 229 release_block_answer(); 230 231 purge_data_pipe(); 232 233 return ret; 234 } 235 get_position() const236 infinint generic_thread::get_position() const 237 { 238 infinint ret; 239 240 // rerun the thread if an exception has occured previously 241 my_run(); 242 243 // preparing the order message 244 245 order.clear(); 246 order.set_type(msg_type::order_get_position); 247 248 // order completed 249 250 send_order(); 251 read_answer(); 252 check_answer(msg_type::answr_position); 253 ret = answer.get_infinint(); 254 release_block_answer(); 255 256 return ret; 257 } 258 inherited_read_ahead(const infinint & amount)259 void generic_thread::inherited_read_ahead(const infinint & amount) 260 { 261 262 // rerun the thread if an exception has occured previously 263 my_run(); 264 265 // preparing the order message 266 267 order.clear(); 268 order.set_type(msg_type::order_read_ahead); 269 order.set_infinint(amount); 270 271 // order completed 272 273 send_order(); 274 } 275 inherited_read(char * a,U_I size)276 U_I generic_thread::inherited_read(char *a, U_I size) 277 { 278 U_I read = 0; 279 U_I min; 280 char *data_ptr = nullptr; 281 unsigned int data_num; 282 283 284 // rerun the thread if an exception has occured previously 285 my_run(); 286 287 if(reached_eof) 288 return 0; 289 290 291 // preparing the order message 292 293 order.clear(); 294 order.set_type(msg_type::order_read); 295 order.set_U_I(size); 296 297 // order completed 298 299 send_order(); 300 301 // now retreiving the data from the data_pipe 302 do 303 { 304 tomaster_data.fetch(data_ptr, data_num); 305 --data_num; // the first byte contains the message type 306 307 min = size - read; // what's still need to be read 308 if(data_num > min) // we retreived more data than necessary 309 { 310 U_I kept = data_num - min; 311 312 (void)memcpy(a + read, data_ptr + 1, min); 313 read += min; 314 (void)memmove(data_ptr + 1, data_ptr + 1 + min, kept); 315 tomaster_data.fetch_push_back(data_ptr, kept + 1); 316 } 317 else // the whole block will be read 318 { 319 (void)memcpy(a + read, data_ptr + 1, data_num); 320 read += data_num; 321 if(data_ptr[0] == data_header_eof) 322 reached_eof = true; 323 tomaster_data.fetch_recycle(data_ptr); 324 } 325 } 326 while(!reached_eof && read < size); 327 328 return read; 329 } 330 inherited_write(const char * a,U_I size)331 void generic_thread::inherited_write(const char *a, U_I size) 332 { 333 U_I wrote = 0; 334 unsigned int bksize; 335 char *tmptr = nullptr; 336 U_I min; 337 338 // rerun the thread if an exception has occured previously 339 my_run(); 340 341 if(tomaster_data.is_not_empty()) 342 throw SRC_BUG; 343 344 do 345 { 346 toslave_data.get_block_to_feed(tmptr, bksize); 347 if(bksize > 1) 348 tmptr[0] = data_header; 349 else 350 { 351 toslave_data.feed_cancel_get_block(tmptr); 352 throw SRC_BUG; 353 } 354 min = bksize - 1 > size - wrote ? size - wrote : bksize - 1; 355 (void)memcpy(tmptr + 1, a + wrote, min); 356 wrote += min; 357 toslave_data.feed(tmptr, min + 1); 358 wake_up_slave_if_asked(); 359 } 360 while(wrote < size); 361 } 362 inherited_sync_write()363 void generic_thread::inherited_sync_write() 364 { 365 // rerun the thread if an exception has occured previously 366 my_run(); 367 368 // preparing the order message 369 370 order.clear(); 371 order.set_type(msg_type::order_sync_write); 372 373 // order completed 374 375 send_order(); 376 read_answer(); 377 check_answer(msg_type::answr_sync_write_done); 378 release_block_answer(); 379 } 380 inherited_flush_read()381 void generic_thread::inherited_flush_read() 382 { 383 // rerun the thread if an exception has occured previously 384 my_run(); 385 386 // preparing the order message to stop a possible read_ahead 387 // running in the slave_thread 388 389 order.clear(); 390 order.set_type(msg_type::order_stop_readahead); 391 392 // order completed 393 394 send_order(); 395 read_answer(); 396 check_answer(msg_type::answr_readahead_stopped); 397 release_block_answer(); 398 399 // dropping all data currently in the pipe 400 purge_data_pipe(); 401 402 // resetting the current object 403 reached_eof = false; 404 } 405 inherited_terminate()406 void generic_thread::inherited_terminate() 407 { 408 // rerun the thread if an exception has occured previously 409 my_run(); 410 411 // preparing the order message 412 413 order.clear(); 414 order.set_type(msg_type::order_end_of_xmit); 415 416 // order completed 417 418 send_order(); 419 420 purge_data_pipe(); 421 my_join(); 422 } 423 send_order()424 void generic_thread::send_order() 425 { 426 bool completed = false; 427 428 order.reset_get_block(); 429 do 430 { 431 toslave_ctrl.get_block_to_feed(ptr, num); 432 completed = order.get_block(ptr, num); 433 toslave_ctrl.feed(ptr, num); 434 } 435 while(!completed); 436 } 437 read_answer()438 void generic_thread::read_answer() 439 { 440 bool completed = false; 441 442 answer.clear(); 443 do 444 { 445 tomaster_ctrl.fetch(ptr, num); 446 completed = answer.add_block(ptr, num); 447 if(!completed) 448 tomaster_ctrl.fetch_recycle(ptr); 449 } 450 while(!completed); 451 // note ptr and num are relative 452 // to the last block read which 453 // must be released/recycled calling release_answer() 454 } 455 check_answer(msg_type expected)456 void generic_thread::check_answer(msg_type expected) 457 { 458 switch(answer.get_type()) 459 { 460 case msg_type::answr_exception: 461 release_block_answer(); 462 my_join(); 463 throw SRC_BUG; // join should rethrow the exception that has been raised in the thread "remote" 464 default: 465 if(answer.get_type() == expected) 466 break; 467 else 468 { 469 release_block_answer(); 470 throw SRC_BUG; 471 } 472 } 473 } 474 475 wake_up_slave_if_asked()476 void generic_thread::wake_up_slave_if_asked() 477 { 478 if(remote == nullptr) 479 throw SRC_BUG; 480 if(remote->wake_me_up()) 481 { 482 order.clear(); 483 order.set_type(msg_type::order_wakeup); 484 send_order(); 485 } 486 } 487 purge_data_pipe()488 void generic_thread::purge_data_pipe() 489 { 490 char *tmp; 491 unsigned int tmp_ui; 492 493 while(tomaster_data.is_not_empty()) 494 { 495 tomaster_data.fetch(tmp, tmp_ui); 496 tomaster_data.fetch_recycle(tmp); 497 } 498 } 499 my_run()500 void generic_thread::my_run() 501 { 502 if(!running) 503 { 504 running = true; 505 if(remote == nullptr) 506 throw SRC_BUG; 507 if(remote->is_running()) 508 throw SRC_BUG; 509 toslave_data.reset(); 510 tomaster_data.reset(); 511 toslave_ctrl.reset(); 512 tomaster_ctrl.reset(); 513 remote->run(); // launching the new thread 514 if(remote->is_running(tid)) 515 { 516 thread_cancellation::associate_tid_to_tid(pthread_self(), tid); 517 thread_cancellation::associate_tid_to_tid(tid, pthread_self()); 518 } 519 else 520 running = false; 521 } 522 } 523 my_join()524 void generic_thread::my_join() 525 { 526 try 527 { 528 remote->join(); // may throw exceptions 529 } 530 catch(...) 531 { 532 if(running) // running may be false if the thread lived to shortly to obtain its tid 533 thread_cancellation::dead_thread(tid); 534 running = false; 535 throw; 536 } 537 if(running) 538 thread_cancellation::dead_thread(tid); 539 running = false; 540 } 541 542 } // end of generic_thread::namespace 543