1 /*********************************************************************/
2 // dar - disk archive - a backup/restoration program
3 // Copyright (C) 2002-2052 Denis Corbin
4 //
5 // This program is free software; you can redistribute it and/or
6 // modify it under the terms of the GNU General Public License
7 // as published by the Free Software Foundation; either version 2
8 // of the License, or (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program; if not, write to the Free Software
17 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18 //
19 // to contact the author : http://dar.linux.free.fr/email.html
20 /*********************************************************************/
21 
22 #include "../my_config.h"
23 
24 extern "C"
25 {
26 #if HAVE_STRING_H
27 #include <string.h>
28 #endif
29 } // end extern "C"
30 
31 #include "generic_thread.hpp"
32 
33 using namespace std;
34 
35 namespace libdar
36 {
generic_thread(generic_file * x_ptr,U_I data_block_size,U_I data_num_block,U_I ctrl_block_size,U_I ctrl_num_block)37     generic_thread::generic_thread(generic_file * x_ptr,
38 				   U_I data_block_size,
39 				   U_I data_num_block,
40 				   U_I ctrl_block_size,
41 				   U_I ctrl_num_block):
42 	generic_file(gf_read_only),
43 	toslave_data(data_num_block, data_block_size),
44 	tomaster_data(data_num_block, data_block_size),
45 	toslave_ctrl(ctrl_num_block, ctrl_block_size),
46 	tomaster_ctrl(ctrl_num_block, ctrl_block_size)
47     {
48 	unsigned int tmp = sizeof(data_header);
49 
50 	if(tmp < sizeof(char))
51 	    throw SRC_BUG;
52 
53 	if(x_ptr == nullptr)
54 	    throw SRC_BUG;
55 	set_mode(x_ptr->get_mode());
56 	reached_eof = false;
57 	running = false;
58 
59 	order.clear();
60 	order.set_type(msg_type::data_partial);
61 	order.reset_get_block();
62 	if(!order.get_block(&data_header, tmp))
63 	    throw SRC_BUG; // data header larger than one byte !
64 
65 	order.set_type(msg_type::data_completed);
66 	order.reset_get_block();
67 	if(!order.get_block(&data_header_eof, tmp))
68 	    throw SRC_BUG;
69 
70 	remote = new (get_pool()) slave_thread(x_ptr,
71 					       &toslave_data,
72 					       &tomaster_data,
73 					       &toslave_ctrl,
74 					       &tomaster_ctrl);
75 	if(remote == nullptr)
76 	    throw Ememory("generic_thread::generic_thread");
77 	try
78 	{
79 	    my_run(); // launching the new thread
80 	}
81 	catch(...)
82 	{
83 	    if(remote != nullptr)
84 		delete remote;
85 	    throw;
86 	}
87     }
88 
generic_thread(const generic_thread & ref)89     generic_thread::generic_thread(const generic_thread & ref):
90 	generic_file (gf_read_only),
91 	toslave_data(2, 10),
92 	tomaster_data(2, 10),
93 	toslave_ctrl(2, 10),
94 	tomaster_ctrl(2, 10)
95     {
96 	throw SRC_BUG;
97     }
98 
99 
~generic_thread()100     generic_thread::~generic_thread()
101     {
102 	if(remote != nullptr)
103 	{
104 	    if(!is_terminated())
105 		terminate();
106 	    delete remote;
107 	    remote = nullptr;
108 	}
109     }
110 
skippable(skippability direction,const infinint & amount)111     bool generic_thread::skippable(skippability direction, const infinint & amount)
112     {
113 	bool ret;
114 
115 	    // rerun the thread if an exception has occured previously
116 	my_run();
117 
118 	    // preparing the order message
119 
120 	order.clear();
121 	switch(direction)
122 	{
123 	case generic_file::skip_backward:
124 	    order.set_type(msg_type::order_skippable_fwd);
125 	    break;
126 	case generic_file::skip_forward:
127 	    order.set_type(msg_type::order_skippable_bkd);
128 	    break;
129 	default:
130 	    throw SRC_BUG;
131 	}
132 	order.set_infinint(amount);
133 
134 	    // order completed
135 
136 	send_order();
137 	read_answer();
138 	check_answer(msg_type::answr_skippable);
139 	ret = answer.get_bool();
140 	release_block_answer();
141 
142 	return ret;
143     }
144 
skip(const infinint & pos)145     bool generic_thread::skip(const infinint & pos)
146     {
147 	bool ret;
148 	reached_eof = false;
149 
150 	    // rerun the thread if an exception has occured previously
151 	my_run();
152 
153 	    // preparing the order message
154 
155 	order.clear();
156 	order.set_type(msg_type::order_skip);
157 	order.set_infinint(pos);
158 
159 	    // order completed
160 
161 	send_order();
162 	read_answer();
163 	check_answer(msg_type::answr_skip_done); // this flushes all read ahead data up before skip occured
164 	ret = answer.get_bool();
165 	release_block_answer();
166 
167 	purge_data_pipe();
168 
169 	return ret;
170     }
171 
skip_to_eof()172     bool generic_thread::skip_to_eof()
173     {
174 	bool ret;
175 
176 	    // rerun the thread if an exception has occured previously
177 	my_run();
178 
179 	    // preparing the order message
180 
181 	order.clear();
182 	order.set_type(msg_type::order_skip_to_eof);
183 
184 	    // order completed
185 
186 	send_order();
187 	read_answer();
188 	check_answer(msg_type::answr_skip_done);
189 	ret = answer.get_bool();
190 	release_block_answer();
191 	reached_eof = ret;
192 
193 	purge_data_pipe();
194 
195 	return ret;
196     }
197 
skip_relative(S_I x)198     bool generic_thread::skip_relative(S_I x)
199     {
200 	U_I val;
201 	bool ret;
202 
203 	    // rerun the thread if an exception has occured previously
204 	my_run();
205 
206 	    // preparing the order message
207 
208 	order.clear();
209 	if(x >= 0)
210 	{
211 	    val = x;
212 	    order.set_type(msg_type::order_skip_fwd);
213 	    order.set_U_I(val);
214 	}
215 	else // x < 0
216 	{
217 	    reached_eof = false;
218 	    val = -x;
219 	    order.set_type(msg_type::order_skip_bkd);
220 	    order.set_U_I(val);
221 	}
222 
223 	    // order completed
224 
225 	send_order();
226 	read_answer();
227 	check_answer(msg_type::answr_skip_done);
228 	ret = answer.get_bool();
229 	release_block_answer();
230 
231 	purge_data_pipe();
232 
233 	return ret;
234     }
235 
get_position() const236     infinint generic_thread::get_position() const
237     {
238 	infinint ret;
239 
240 	    // rerun the thread if an exception has occured previously
241 	my_run();
242 
243 	    // preparing the order message
244 
245 	order.clear();
246 	order.set_type(msg_type::order_get_position);
247 
248 	    // order completed
249 
250 	send_order();
251 	read_answer();
252 	check_answer(msg_type::answr_position);
253 	ret = answer.get_infinint();
254 	release_block_answer();
255 
256 	return ret;
257     }
258 
inherited_read_ahead(const infinint & amount)259     void generic_thread::inherited_read_ahead(const infinint & amount)
260     {
261 
262 	    // rerun the thread if an exception has occured previously
263 	my_run();
264 
265 	    // preparing the order message
266 
267 	order.clear();
268 	order.set_type(msg_type::order_read_ahead);
269 	order.set_infinint(amount);
270 
271 	    // order completed
272 
273 	send_order();
274     }
275 
inherited_read(char * a,U_I size)276     U_I generic_thread::inherited_read(char *a, U_I size)
277     {
278 	U_I read = 0;
279 	U_I min;
280 	char *data_ptr = nullptr;
281 	unsigned int data_num;
282 
283 
284 	    // rerun the thread if an exception has occured previously
285 	my_run();
286 
287 	if(reached_eof)
288 	    return 0;
289 
290 
291 	    // preparing the order message
292 
293 	order.clear();
294 	order.set_type(msg_type::order_read);
295 	order.set_U_I(size);
296 
297 	    // order completed
298 
299 	send_order();
300 
301 	    // now retreiving the data from the data_pipe
302 	do
303 	{
304 	    tomaster_data.fetch(data_ptr, data_num);
305 	    --data_num; // the first byte contains the message type
306 
307 	    min = size - read; // what's still need to be read
308 	    if(data_num > min) // we retreived more data than necessary
309 	    {
310 		U_I kept = data_num - min;
311 
312 		(void)memcpy(a + read, data_ptr + 1, min);
313 		read += min;
314 		(void)memmove(data_ptr + 1, data_ptr + 1 + min, kept);
315 		tomaster_data.fetch_push_back(data_ptr, kept + 1);
316 	    }
317 	    else // the whole block will be read
318 	    {
319 		(void)memcpy(a + read, data_ptr + 1, data_num);
320 		read += data_num;
321 		if(data_ptr[0] == data_header_eof)
322 		    reached_eof = true;
323 		tomaster_data.fetch_recycle(data_ptr);
324 	    }
325 	}
326 	while(!reached_eof && read < size);
327 
328 	return read;
329     }
330 
inherited_write(const char * a,U_I size)331     void generic_thread::inherited_write(const char *a, U_I size)
332     {
333 	U_I wrote = 0;
334 	unsigned int bksize;
335 	char *tmptr = nullptr;
336 	U_I min;
337 
338 	    // rerun the thread if an exception has occured previously
339 	my_run();
340 
341 	if(tomaster_data.is_not_empty())
342 	    throw SRC_BUG;
343 
344 	do
345 	{
346 	    toslave_data.get_block_to_feed(tmptr, bksize);
347 	    if(bksize > 1)
348 		tmptr[0] = data_header;
349 	    else
350 	    {
351 		toslave_data.feed_cancel_get_block(tmptr);
352 		throw SRC_BUG;
353 	    }
354 	    min = bksize - 1 > size - wrote ? size - wrote : bksize - 1;
355 	    (void)memcpy(tmptr + 1, a + wrote, min);
356 	    wrote += min;
357 	    toslave_data.feed(tmptr, min + 1);
358 	    wake_up_slave_if_asked();
359 	}
360 	while(wrote < size);
361     }
362 
inherited_sync_write()363     void generic_thread::inherited_sync_write()
364     {
365 	    // rerun the thread if an exception has occured previously
366 	my_run();
367 
368 	    // preparing the order message
369 
370 	order.clear();
371 	order.set_type(msg_type::order_sync_write);
372 
373 	    // order completed
374 
375 	send_order();
376 	read_answer();
377 	check_answer(msg_type::answr_sync_write_done);
378 	release_block_answer();
379     }
380 
inherited_flush_read()381     void generic_thread::inherited_flush_read()
382     {
383 	    // rerun the thread if an exception has occured previously
384 	my_run();
385 
386 	    // preparing the order message to stop a possible read_ahead
387 	    // running in the slave_thread
388 
389 	order.clear();
390 	order.set_type(msg_type::order_stop_readahead);
391 
392 	    // order completed
393 
394 	send_order();
395 	read_answer();
396 	check_answer(msg_type::answr_readahead_stopped);
397 	release_block_answer();
398 
399 	    // dropping all data currently in the pipe
400 	purge_data_pipe();
401 
402 	    // resetting the current object
403 	reached_eof = false;
404     }
405 
inherited_terminate()406     void generic_thread::inherited_terminate()
407     {
408 	    // rerun the thread if an exception has occured previously
409 	my_run();
410 
411 	    // preparing the order message
412 
413 	order.clear();
414 	order.set_type(msg_type::order_end_of_xmit);
415 
416 	    // order completed
417 
418 	send_order();
419 
420 	purge_data_pipe();
421 	my_join();
422     }
423 
send_order()424     void generic_thread::send_order()
425     {
426 	bool completed = false;
427 
428 	order.reset_get_block();
429 	do
430 	{
431 	    toslave_ctrl.get_block_to_feed(ptr, num);
432 	    completed = order.get_block(ptr, num);
433 	    toslave_ctrl.feed(ptr, num);
434 	}
435 	while(!completed);
436     }
437 
read_answer()438     void generic_thread::read_answer()
439     {
440 	bool completed = false;
441 
442 	answer.clear();
443 	do
444 	{
445 	    tomaster_ctrl.fetch(ptr, num);
446 	    completed = answer.add_block(ptr, num);
447 	    if(!completed)
448 		tomaster_ctrl.fetch_recycle(ptr);
449 	}
450 	while(!completed);
451 	    // note ptr and num are relative
452 	    // to the last block read which
453 	    // must be released/recycled calling release_answer()
454     }
455 
check_answer(msg_type expected)456     void generic_thread::check_answer(msg_type expected)
457     {
458 	switch(answer.get_type())
459 	{
460 	case msg_type::answr_exception:
461 	    release_block_answer();
462 	    my_join();
463 	    throw SRC_BUG; // join should rethrow the exception that has been raised in the thread "remote"
464 	default:
465 	    if(answer.get_type() == expected)
466 		break;
467 	    else
468 	    {
469 		release_block_answer();
470 		throw SRC_BUG;
471 	    }
472 	}
473     }
474 
475 
wake_up_slave_if_asked()476     void generic_thread::wake_up_slave_if_asked()
477     {
478 	if(remote == nullptr)
479 	    throw SRC_BUG;
480 	if(remote->wake_me_up())
481 	{
482 	    order.clear();
483 	    order.set_type(msg_type::order_wakeup);
484 	    send_order();
485 	}
486     }
487 
purge_data_pipe()488     void generic_thread::purge_data_pipe()
489     {
490 	char *tmp;
491 	unsigned int tmp_ui;
492 
493 	while(tomaster_data.is_not_empty())
494 	{
495 	    tomaster_data.fetch(tmp, tmp_ui);
496 	    tomaster_data.fetch_recycle(tmp);
497 	}
498     }
499 
my_run()500     void generic_thread::my_run()
501     {
502 	if(!running)
503 	{
504 	    running = true;
505 	    if(remote == nullptr)
506 		throw SRC_BUG;
507 	    if(remote->is_running())
508 		throw SRC_BUG;
509 	    toslave_data.reset();
510 	    tomaster_data.reset();
511 	    toslave_ctrl.reset();
512 	    tomaster_ctrl.reset();
513 	    remote->run(); // launching the new thread
514 	    if(remote->is_running(tid))
515 	    {
516 		thread_cancellation::associate_tid_to_tid(pthread_self(), tid);
517 		thread_cancellation::associate_tid_to_tid(tid, pthread_self());
518 	    }
519 	    else
520 		running = false;
521 	}
522     }
523 
my_join()524     void generic_thread::my_join()
525     {
526 	try
527 	{
528 	    remote->join(); // may throw exceptions
529 	}
530 	catch(...)
531 	{
532 	    if(running) // running may be false if the thread lived to shortly to obtain its tid
533 		thread_cancellation::dead_thread(tid);
534 	    running = false;
535 	    throw;
536 	}
537 	if(running)
538 	    thread_cancellation::dead_thread(tid);
539 	running = false;
540     }
541 
542 } // end of libdar namespace
543