1 /*
2 * This file is part of Siril, an astronomy image processor.
3 * Copyright (C) 2005-2011 Francois Meyer (dulle at free.fr)
4 * Copyright (C) 2012-2021 team free-astro (see more in AUTHORS file)
5 * Reference site is https://free-astro.org/index.php/Siril
6 *
7 * Siril is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * Siril is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with Siril. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include <assert.h>
22 #include <string.h>
23 #include <glib.h>
24
25 #include "core/siril.h"
26 #include "core/proto.h"
27 #include "core/processing.h"
28 #include "core/sequence_filtering.h"
29 #include "core/OS_utils.h"
30 #include "gui/utils.h"
31 #include "gui/progress_and_log.h"
32 #include "io/sequence.h"
33 #include "io/ser.h"
34 #include "io/seqwriter.h"
35 #include "io/fits_sequence.h"
36 #include "io/image_format_fits.h"
37 #include "algos/statistics.h"
38
39 // called in start_in_new_thread only
40 // works in parallel if the arg->parallel is TRUE for FITS or SER sequences
generic_sequence_worker(gpointer p)41 gpointer generic_sequence_worker(gpointer p) {
42 struct generic_seq_args *args = (struct generic_seq_args *)p;
43 struct timeval t_start, t_end;
44 int frame; // output frame index
45 int input_idx; // index of the frame being processed in the sequence
46 int *index_mapping = NULL;
47 int nb_frames, excluded_frames = 0, progress = 0;
48 int abort = 0; // variable for breaking out of loop
49 gboolean have_seqwriter = FALSE;
50
51 assert(args);
52 assert(args->seq);
53 assert(args->image_hook);
54 set_progress_bar_data(NULL, PROGRESS_RESET);
55 gettimeofday(&t_start, NULL);
56
57 if (args->nb_filtered_images > 0)
58 nb_frames = args->nb_filtered_images;
59 else {
60 nb_frames = compute_nb_filtered_images(args->seq, args->filtering_criterion, args->filtering_parameter);
61 args->nb_filtered_images = nb_frames;
62 if (nb_frames <= 0) {
63 siril_log_message(_("No image selected for processing, aborting\n"));
64 args->retval = 1;
65 goto the_end;
66 }
67 }
68 float nb_framesf = (float)nb_frames + 0.3f; // leave margin for rounding errors and post processing
69 args->retval = 0;
70
71 #ifdef _OPENMP
72 if (args->max_thread < 1) {
73 if (args->compute_mem_limits_hook)
74 args->max_thread = args->compute_mem_limits_hook(args, FALSE);
75 else args->max_thread = seq_compute_mem_limits(args, FALSE);
76 }
77 if (args->max_thread < 1) {
78 args->retval = 1;
79 goto the_end;
80 }
81 if (args->compute_mem_limits_hook)
82 siril_log_message(_("%s: with the current memory and thread limits, up to %d thread(s) can be used\n"),
83 args->description, args->max_thread);
84 #endif
85
86 if (args->prepare_hook && args->prepare_hook(args)) {
87 siril_log_message(_("Preparing sequence processing failed.\n"));
88 args->retval = 1;
89 goto the_end;
90 }
91
92 /* We have a sequence in which images can be filtered out. In order to
93 * distribute the workload fairly among all threads, the main iteration
94 * should not be on the list of images of the sequence, but on the
95 * actual list of selected images.
96 * Here we create this map of images to be processed, each cell of the
97 * array providing the image number in the input sequence. It cannot be
98 * done in parallel.
99 * This is mandatory for SER contiguous output. */
100 if (args->filtering_criterion) {
101 index_mapping = malloc(nb_frames * sizeof(int));
102 if (!index_mapping) {
103 PRINT_ALLOC_ERR;
104 args->retval = 1;
105 goto the_end;
106 }
107 for (input_idx = 0, frame = 0; input_idx < args->seq->number; input_idx++) {
108 if (!args->filtering_criterion(args->seq, input_idx, args->filtering_parameter)) {
109 continue;
110 }
111 index_mapping[frame++] = input_idx;
112 }
113 if (frame != nb_frames) {
114 siril_log_message(_("Output index mapping failed (%d/%d).\n"), frame, nb_frames);
115 args->retval = 1;
116 goto the_end;
117 }
118 }
119
120 if (args->has_output && !args->partial_image) {
121 gint64 size;
122 if (args->compute_size_hook)
123 size = args->compute_size_hook(args, nb_frames);
124 else size = seq_compute_size(args->seq, nb_frames, args->output_type);
125 if (test_available_space(size)) {
126 args->retval = 1;
127 goto the_end;
128 }
129 }
130
131 /* Output print of algorithm description */
132 if (args->description) {
133 gchar *desc = g_strdup_printf(_("%s: processing...\n"), args->description);
134 siril_log_color_message(desc, "green");
135 g_free(desc);
136 }
137
138 have_seqwriter = args->has_output &&
139 ((args->force_fitseq_output || args->seq->type == SEQ_FITSEQ) ||
140 (args->force_ser_output || args->seq->type == SEQ_SER));
141 #ifdef _OPENMP
142 omp_init_lock(&args->lock);
143 if (have_seqwriter)
144 omp_set_schedule(omp_sched_dynamic, 1);
145 else omp_set_schedule(omp_sched_guided, 0);
146 #ifdef HAVE_FFMS2
147 // we don't want to enable parallel processing for films, as ffms2 is not thread-safe
148 #pragma omp parallel for num_threads(args->max_thread) private(input_idx) schedule(runtime) \
149 if(args->seq->type != SEQ_AVI && args->parallel && (args->seq->type == SEQ_SER || fits_is_reentrant()))
150 #else
151 #pragma omp parallel for num_threads(args->max_thread) private(input_idx) schedule(runtime) \
152 if(args->parallel && (args->seq->type == SEQ_SER || fits_is_reentrant()))
153 #endif // HAVE_FFMS2
154 #endif // _OPENMP
155 for (frame = 0; frame < nb_frames; frame++) {
156 if (abort) continue;
157
158 fits *fit = calloc(1, sizeof(fits));
159 char filename[256];
160 rectangle area = { .x = args->area.x, .y = args->area.y,
161 .w = args->area.w, .h = args->area.h };
162
163 if (!get_thread_run()) {
164 abort = 1;
165 continue;
166 }
167 if (index_mapping)
168 input_idx = index_mapping[frame];
169 else input_idx = frame;
170
171 if (!seq_get_image_filename(args->seq, input_idx, filename)) {
172 abort = 1;
173 continue;
174 }
175
176 int thread_id = -1;
177 #ifdef _OPENMP
178 thread_id = omp_get_thread_num();
179 if (have_seqwriter) {
180 seqwriter_wait_for_memory();
181 if (abort) {
182 seqwriter_release_memory();
183 continue;
184 }
185 }
186 #endif
187
188 if (args->partial_image) {
189 // if we run in parallel, it will not be the same for all
190 // and we don't want to overwrite the original anyway
191 if (args->regdata_for_partial) {
192 int shiftx = roundf_to_int(args->seq->regparam[args->layer_for_partial][input_idx].shiftx);
193 int shifty = roundf_to_int(args->seq->regparam[args->layer_for_partial][input_idx].shifty);
194 area.x -= shiftx;
195 area.y += shifty;
196 }
197
198 // args->area may be modified in hooks
199 enforce_area_in_image(&area, args->seq);
200 if (seq_read_frame_part(args->seq,
201 args->layer_for_partial,
202 input_idx, fit, &area,
203 args->get_photometry_data_for_partial, thread_id))
204 {
205 abort = 1;
206 clearfits(fit);
207 free(fit);
208 continue;
209 }
210 /*char tmpfn[100]; // this is for debug purposes
211 sprintf(tmpfn, "/tmp/partial_%d.fit", input_idx);
212 savefits(tmpfn, fit);*/
213 } else {
214 // image is obtained bottom to top here, while it's in natural order for partial images!
215 if (seq_read_frame(args->seq, input_idx, fit, args->force_float, thread_id)) {
216 abort = 1;
217 clearfits(fit);
218 free(fit);
219 continue;
220 }
221 }
222
223 if (args->image_hook(args, frame, input_idx, fit, &area)) {
224 if (args->stop_on_error)
225 abort = 1;
226 else {
227 #ifdef _OPENMP
228 #pragma omp atomic
229 #endif
230 excluded_frames++;
231 }
232 clearfits(fit);
233 free(fit);
234 // for seqwriter, we need to notify the failed frame
235 if (have_seqwriter) {
236 int retval;
237 if (args->save_hook)
238 retval = args->save_hook(args, frame, input_idx, NULL);
239 else retval = generic_save(args, frame, input_idx, NULL);
240 if (retval)
241 abort = 1;
242 }
243 continue;
244 }
245
246 if (args->has_output) {
247 int retval;
248 if (args->save_hook)
249 retval = args->save_hook(args, frame, input_idx, fit);
250 else retval = generic_save(args, frame, input_idx, fit);
251 if (retval) {
252 abort = 1;
253 clearfits(fit);
254 free(fit);
255 continue;
256 }
257 } else {
258 /* save stats that may have been computed for the first
259 * time, but if fit has been modified for the new
260 * sequence, we shouldn't save it for the old one.
261 */
262 save_stats_from_fit(fit, args->seq, input_idx);
263 }
264
265 if (!have_seqwriter) {
266 clearfits(fit);
267 free(fit);
268 }
269
270 #ifdef _OPENMP
271 #pragma omp atomic
272 #endif
273 progress++;
274 gchar *msg = g_strdup_printf(_("%s. Processing image %d (%s)"), args->description, input_idx + 1, filename);
275 set_progress_bar_data(msg, (float)progress / nb_framesf);
276 g_free(msg);
277 }
278
279 /* the finalize hook contains the sequence writer synchronization, it
280 * should be called before outputing the logs */
281 if (have_seqwriter && args->finalize_hook && args->finalize_hook(args)) {
282 siril_log_message(_("Finalizing sequence processing failed.\n"));
283 abort = 1;
284 }
285 if (abort) {
286 set_progress_bar_data(_("Sequence processing failed. Check the log."), PROGRESS_RESET);
287 siril_log_color_message(_("Sequence processing failed.\n"), "red");
288 args->retval = abort;
289 }
290 else {
291 if (excluded_frames) {
292 set_progress_bar_data(_("Sequence processing partially succeeded. Check the log."), PROGRESS_RESET);
293 siril_log_color_message(_("Sequence processing partially succeeded, with %d images that failed and that were temporarily excluded from the sequence.\n"), "salmon", excluded_frames);
294 } else {
295 set_progress_bar_data(_("Sequence processing succeeded."), PROGRESS_RESET);
296 siril_log_color_message(_("Sequence processing succeeded.\n"), "green");
297 }
298 gettimeofday(&t_end, NULL);
299 show_time(t_start, t_end);
300 }
301
302 #ifdef _OPENMP
303 omp_destroy_lock(&args->lock);
304 #endif
305 the_end:
306 if (index_mapping) free(index_mapping);
307 if (!have_seqwriter && args->finalize_hook && args->finalize_hook(args)) {
308 siril_log_message(_("Finalizing sequence processing failed.\n"));
309 args->retval = 1;
310 }
311
312 if (args->already_in_a_thread) {
313 if (args->idle_function)
314 args->idle_function(args);
315 } else {
316 if (args->idle_function)
317 siril_add_idle(args->idle_function, args);
318 else siril_add_idle(end_generic_sequence, args);
319 }
320 return GINT_TO_POINTER(args->retval);
321 }
322
323 // defaut idle function (in GTK main thread) to run at the end of the generic sequence processing
end_generic_sequence(gpointer p)324 gboolean end_generic_sequence(gpointer p) {
325 struct generic_seq_args *args = (struct generic_seq_args *) p;
326
327 if (args->has_output && args->load_new_sequence &&
328 args->new_seq_prefix && !args->retval) {
329 gchar *basename = g_path_get_basename(args->seq->seqname);
330 gchar *seqname = g_strdup_printf("%s%s.seq", args->new_seq_prefix, basename);
331 check_seq(0);
332 update_sequences_list(seqname);
333 free(seqname);
334 g_free(basename);
335 }
336
337 free(p);
338 return end_generic(NULL);
339 }
340
341 /* If for_writer is false, it computes how many images can be processed in
342 * parallel, with regard to how many of them can fit in memory. It returns at
343 * most com.max_thread.
344 * If for_writer is true, it computes how many images can be stored in the
345 * queue. It returns at most 3 times com.max_thread.
346 */
seq_compute_mem_limits(struct generic_seq_args * args,gboolean for_writer)347 int seq_compute_mem_limits(struct generic_seq_args *args, gboolean for_writer) {
348 unsigned int MB_per_image, MB_avail;
349 int limit = compute_nb_images_fit_memory(args->seq, args->upscale_ratio, args->force_float, NULL, &MB_per_image, &MB_avail);
350 if (limit == 0) {
351 gchar *mem_per_image = g_format_size_full(MB_per_image * BYTES_IN_A_MB, G_FORMAT_SIZE_IEC_UNITS);
352 gchar *mem_available = g_format_size_full(MB_avail * BYTES_IN_A_MB, G_FORMAT_SIZE_IEC_UNITS);
353
354 siril_log_color_message(_("%s: not enough memory to do this operation (%s required per image, %s considered available)\n"),
355 "red", args->description, mem_per_image, mem_available);
356
357 g_free(mem_per_image);
358 g_free(mem_available);
359 } else {
360 #ifdef _OPENMP
361 if (for_writer) {
362 int max_queue_size = com.max_thread * 3;
363 if (limit > max_queue_size)
364 limit = max_queue_size;
365 }
366 else if (limit > com.max_thread)
367 limit = com.max_thread;
368 #else
369 if (!for_writer)
370 limit = 1;
371 else if (limit > 3)
372 limit = 3;
373 #endif
374 }
375 return limit;
376 }
377
seq_prepare_hook(struct generic_seq_args * args)378 int seq_prepare_hook(struct generic_seq_args *args) {
379 int retval = 0;
380 g_assert(args->has_output); // don't call this hook otherwise
381 if (args->force_ser_output || (args->seq->type == SEQ_SER && !args->force_fitseq_output)) {
382 gchar *dest;
383 const char *ptr = strrchr(args->seq->seqname, G_DIR_SEPARATOR);
384 if (ptr)
385 dest = g_strdup_printf("%s%s.ser", args->new_seq_prefix, ptr + 1);
386 else dest = g_strdup_printf("%s%s.ser", args->new_seq_prefix, args->seq->seqname);
387
388 args->new_ser = malloc(sizeof(struct ser_struct));
389 if (ser_create_file(dest, args->new_ser, TRUE, args->seq->ser_file)) {
390 free(args->new_ser);
391 args->new_ser = NULL;
392 retval = 1;
393 }
394 g_free(dest);
395 }
396 else if (args->force_fitseq_output || (args->seq->type == SEQ_FITSEQ && !args->force_ser_output)) {
397 gchar *dest;
398 const char *ptr = strrchr(args->seq->seqname, G_DIR_SEPARATOR);
399 if (ptr)
400 dest = g_strdup_printf("%s%s%s", args->new_seq_prefix, ptr + 1, com.pref.ext);
401 else dest = g_strdup_printf("%s%s%s", args->new_seq_prefix, args->seq->seqname, com.pref.ext);
402
403 args->new_fitseq = malloc(sizeof(fitseq));
404 if (fitseq_create_file(dest, args->new_fitseq, args->nb_filtered_images)) {
405 free(args->new_fitseq);
406 args->new_fitseq = NULL;
407 retval = 1;
408 }
409 g_free(dest);
410 }
411 else return 0;
412
413 if (!retval)
414 retval = seq_prepare_writer(args);
415 return retval;
416 }
417
seq_prepare_writer(struct generic_seq_args * args)418 int seq_prepare_writer(struct generic_seq_args *args) {
419 int limit = 0;
420 if (args->compute_mem_limits_hook)
421 limit = args->compute_mem_limits_hook(args, TRUE);
422 else limit = seq_compute_mem_limits(args, TRUE); // the default
423
424 if (limit == 0) {
425 if (args->force_ser_output || (args->seq->type == SEQ_SER && !args->force_fitseq_output)) {
426 ser_close_file(args->new_ser);
427 free(args->new_ser);
428 args->new_ser = NULL;
429 }
430 else if (args->force_fitseq_output || (args->seq->type == SEQ_FITSEQ && !args->force_ser_output)) {
431 fitseq_close_file(args->new_fitseq);
432 free(args->new_fitseq);
433 args->new_fitseq = NULL;
434 }
435 return 1;
436 }
437 seqwriter_set_max_active_blocks(limit);
438 return 0;
439 }
440
seq_finalize_hook(struct generic_seq_args * args)441 int seq_finalize_hook(struct generic_seq_args *args) {
442 int retval = 0;
443 g_assert(args->has_output); // don't call this hook otherwise
444 if ((args->force_ser_output || (args->seq->type == SEQ_SER && !args->force_fitseq_output)) && args->new_ser) {
445 retval = ser_write_and_close(args->new_ser);
446 free(args->new_ser);
447 }
448 else if ((args->force_fitseq_output || (args->seq->type == SEQ_FITSEQ && !args->force_ser_output)) && args->new_fitseq) {
449 retval = fitseq_close_file(args->new_fitseq);
450 free(args->new_fitseq);
451 }
452 return retval;
453 }
454
455 /* In SER, all images must be in a contiguous sequence, so we use the out_index.
456 * In FITS sequences, to keep track of image accross processings, we keep the
457 * input file number all along (in_index is the index in the sequence, not the name).
458 * The 2nd condition ensures that any force condition prevails over opposite
459 * input-type condition
460 */
generic_save(struct generic_seq_args * args,int out_index,int in_index,fits * fit)461 int generic_save(struct generic_seq_args *args, int out_index, int in_index, fits *fit) {
462 if (args->force_ser_output || (args->seq->type == SEQ_SER && !args->force_fitseq_output)) {
463 return ser_write_frame_from_fit(args->new_ser, fit, out_index);
464 } else if (args->force_fitseq_output || (args->seq->type == SEQ_FITSEQ && !args->force_ser_output)) {
465 return fitseq_write_image(args->new_fitseq, fit, out_index);
466 } else {
467 char *dest = fit_sequence_get_image_filename_prefixed(args->seq,
468 args->new_seq_prefix, in_index);
469 fit->bitpix = fit->orig_bitpix;
470 int retval = savefits(dest, fit);
471 free(dest);
472 return retval;
473 }
474 }
475
476 /*****************************************************************************
477 * P R O C E S S I N G T H R E A D M A N A G E M E N T *
478 ****************************************************************************/
479
480 static gboolean thread_being_waited = FALSE;
481
482 // This function is reentrant. The pointer will be freed in the idle function,
483 // so it must be a proper pointer to an allocated memory chunk.
start_in_new_thread(gpointer (* f)(gpointer),gpointer p)484 void start_in_new_thread(gpointer (*f)(gpointer), gpointer p) {
485 g_mutex_lock(&com.mutex);
486 if (com.run_thread || com.thread) {
487 fprintf(stderr, "The processing thread is busy, stop it first.\n");
488 g_mutex_unlock(&com.mutex);
489 free(p);
490 return;
491 }
492
493 com.run_thread = TRUE;
494 g_mutex_unlock(&com.mutex);
495 com.thread = g_thread_new("processing", f, p);
496 }
497
start_in_reserved_thread(gpointer (* f)(gpointer),gpointer p)498 void start_in_reserved_thread(gpointer (*f)(gpointer), gpointer p) {
499 g_mutex_lock(&com.mutex);
500 if (com.thread) {
501 fprintf(stderr, "The processing thread is busy, stop it first.\n");
502 g_mutex_unlock(&com.mutex);
503 free(p);
504 return;
505 }
506
507 com.run_thread = TRUE;
508 g_mutex_unlock(&com.mutex);
509 com.thread = g_thread_new("processing", f, p);
510 }
511
waiting_for_thread()512 gpointer waiting_for_thread() {
513 gpointer retval = NULL;
514 if (com.thread) {
515 thread_being_waited = TRUE;
516 retval = g_thread_join(com.thread);
517 }
518 com.thread = NULL;
519 thread_being_waited = FALSE;
520 set_thread_run(FALSE); // do it anyway in case of wait without stop
521 return retval;
522 }
523
stop_processing_thread()524 void stop_processing_thread() {
525 if (com.thread == NULL) {
526 siril_debug_print("The processing thread is not running.\n");
527 return;
528 }
529
530 set_thread_run(FALSE);
531 if (!thread_being_waited)
532 waiting_for_thread();
533 }
534
set_thread_run(gboolean b)535 void set_thread_run(gboolean b) {
536 g_mutex_lock(&com.mutex);
537 com.run_thread = b;
538 g_mutex_unlock(&com.mutex);
539 }
540
get_thread_run()541 gboolean get_thread_run() {
542 gboolean retval;
543 g_mutex_lock(&com.mutex);
544 retval = com.run_thread;
545 g_mutex_unlock(&com.mutex);
546 return retval;
547 }
548
549 // equivalent to atomic get and set if not running
reserve_thread()550 gboolean reserve_thread() {
551 gboolean retval;
552 g_mutex_lock(&com.mutex);
553 retval = !com.run_thread;
554 if (retval)
555 com.run_thread = TRUE;
556 g_mutex_unlock(&com.mutex);
557 return retval;
558 }
559
unreserve_thread()560 void unreserve_thread() {
561 set_thread_run(FALSE);
562 }
563
564 /* should be called in a threaded function if nothing special has to be done at the end.
565 * siril_add_idle(end_generic, NULL);
566 */
end_generic(gpointer arg)567 gboolean end_generic(gpointer arg) {
568 stop_processing_thread();
569
570 set_cursor_waiting(FALSE);
571 return FALSE;
572 }
573
574 /* wrapper for gdk_threads_add_idle that deactivates idle functions when
575 * running in script/console mode */
siril_add_idle(GSourceFunc idle_function,gpointer data)576 guint siril_add_idle(GSourceFunc idle_function, gpointer data) {
577 if (!com.script && !com.headless)
578 return gdk_threads_add_idle(idle_function, data);
579 return 0;
580 }
581
wait_for_script_thread()582 void wait_for_script_thread() {
583 if (com.script_thread)
584 g_thread_join(com.script_thread);
585 com.script_thread = NULL;
586 }
587
on_processes_button_cancel_clicked(GtkButton * button,gpointer user_data)588 void on_processes_button_cancel_clicked(GtkButton *button, gpointer user_data) {
589 if (com.thread != NULL)
590 siril_log_color_message(_("Process aborted by user\n"), "red");
591 com.stop_script = TRUE;
592 stop_processing_thread();
593 wait_for_script_thread();
594 }
595
create_default_seqargs(sequence * seq)596 struct generic_seq_args *create_default_seqargs(sequence *seq) {
597 struct generic_seq_args *args = calloc(1, sizeof(struct generic_seq_args));
598 args->seq = seq;
599 args->filtering_criterion = seq_filter_all;
600 args->nb_filtered_images = seq->number;
601 args->stop_on_error = TRUE;
602 args->upscale_ratio = 1.0;
603 args->parallel = TRUE;
604 return args;
605 }
606