1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
4 #include "common.h"
5
6 #include <pthread.h>
7
8 #include "db.h"
9 #include "seafile-session.h"
10 #include "seafile-config.h"
11 #include "sync-mgr.h"
12 #include "seafile-error-impl.h"
13 #include "mq-mgr.h"
14 #include "utils.h"
15 #include "vc-utils.h"
16
17 #include "sync-status-tree.h"
18
19 #ifdef WIN32
20 #include <shlobj.h>
21 #endif
22
23 #define DEBUG_FLAG SEAFILE_DEBUG_SYNC
24 #include "log.h"
25
26 #include "timer.h"
27
/*
 * Tuning constants for the sync manager.
 *
 * DEFAULT_SYNC_INTERVAL is stored in mgr->sync_interval and compared against
 * time() values, so it is expressed in seconds. CHECK_SYNC_INTERVAL and
 * UPDATE_TX_STATE_INTERVAL are the periods handed to seaf_timer_new() for the
 * auto-sync pulse and the transfer-rate report. MAX_RUNNING_SYNC_TASKS caps
 * mgr->n_running_tasks when deciding whether a repo may be scheduled.
 */
#define DEFAULT_SYNC_INTERVAL 30 /* 30s */
#define CHECK_SYNC_INTERVAL 1000 /* 1s */
#define UPDATE_TX_STATE_INTERVAL 1000 /* 1s */
#define MAX_RUNNING_SYNC_TASKS 5
#define CHECK_LOCKED_FILES_INTERVAL 10 /* 10s */
#define CHECK_FOLDER_PERMS_INTERVAL 30 /* 30s */

/* NOTE(review): not referenced in this part of the file; presumably a retry
 * limit for permission errors -- confirm against its users. */
#define SYNC_PERM_ERROR_RETRY_TIME 2
36
/*
 * Per-server HTTP sync state. Instances are the values of the manager's
 * http_server_states table and are looked up by a repo's server_url.
 */
struct _HttpServerState {
    /* Copied into SyncTask.http_version when a sync task is created. */
    int http_version;
    gboolean checking;
    gint64 last_http_check_time;
    char *testing_host;
    /* Can be server_url or server_url:8082, depends on which one works. */
    char *effective_host;
    gboolean use_fileserver_port;

    /* Folder-permission check bookkeeping. */
    gboolean folder_perms_not_supported;
    gint64 last_check_perms_time;
    gboolean checking_folder_perms;

    /* Locked-files check bookkeeping. */
    gboolean locked_files_not_supported;
    gint64 last_check_locked_files_time;
    gboolean checking_locked_files;

    /*
     * repo_id -> head commit id mapping.
     * Caches the head commit ids of synced repos.
     * Keys and values are both released with g_free() by the table.
     */
    GHashTable *head_commit_map;
    /* Initialized and destroyed together with head_commit_map. */
    pthread_mutex_t head_commit_map_lock;
    gboolean head_commit_map_init;
    gint64 last_update_head_commit_map_time;
};
typedef struct _HttpServerState HttpServerState;
64
/* Private state of the sync manager, reachable through mgr->priv. */
struct _SeafSyncManagerPriv {
    /* Timer driving auto_sync_pulse(); created in seaf_sync_manager_start(). */
    struct SeafTimer *check_sync_timer;
    /* Timer driving update_tx_state(); created in seaf_sync_manager_start(). */
    struct SeafTimer *update_tx_state_timer;
    int pulse_count;

    /* When FALSE, auto sync is globally disabled */
    gboolean auto_sync_enabled;

    /* String key -> ActivePathsInfo; the table owns both keys and values. */
    GHashTable *active_paths;
    pthread_mutex_t paths_lock;

#ifdef WIN32
    /* Queue handed to the refresh_windows_explorer_thread job. */
    GAsyncQueue *refresh_paths;
    /* Timer that runs refresh_all_windows_on_startup(). */
    struct SeafTimer *refresh_windows_timer;
#endif
};
81
/*
 * Value type of the manager's active_paths table; released with
 * active_paths_info_free().
 */
struct _ActivePathsInfo {
    GHashTable *paths;
    struct SyncStatusTree *syncing_tree;
    struct SyncStatusTree *synced_tree;
};
typedef struct _ActivePathsInfo ActivePathsInfo;
88
/* Forward declarations of file-local functions used before their definitions. */

static int auto_sync_pulse (void *vmanager);

/* Handlers connected to the "repo-http-fetched" / "repo-http-uploaded" signals. */
static void on_repo_http_fetched (SeafileSession *seaf,
                                  HttpTxTask *tx_task,
                                  SeafSyncManager *manager);
static void on_repo_http_uploaded (SeafileSession *seaf,
                                   HttpTxTask *tx_task,
                                   SeafSyncManager *manager);

static inline void
transition_sync_state (SyncTask *task, int new_state);

static void sync_task_free (SyncTask *task);

static int
sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync);

static gboolean
check_http_protocol (SeafSyncManager *mgr, SeafRepo *repo);

static void
active_paths_info_free (ActivePathsInfo *info);
111
112 static HttpServerState *
http_server_state_new()113 http_server_state_new ()
114 {
115 HttpServerState *state = g_new0 (HttpServerState, 1);
116 state->head_commit_map = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
117 pthread_mutex_init (&state->head_commit_map_lock, NULL);
118 return state;
119 }
120
121 static void
http_server_state_free(HttpServerState * state)122 http_server_state_free (HttpServerState *state)
123 {
124 if (!state)
125 return;
126 g_hash_table_destroy (state->head_commit_map);
127 pthread_mutex_destroy (&state->head_commit_map_lock);
128 g_free (state);
129 }
130
131 SeafSyncManager*
seaf_sync_manager_new(SeafileSession * seaf)132 seaf_sync_manager_new (SeafileSession *seaf)
133 {
134 SeafSyncManager *mgr = g_new0 (SeafSyncManager, 1);
135 mgr->priv = g_new0 (SeafSyncManagerPriv, 1);
136 mgr->priv->auto_sync_enabled = TRUE;
137 mgr->seaf = seaf;
138
139 mgr->sync_interval = DEFAULT_SYNC_INTERVAL;
140 mgr->sync_infos = g_hash_table_new (g_str_hash, g_str_equal);
141
142 mgr->http_server_states = g_hash_table_new_full (g_str_hash, g_str_equal,
143 g_free,
144 (GDestroyNotify)http_server_state_free);
145
146 gboolean exists;
147 int download_limit = seafile_session_config_get_int (seaf,
148 KEY_DOWNLOAD_LIMIT,
149 &exists);
150 if (exists)
151 mgr->download_limit = download_limit;
152
153 int upload_limit = seafile_session_config_get_int (seaf,
154 KEY_UPLOAD_LIMIT,
155 &exists);
156 if (exists)
157 mgr->upload_limit = upload_limit;
158
159 mgr->priv->active_paths = g_hash_table_new_full (g_str_hash, g_str_equal,
160 g_free,
161 (GDestroyNotify)active_paths_info_free);
162 pthread_mutex_init (&mgr->priv->paths_lock, NULL);
163
164 #ifdef WIN32
165 mgr->priv->refresh_paths = g_async_queue_new ();
166 #endif
167
168 return mgr;
169 }
170
171 static SyncInfo*
get_sync_info(SeafSyncManager * manager,const char * repo_id)172 get_sync_info (SeafSyncManager *manager, const char *repo_id)
173 {
174 SyncInfo *info = g_hash_table_lookup (manager->sync_infos, repo_id);
175 if (info) return info;
176
177 info = g_new0 (SyncInfo, 1);
178 memcpy (info->repo_id, repo_id, 36);
179 g_hash_table_insert (manager->sync_infos, info->repo_id, info);
180 return info;
181 }
182
183 SyncInfo *
seaf_sync_manager_get_sync_info(SeafSyncManager * mgr,const char * repo_id)184 seaf_sync_manager_get_sync_info (SeafSyncManager *mgr,
185 const char *repo_id)
186 {
187 return g_hash_table_lookup (mgr->sync_infos, repo_id);
188 }
189
190 int
seaf_sync_manager_init(SeafSyncManager * mgr)191 seaf_sync_manager_init (SeafSyncManager *mgr)
192 {
193 return 0;
194 }
195
196 static void
format_http_task_detail(HttpTxTask * task,GString * buf)197 format_http_task_detail (HttpTxTask *task, GString *buf)
198 {
199 if (task->state != HTTP_TASK_STATE_NORMAL ||
200 task->runtime_state == HTTP_TASK_RT_STATE_INIT ||
201 task->runtime_state == HTTP_TASK_RT_STATE_FINISHED)
202 return;
203
204 SeafRepo *repo = seaf_repo_manager_get_repo (seaf->repo_mgr,
205 task->repo_id);
206 char *repo_name;
207 char *type;
208
209 if (repo) {
210 repo_name = repo->name;
211 type = (task->type == HTTP_TASK_TYPE_UPLOAD) ? "upload" : "download";
212
213 } else if (task->is_clone) {
214 CloneTask *ctask;
215 ctask = seaf_clone_manager_get_task (seaf->clone_mgr, task->repo_id);
216 repo_name = ctask->repo_name;
217 type = "download";
218
219 } else {
220 return;
221 }
222 int rate = http_tx_task_get_rate(task);
223
224 g_string_append_printf (buf, "%s\t%d %s\n", type, rate, repo_name);
225 }
226
227 /*
228 * Publish a notification message to report :
229 *
230 * [uploading/downloading]\t[transfer-rate] [repo-name]\n
231 */
232 static int
update_tx_state(void * vmanager)233 update_tx_state (void *vmanager)
234 {
235 SeafSyncManager *mgr = vmanager;
236 GString *buf = g_string_new (NULL);
237 GList *tasks, *ptr;
238 HttpTxTask *http_task;
239
240 mgr->last_sent_bytes = g_atomic_int_get (&mgr->sent_bytes);
241 g_atomic_int_set (&mgr->sent_bytes, 0);
242 mgr->last_recv_bytes = g_atomic_int_get (&mgr->recv_bytes);
243 g_atomic_int_set (&mgr->recv_bytes, 0);
244
245 tasks = http_tx_manager_get_upload_tasks (seaf->http_tx_mgr);
246 for (ptr = tasks; ptr; ptr = ptr->next) {
247 http_task = ptr->data;
248 format_http_task_detail (http_task, buf);
249 }
250 g_list_free (tasks);
251
252 tasks = http_tx_manager_get_download_tasks (seaf->http_tx_mgr);
253 for (ptr = tasks; ptr; ptr = ptr->next) {
254 http_task = ptr->data;
255 format_http_task_detail (http_task, buf);
256 }
257 g_list_free (tasks);
258
259 if (buf->len != 0)
260 seaf_mq_manager_publish_notification (seaf->mq_mgr, "transfer",
261 buf->str);
262
263 g_string_free (buf, TRUE);
264
265 return TRUE;
266 }
267
268 #ifdef WIN32
269 static void *
270 refresh_windows_explorer_thread (void *vdata);
271
272 #define STARTUP_REFRESH_WINDOWS_DELAY 10000
273
274 static int
refresh_all_windows_on_startup(void * vdata)275 refresh_all_windows_on_startup (void *vdata)
276 {
277 /* This is a hack to tell Windows Explorer to refresh all open windows.
278 * On startup, if there is one big library, its events may dominate the
279 * explorer refresh queue. Other libraries don't get refreshed until
280 * the big library's events are consumed. So we refresh the open windows
281 * to reduce the delay.
282 */
283 SHChangeNotify (SHCNE_ASSOCCHANGED, SHCNF_IDLIST, NULL, NULL);
284
285 /* One time */
286 return 0;
287 }
288 #endif
289
290 static void *update_cached_head_commit_ids (void *arg);
291
292 int
seaf_sync_manager_start(SeafSyncManager * mgr)293 seaf_sync_manager_start (SeafSyncManager *mgr)
294 {
295 mgr->priv->check_sync_timer = seaf_timer_new (
296 auto_sync_pulse, mgr, CHECK_SYNC_INTERVAL);
297
298 mgr->priv->update_tx_state_timer = seaf_timer_new (
299 update_tx_state, mgr, UPDATE_TX_STATE_INTERVAL);
300
301 g_signal_connect (seaf, "repo-http-fetched",
302 (GCallback)on_repo_http_fetched, mgr);
303 g_signal_connect (seaf, "repo-http-uploaded",
304 (GCallback)on_repo_http_uploaded, mgr);
305
306 #ifdef WIN32
307 seaf_job_manager_schedule_job (seaf->job_mgr,
308 refresh_windows_explorer_thread,
309 NULL,
310 mgr->priv->refresh_paths);
311
312 mgr->priv->refresh_windows_timer = seaf_timer_new (
313 refresh_all_windows_on_startup, mgr, STARTUP_REFRESH_WINDOWS_DELAY);
314 #endif
315
316 pthread_t tid;
317 pthread_attr_t attr;
318 pthread_attr_init(&attr);
319 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
320 if (pthread_create (&tid, &attr, update_cached_head_commit_ids, mgr) < 0) {
321 seaf_warning ("Failed to create update cached head commit id thread.\n");
322 return -1;
323 }
324
325 return 0;
326 }
327
328 int
seaf_sync_manager_add_sync_task(SeafSyncManager * mgr,const char * repo_id,GError ** error)329 seaf_sync_manager_add_sync_task (SeafSyncManager *mgr,
330 const char *repo_id,
331 GError **error)
332 {
333 if (!seaf->started) {
334 seaf_message ("sync manager is not started, skip sync request.\n");
335 return -1;
336 }
337
338 SeafRepo *repo;
339
340 repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
341 if (!repo) {
342 seaf_warning ("[sync mgr] cannot find repo %s.\n", repo_id);
343 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_REPO, "Invalid repo");
344 return -1;
345 }
346
347 if (seaf_repo_check_worktree (repo) < 0) {
348 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_NO_WORKTREE,
349 "Worktree doesn't exist");
350 return -1;
351 }
352
353 #ifdef USE_GPL_CRYPTO
354 if (repo->version == 0 || (repo->encrypted && repo->enc_version < 2)) {
355 seaf_warning ("Don't support syncing old version libraries.\n");
356 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS,
357 "Don't support syncing old version libraries");
358 return -1;
359 }
360 #endif
361
362 SyncInfo *info = get_sync_info (mgr, repo->id);
363
364 if (info->in_sync)
365 return 0;
366
367 if (repo->version > 0) {
368 if (check_http_protocol (mgr, repo)) {
369 sync_repo_v2 (mgr, repo, TRUE);
370 return 0;
371 }
372 } else {
373 seaf_warning ("Repo %s(%s) is version 0 library. Syncing is no longer supported.\n",
374 repo->name, repo->id);
375 }
376
377 return 0;
378 }
379
380 void
seaf_sync_manager_cancel_sync_task(SeafSyncManager * mgr,const char * repo_id)381 seaf_sync_manager_cancel_sync_task (SeafSyncManager *mgr,
382 const char *repo_id)
383 {
384 SyncInfo *info;
385 SyncTask *task;
386
387 if (!seaf->started) {
388 seaf_message ("sync manager is not started, skip cancel request.\n");
389 return;
390 }
391
392 /* Cancel running task. */
393 info = g_hash_table_lookup (mgr->sync_infos, repo_id);
394
395 if (!info)
396 return;
397 else if (!info->in_sync) {
398 if (info->current_task && info->current_task->state == SYNC_STATE_ERROR) {
399 info->err_cnt = 0;
400 info->in_error = FALSE;
401 info->sync_perm_err_cnt = 0;
402 }
403 return;
404 }
405
406 g_return_if_fail (info->current_task != NULL);
407 task = info->current_task;
408
409 switch (task->state) {
410 case SYNC_STATE_FETCH:
411 http_tx_manager_cancel_task (seaf->http_tx_mgr,
412 repo_id,
413 HTTP_TASK_TYPE_DOWNLOAD);
414 transition_sync_state (task, SYNC_STATE_CANCEL_PENDING);
415 break;
416 case SYNC_STATE_UPLOAD:
417 http_tx_manager_cancel_task (seaf->http_tx_mgr,
418 repo_id,
419 HTTP_TASK_TYPE_UPLOAD);
420 transition_sync_state (task, SYNC_STATE_CANCEL_PENDING);
421 break;
422 case SYNC_STATE_COMMIT:
423 case SYNC_STATE_INIT:
424 case SYNC_STATE_MERGE:
425 transition_sync_state (task, SYNC_STATE_CANCEL_PENDING);
426 break;
427 case SYNC_STATE_CANCEL_PENDING:
428 break;
429 default:
430 g_return_if_reached ();
431 }
432 }
433
434 /* Check the notify setting by user. */
435 static gboolean
need_notify_sync(SeafRepo * repo)436 need_notify_sync (SeafRepo *repo)
437 {
438 char *notify_setting = seafile_session_config_get_string(seaf, "notify_sync");
439 if (notify_setting == NULL) {
440 seafile_session_config_set_string(seaf, "notify_sync", "on");
441 return TRUE;
442 }
443
444 gboolean result = (g_strcmp0(notify_setting, "on") == 0);
445 g_free (notify_setting);
446 return result;
447 }
448
/*
 * Human-readable names of the sync states, indexed by a SYNC_STATE_* value
 * (used as sync_state_str[task->state] in log messages). The order must
 * match the numeric values of the state constants.
 */
static const char *sync_state_str[] = {
    "synchronized",
    "committing",
    "initializing",
    "downloading",
    "merging",
    "uploading",
    "error",
    "canceled",
    "cancel pending"
};
460
461 static gboolean
find_meaningful_commit(SeafCommit * commit,void * data,gboolean * stop)462 find_meaningful_commit (SeafCommit *commit, void *data, gboolean *stop)
463 {
464 SeafCommit **p_head = data;
465
466 if (commit->second_parent_id && commit->new_merge && !commit->conflict)
467 return TRUE;
468
469 *stop = TRUE;
470 seaf_commit_ref (commit);
471 *p_head = commit;
472 return TRUE;
473 }
474
475 static void
notify_sync(SeafRepo * repo,gboolean is_multipart_upload)476 notify_sync (SeafRepo *repo, gboolean is_multipart_upload)
477 {
478 SeafCommit *head = NULL;
479
480 if (!seaf_commit_manager_traverse_commit_tree_truncated (seaf->commit_mgr,
481 repo->id, repo->version,
482 repo->head->commit_id,
483 find_meaningful_commit,
484 &head, FALSE)) {
485 seaf_warning ("Failed to traverse commit tree of %.8s.\n", repo->id);
486 return;
487 }
488 if (!head)
489 return;
490
491 GString *buf = g_string_new (NULL);
492 g_string_append_printf (buf, "%s\t%s\t%s\t%s\t%s",
493 repo->name,
494 repo->id,
495 head->commit_id,
496 head->parent_id,
497 head->desc);
498 if (!is_multipart_upload)
499 seaf_mq_manager_publish_notification (seaf->mq_mgr,
500 "sync.done",
501 buf->str);
502 else
503 seaf_mq_manager_publish_notification (seaf->mq_mgr,
504 "sync.multipart_upload",
505 buf->str);
506 g_string_free (buf, TRUE);
507 seaf_commit_unref (head);
508 }
509
510 #define IN_ERROR_THRESHOLD 3
511
512 static gboolean
is_perm_error(int error)513 is_perm_error (int error)
514 {
515 return (error == SYNC_ERROR_ID_ACCESS_DENIED ||
516 error == SYNC_ERROR_ID_NO_WRITE_PERMISSION ||
517 error == SYNC_ERROR_ID_PERM_NOT_SYNCABLE ||
518 error == SYNC_ERROR_ID_FOLDER_PERM_DENIED);
519 }
520
521 static void
update_sync_info_error_state(SyncTask * task,int new_state)522 update_sync_info_error_state (SyncTask *task, int new_state)
523 {
524 SyncInfo *info = task->info;
525
526 if (new_state == SYNC_STATE_ERROR) {
527 info->err_cnt++;
528 if (info->err_cnt == IN_ERROR_THRESHOLD)
529 info->in_error = TRUE;
530 if (is_perm_error(task->error))
531 info->sync_perm_err_cnt++;
532 } else if (info->err_cnt > 0) {
533 info->err_cnt = 0;
534 info->in_error = FALSE;
535 info->sync_perm_err_cnt = 0;
536 }
537 }
538
539 static void commit_repo (SyncTask *task);
540
541 static void
transition_sync_state(SyncTask * task,int new_state)542 transition_sync_state (SyncTask *task, int new_state)
543 {
544 g_return_if_fail (new_state >= 0 && new_state < SYNC_STATE_NUM);
545
546 SyncInfo *info = task->info;
547
548 if (task->state != new_state) {
549 if (((task->state == SYNC_STATE_INIT && task->uploaded) ||
550 task->state == SYNC_STATE_FETCH) &&
551 new_state == SYNC_STATE_DONE &&
552 need_notify_sync(task->repo))
553 notify_sync (task->repo, (info->multipart_upload && !info->end_multipart_upload));
554
555 /* If we're in the process of uploading large set of files, they'll be splitted
556 * into multiple batches for upload. We want to immediately start the next batch
557 * after previous one is done.
558 */
559 if (new_state == SYNC_STATE_DONE &&
560 info->multipart_upload &&
561 !info->end_multipart_upload) {
562 commit_repo (task);
563 return;
564 }
565
566 /* If file error levels occured during sync, the whole sync process can still finish
567 * with DONE state. But we need to notify the user about this error in the interface.
568 * Such file level errors are set with seaf_sync_manager_set_task_error_code().
569 */
570 if (new_state != SYNC_STATE_ERROR && task->error != SYNC_ERROR_ID_NO_ERROR) {
571 new_state = SYNC_STATE_ERROR;
572 seaf_message ("Repo '%s' sync is finished but with error: %s\n",
573 task->repo->name,
574 sync_error_id_to_str(task->error));
575 }
576
577 if (!(task->state == SYNC_STATE_DONE && new_state == SYNC_STATE_INIT) &&
578 !(task->state == SYNC_STATE_INIT && new_state == SYNC_STATE_DONE)) {
579 seaf_message ("Repo '%s' sync state transition from '%s' to '%s'.\n",
580 task->repo->name,
581 sync_state_str[task->state],
582 sync_state_str[new_state]);
583 }
584
585 task->state = new_state;
586 if (new_state == SYNC_STATE_DONE ||
587 new_state == SYNC_STATE_CANCELED ||
588 new_state == SYNC_STATE_ERROR) {
589 info->in_sync = FALSE;
590 --(task->mgr->n_running_tasks);
591 update_sync_info_error_state (task, new_state);
592
593 /* Keep previous upload progress if sync task is canceled or failed. */
594 if (new_state == SYNC_STATE_DONE) {
595 info->multipart_upload = FALSE;
596 info->end_multipart_upload = FALSE;
597 info->total_bytes = 0;
598 info->uploaded_bytes = 0;
599 }
600 }
601
602 #ifdef WIN32
603 seaf_sync_manager_add_refresh_path (seaf->sync_mgr, task->repo->worktree);
604 #endif
605 }
606 }
607
608 static void
set_task_error(SyncTask * task,int error)609 set_task_error (SyncTask *task, int error)
610 {
611 g_return_if_fail (error >= 0 && error < N_SYNC_ERROR_ID);
612
613 const char *err_str = sync_error_id_to_str(error);
614 int err_level = sync_error_level(error);
615
616 if (task->state != SYNC_STATE_ERROR) {
617 seaf_message ("Repo '%s' sync state transition from %s to '%s': '%s'.\n",
618 task->repo->name,
619 sync_state_str[task->state],
620 sync_state_str[SYNC_STATE_ERROR],
621 err_str);
622 task->state = SYNC_STATE_ERROR;
623 task->error = error;
624 task->info->in_sync = FALSE;
625 --(task->mgr->n_running_tasks);
626 update_sync_info_error_state (task, SYNC_STATE_ERROR);
627
628 /* For repo-level errors, only need to record in database, but not send notifications.
629 * File-level errors are recorded and notified in the location they happens, not here.
630 */
631 if (err_level == SYNC_ERROR_LEVEL_REPO)
632 seaf_repo_manager_record_sync_error (task->repo->id, task->repo->name, NULL, error);
633
634 #ifdef WIN32
635 seaf_sync_manager_add_refresh_path (seaf->sync_mgr, task->repo->worktree);
636 #endif
637
638 }
639 }
640
641 void
seaf_sync_manager_set_task_error_code(SeafSyncManager * mgr,const char * repo_id,int error)642 seaf_sync_manager_set_task_error_code (SeafSyncManager *mgr,
643 const char *repo_id,
644 int error)
645 {
646 SyncInfo *info = g_hash_table_lookup (mgr->sync_infos, repo_id);
647 if (!info)
648 return;
649
650 info->current_task->error = error;
651 }
652
653 static void
sync_task_free(SyncTask * task)654 sync_task_free (SyncTask *task)
655 {
656 g_free (task->tx_id);
657 g_free (task->dest_id);
658 g_free (task->token);
659 g_free (task);
660 }
661
662 static void
start_upload_if_necessary(SyncTask * task)663 start_upload_if_necessary (SyncTask *task)
664 {
665 GError *error = NULL;
666 SeafRepo *repo = task->repo;
667
668 if (http_tx_manager_add_upload (seaf->http_tx_mgr,
669 repo->id,
670 repo->version,
671 repo->effective_host,
672 repo->token,
673 task->http_version,
674 repo->use_fileserver_port,
675 &error) < 0) {
676 seaf_warning ("Failed to start http upload: %s\n", error->message);
677 set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
678 return;
679 }
680 task->tx_id = g_strdup(repo->id);
681
682 transition_sync_state (task, SYNC_STATE_UPLOAD);
683 }
684
685 static void
start_fetch_if_necessary(SyncTask * task,const char * remote_head)686 start_fetch_if_necessary (SyncTask *task, const char *remote_head)
687 {
688 GError *error = NULL;
689 SeafRepo *repo = task->repo;
690
691 if (http_tx_manager_add_download (seaf->http_tx_mgr,
692 repo->id,
693 repo->version,
694 repo->effective_host,
695 repo->token,
696 remote_head,
697 FALSE,
698 NULL, NULL,
699 task->http_version,
700 repo->email,
701 repo->use_fileserver_port,
702 repo->name,
703 &error) < 0) {
704 seaf_warning ("Failed to start http download: %s.\n", error->message);
705 set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
706 return;
707 }
708 task->tx_id = g_strdup(repo->id);
709
710 transition_sync_state (task, SYNC_STATE_FETCH);
711 }
712
713 static gboolean
repo_block_store_exists(SeafRepo * repo)714 repo_block_store_exists (SeafRepo *repo)
715 {
716 gboolean ret;
717 char *store_path = g_build_filename (seaf->seaf_dir, "storage", "blocks",
718 repo->id, NULL);
719 if (g_file_test (store_path, G_FILE_TEST_IS_DIR))
720 ret = TRUE;
721 else
722 ret = FALSE;
723 g_free (store_path);
724 return ret;
725 }
726
727 #if defined WIN32 || defined __APPLE__
728
729 static GHashTable *
load_locked_files_blocks(const char * repo_id)730 load_locked_files_blocks (const char *repo_id)
731 {
732 LockedFileSet *fset;
733 GHashTable *block_id_hash;
734 GHashTableIter iter;
735 gpointer key, value;
736 LockedFile *locked;
737 Seafile *file;
738 int i;
739 char *blk_id;
740
741 fset = seaf_repo_manager_get_locked_file_set (seaf->repo_mgr, repo_id);
742
743 block_id_hash = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
744
745 g_hash_table_iter_init (&iter, fset->locked_files);
746 while (g_hash_table_iter_next (&iter, &key, &value)) {
747 locked = value;
748
749 if (strcmp (locked->operation, LOCKED_OP_UPDATE) == 0) {
750 file = seaf_fs_manager_get_seafile (seaf->fs_mgr,
751 fset->repo_id, 1,
752 locked->file_id);
753 if (!file) {
754 seaf_warning ("Failed to find file %s in repo %.8s.\n",
755 locked->file_id, fset->repo_id);
756 continue;
757 }
758
759 for (i = 0; i < file->n_blocks; ++i) {
760 blk_id = g_strdup (file->blk_sha1s[i]);
761 g_hash_table_replace (block_id_hash, blk_id, blk_id);
762 }
763
764 seafile_unref (file);
765 }
766 }
767
768 locked_file_set_free (fset);
769
770 return block_id_hash;
771 }
772
773 static gboolean
remove_block_cb(const char * store_id,int version,const char * block_id,void * user_data)774 remove_block_cb (const char *store_id,
775 int version,
776 const char *block_id,
777 void *user_data)
778 {
779 GHashTable *block_hash = user_data;
780
781 if (!g_hash_table_lookup (block_hash, block_id))
782 seaf_block_manager_remove_block (seaf->block_mgr, store_id, version, block_id);
783
784 return TRUE;
785 }
786
787 #endif
788
789 static void *
remove_repo_blocks(void * vtask)790 remove_repo_blocks (void *vtask)
791 {
792 SyncTask *task = vtask;
793
794 #if defined WIN32 || defined __APPLE__
795 GHashTable *block_hash;
796
797 block_hash = load_locked_files_blocks (task->repo->id);
798 if (g_hash_table_size (block_hash) == 0) {
799 g_hash_table_destroy (block_hash);
800 seaf_block_manager_remove_store (seaf->block_mgr, task->repo->id);
801 return vtask;
802 }
803
804 seaf_block_manager_foreach_block (seaf->block_mgr,
805 task->repo->id,
806 task->repo->version,
807 remove_block_cb,
808 block_hash);
809
810 g_hash_table_destroy (block_hash);
811 #else
812 seaf_block_manager_remove_store (seaf->block_mgr, task->repo->id);
813 #endif
814
815 return vtask;
816 }
817
818 static void
remove_blocks_done(void * vtask)819 remove_blocks_done (void *vtask)
820 {
821 SyncTask *task = vtask;
822
823 transition_sync_state (task, SYNC_STATE_DONE);
824 }
825
826 static void
on_repo_deleted_on_server(SyncTask * task,SeafRepo * repo)827 on_repo_deleted_on_server (SyncTask *task, SeafRepo *repo)
828 {
829 set_task_error (task, SYNC_ERROR_ID_SERVER_REPO_DELETED);
830
831 seaf_warning ("repo %s(%.8s) not found on server\n",
832 repo->name, repo->id);
833
834 if (!seafile_session_config_get_allow_repo_not_found_on_server(seaf)) {
835 seaf_message ("remove repo %s(%.8s) since it's deleted on relay\n",
836 repo->name, repo->id);
837 /* seaf_mq_manager_publish_notification (seaf->mq_mgr, */
838 /* "repo.deleted_on_relay", */
839 /* repo->name); */
840 seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
841 }
842 }
843
844 static void
update_sync_status_v2(SyncTask * task)845 update_sync_status_v2 (SyncTask *task)
846 {
847 SyncInfo *info = task->info;
848 SeafRepo *repo = task->repo;
849 SeafBranch *master = NULL, *local = NULL;
850
851 local = seaf_branch_manager_get_branch (
852 seaf->branch_mgr, info->repo_id, "local");
853 if (!local) {
854 seaf_warning ("[sync-mgr] Branch local not found for repo %s(%.8s).\n",
855 repo->name, repo->id);
856 set_task_error (task, SYNC_ERROR_ID_LOCAL_DATA_CORRUPT);
857 return;
858 }
859
860 master = seaf_branch_manager_get_branch (
861 seaf->branch_mgr, info->repo_id, "master");
862 if (!master) {
863 seaf_warning ("[sync-mgr] Branch master not found for repo %s(%.8s).\n",
864 repo->name, repo->id);
865 set_task_error (task, SYNC_ERROR_ID_LOCAL_DATA_CORRUPT);
866 return;
867 }
868
869 if (info->repo_corrupted) {
870 set_task_error (task, SYNC_ERROR_ID_SERVER_REPO_CORRUPT);
871 } else if (info->deleted_on_relay) {
872 on_repo_deleted_on_server (task, repo);
873 } else {
874 /* If local head is the same as remote head, already in sync. */
875 if (strcmp (local->commit_id, info->head_commit) == 0) {
876 /* As long as the repo is synced with the server. All the local
877 * blocks are not useful any more.
878 */
879 if (repo_block_store_exists (repo)) {
880 seaf_message ("Removing blocks for repo %s(%.8s).\n",
881 repo->name, repo->id);
882 seaf_job_manager_schedule_job (seaf->job_mgr,
883 remove_repo_blocks,
884 remove_blocks_done,
885 task);
886 } else
887 transition_sync_state (task, SYNC_STATE_DONE);
888 } else
889 start_fetch_if_necessary (task, task->info->head_commit);
890 }
891
892 seaf_branch_unref (local);
893 seaf_branch_unref (master);
894 }
895
896 static void
check_head_commit_done(HttpHeadCommit * result,void * user_data)897 check_head_commit_done (HttpHeadCommit *result, void *user_data)
898 {
899 SyncTask *task = user_data;
900 SyncInfo *info = task->info;
901
902 if (!result->check_success) {
903 set_task_error (task, result->error_code);
904 return;
905 }
906
907 info->deleted_on_relay = result->is_deleted;
908 info->repo_corrupted = result->is_corrupt;
909 memcpy (info->head_commit, result->head_commit, 40);
910
911 update_sync_status_v2 (task);
912 }
913
914 static int
check_head_commit_http(SyncTask * task)915 check_head_commit_http (SyncTask *task)
916 {
917 SeafRepo *repo = task->repo;
918
919 int ret = http_tx_manager_check_head_commit (seaf->http_tx_mgr,
920 repo->id, repo->version,
921 repo->effective_host,
922 repo->token,
923 repo->use_fileserver_port,
924 check_head_commit_done,
925 task);
926 if (ret == 0)
927 transition_sync_state (task, SYNC_STATE_INIT);
928 else if (ret < 0)
929 set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
930
931 return ret;
932 }
933
/* Outcome of commit_job(), allocated there and freed by commit_job_done(). */
struct CommitResult {
    /* The task the commit was run for. */
    SyncTask *task;
    /* FALSE when indexing produced no new commit. */
    gboolean changed;
    /* FALSE when indexing failed; also left FALSE if the repo was pending deletion. */
    gboolean success;
};
939
940 static void *
commit_job(void * vtask)941 commit_job (void *vtask)
942 {
943 SyncTask *task = vtask;
944 SeafRepo *repo = task->repo;
945 struct CommitResult *res = g_new0 (struct CommitResult, 1);
946 GError *error = NULL;
947
948 res->task = task;
949
950 if (repo->delete_pending)
951 return res;
952
953 res->changed = TRUE;
954 res->success = TRUE;
955
956 char *commit_id = seaf_repo_index_commit (repo,
957 task->is_manual_sync,
958 task->is_initial_commit,
959 &error);
960 if (commit_id == NULL && error != NULL) {
961 seaf_warning ("[Sync mgr] Failed to commit to repo %s(%.8s).\n",
962 repo->name, repo->id);
963 res->success = FALSE;
964 } else if (commit_id == NULL) {
965 res->changed = FALSE;
966 }
967 g_free (commit_id);
968
969 return res;
970 }
971
972 static void
commit_job_done(void * vres)973 commit_job_done (void *vres)
974 {
975 struct CommitResult *res = vres;
976 SeafRepo *repo = res->task->repo;
977 SyncTask *task = res->task;
978
979 res->task->mgr->commit_job_running = FALSE;
980
981 if (repo->delete_pending) {
982 transition_sync_state (res->task, SYNC_STATE_CANCELED);
983 seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
984 g_free (res);
985 return;
986 }
987
988 if (res->task->state == SYNC_STATE_CANCEL_PENDING) {
989 transition_sync_state (res->task, SYNC_STATE_CANCELED);
990 g_free (res);
991 return;
992 }
993
994 if (!res->success) {
995 set_task_error (res->task, SYNC_ERROR_ID_INDEX_ERROR);
996 g_free (res);
997 return;
998 }
999
1000 if (res->changed)
1001 start_upload_if_necessary (res->task);
1002 else if (task->is_manual_sync || task->is_initial_commit)
1003 check_head_commit_http (task);
1004 else
1005 transition_sync_state (task, SYNC_STATE_DONE);
1006
1007 g_free (res);
1008 }
1009
1010 static int check_commit_state (void *data);
1011
1012 static void
commit_repo(SyncTask * task)1013 commit_repo (SyncTask *task)
1014 {
1015 /* In order not to eat too much CPU power, only one commit job can be run
1016 * at the same time. Other sync tasks have to check every 1 second.
1017 */
1018 if (task->mgr->commit_job_running) {
1019 task->commit_timer = seaf_timer_new (check_commit_state, task, 1000);
1020 return;
1021 }
1022
1023 task->mgr->commit_job_running = TRUE;
1024
1025 transition_sync_state (task, SYNC_STATE_COMMIT);
1026
1027 if (seaf_job_manager_schedule_job (seaf->job_mgr,
1028 commit_job,
1029 commit_job_done,
1030 task) < 0)
1031 set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
1032 }
1033
1034 static int
check_commit_state(void * data)1035 check_commit_state (void *data)
1036 {
1037 SyncTask *task = data;
1038
1039 if (!task->mgr->commit_job_running) {
1040 seaf_timer_free (&task->commit_timer);
1041 commit_repo (task);
1042 return 0;
1043 }
1044
1045 return 1;
1046 }
1047
1048 static SyncTask *
create_sync_task_v2(SeafSyncManager * manager,SeafRepo * repo,gboolean is_manual_sync,gboolean is_initial_commit)1049 create_sync_task_v2 (SeafSyncManager *manager, SeafRepo *repo,
1050 gboolean is_manual_sync, gboolean is_initial_commit)
1051 {
1052 SyncTask *task = g_new0 (SyncTask, 1);
1053 SyncInfo *info;
1054
1055 info = get_sync_info (manager, repo->id);
1056
1057 task->info = info;
1058 task->mgr = manager;
1059
1060 task->dest_id = g_strdup (repo->relay_id);
1061 task->token = g_strdup(repo->token);
1062 task->is_manual_sync = is_manual_sync;
1063 task->is_initial_commit = is_initial_commit;
1064 task->error = SYNC_ERROR_ID_NO_ERROR;
1065
1066 repo->last_sync_time = time(NULL);
1067 ++(manager->n_running_tasks);
1068
1069 /* Free the last task when a new task is started.
1070 * This way we can always get the state of the last task even
1071 * after it's done.
1072 */
1073 if (task->info->current_task)
1074 sync_task_free (task->info->current_task);
1075 task->info->current_task = task;
1076 task->info->in_sync = TRUE;
1077 task->repo = repo;
1078
1079 if (repo->server_url) {
1080 HttpServerState *state = g_hash_table_lookup (manager->http_server_states,
1081 repo->server_url);
1082 if (state) {
1083 task->http_version = state->http_version;
1084 }
1085 }
1086
1087 return task;
1088 }
1089
1090 static gboolean
create_commit_from_event_queue(SeafSyncManager * manager,SeafRepo * repo,gboolean is_manual_sync)1091 create_commit_from_event_queue (SeafSyncManager *manager, SeafRepo *repo,
1092 gboolean is_manual_sync)
1093 {
1094 WTStatus *status;
1095 SyncTask *task;
1096 gboolean ret = FALSE;
1097 gint now = (gint)time(NULL);
1098 gint last_changed;
1099
1100 status = seaf_wt_monitor_get_worktree_status (manager->seaf->wt_monitor,
1101 repo->id);
1102 if (status) {
1103 last_changed = g_atomic_int_get (&status->last_changed);
1104 if (status->last_check == 0) {
1105 /* Force commit and sync after a new repo is added. */
1106 task = create_sync_task_v2 (manager, repo, is_manual_sync, TRUE);
1107 repo->create_partial_commit = TRUE;
1108 commit_repo (task);
1109 status->last_check = now;
1110 ret = TRUE;
1111 } else if (status->partial_commit) {
1112 task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
1113 repo->create_partial_commit = TRUE;
1114 commit_repo (task);
1115 ret = TRUE;
1116 } else if (last_changed != 0 && status->last_check <= last_changed) {
1117 /* Commit and sync if the repo has been updated after the
1118 * last check and is not updated for the last 2 seconds.
1119 */
1120 if (now - last_changed >= 2) {
1121 task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
1122 repo->create_partial_commit = TRUE;
1123 commit_repo (task);
1124 status->last_check = now;
1125 ret = TRUE;
1126 }
1127 }
1128 wt_status_unref (status);
1129 }
1130
1131 return ret;
1132 }
1133
1134 static gboolean
can_schedule_repo(SeafSyncManager * manager,SeafRepo * repo)1135 can_schedule_repo (SeafSyncManager *manager, SeafRepo *repo)
1136 {
1137 int now = (int)time(NULL);
1138
1139 return ((repo->last_sync_time == 0 ||
1140 repo->last_sync_time < now - manager->sync_interval) &&
1141 manager->n_running_tasks < MAX_RUNNING_SYNC_TASKS);
1142 }
1143
1144 static gboolean
need_check_on_server(SeafSyncManager * manager,SeafRepo * repo,const char * master_head_id)1145 need_check_on_server (SeafSyncManager *manager, SeafRepo *repo, const char *master_head_id)
1146 {
1147 #define HEAD_COMMIT_MAP_TTL 90
1148 HttpServerState *state;
1149 gboolean ret = FALSE;
1150 SyncInfo *info;
1151
1152 /* If sync state is in error, always retry. */
1153 info = get_sync_info (manager, repo->id);
1154 if (info && info->current_task && info->current_task->state == SYNC_STATE_ERROR)
1155 return TRUE;
1156
1157 state = g_hash_table_lookup (manager->http_server_states, repo->server_url);
1158 if (!state)
1159 return TRUE;
1160
1161 pthread_mutex_lock (&state->head_commit_map_lock);
1162
1163 if (!state->head_commit_map_init) {
1164 ret = TRUE;
1165 goto out;
1166 }
1167
1168 gint64 now = (gint64)time(NULL);
1169 if (now - state->last_update_head_commit_map_time >= HEAD_COMMIT_MAP_TTL) {
1170 ret = TRUE;
1171 goto out;
1172 }
1173
1174 char *server_head = g_hash_table_lookup (state->head_commit_map, repo->id);
1175 if (!server_head) {
1176 /* Repo was removed on server. Just return "changed on server". */
1177 ret = TRUE;
1178 goto out;
1179 }
1180 if (g_strcmp0 (server_head, master_head_id) != 0)
1181 ret = TRUE;
1182
1183 out:
1184 pthread_mutex_unlock (&state->head_commit_map_lock);
1185 return ret;
1186 }
1187
1188 static int
sync_repo_v2(SeafSyncManager * manager,SeafRepo * repo,gboolean is_manual_sync)1189 sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync)
1190 {
1191 SeafBranch *master, *local;
1192 SyncTask *task;
1193 int ret = 0;
1194 char *last_download = NULL;
1195
1196 master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
1197 if (!master) {
1198 seaf_warning ("No master branch found for repo %s(%.8s).\n",
1199 repo->name, repo->id);
1200 return -1;
1201 }
1202 local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local");
1203 if (!local) {
1204 seaf_warning ("No local branch found for repo %s(%.8s).\n",
1205 repo->name, repo->id);
1206 return -1;
1207 }
1208
1209 /* If last download was interrupted in the fetch and download stage,
1210 * need to resume it at exactly the same remote commit.
1211 */
1212 last_download = seaf_repo_manager_get_repo_property (seaf->repo_mgr,
1213 repo->id,
1214 REPO_PROP_DOWNLOAD_HEAD);
1215 if (last_download && strcmp (last_download, EMPTY_SHA1) != 0) {
1216 if (is_manual_sync || can_schedule_repo (manager, repo)) {
1217 task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
1218 start_fetch_if_necessary (task, last_download);
1219 }
1220 goto out;
1221 }
1222
1223 if (strcmp (master->commit_id, local->commit_id) != 0) {
1224 if (is_manual_sync || can_schedule_repo (manager, repo)) {
1225 task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
1226 start_upload_if_necessary (task);
1227 }
1228 /* Do nothing if the client still has something to upload
1229 * but it's before 30-second schedule.
1230 */
1231 goto out;
1232 } else if (is_manual_sync) {
1233 task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
1234 commit_repo (task);
1235 goto out;
1236 } else if (create_commit_from_event_queue (manager, repo, is_manual_sync))
1237 goto out;
1238
1239 if (is_manual_sync || can_schedule_repo (manager, repo)) {
1240 /* If file syncing protocol version is higher than 2, we check for all head commit ids
1241 * for synced repos regularly.
1242 */
1243 if (!is_manual_sync && !need_check_on_server (manager, repo, master->commit_id)) {
1244 seaf_debug ("Repo %s is not changed on server %s.\n", repo->name, repo->server_url);
1245 repo->last_sync_time = time(NULL);
1246 goto out;
1247 }
1248
1249 task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
1250 check_head_commit_http (task);
1251 }
1252
1253 out:
1254 g_free (last_download);
1255 seaf_branch_unref (master);
1256 seaf_branch_unref (local);
1257 return ret;
1258 }
1259
1260 static void
auto_delete_repo(SeafSyncManager * manager,SeafRepo * repo)1261 auto_delete_repo (SeafSyncManager *manager, SeafRepo *repo)
1262 {
1263 SyncInfo *info = seaf_sync_manager_get_sync_info (manager, repo->id);
1264 char *name = g_strdup (repo->name);
1265
1266 seaf_message ("Auto deleted repo '%s'.\n", repo->name);
1267
1268 seaf_sync_manager_cancel_sync_task (seaf->sync_mgr, repo->id);
1269
1270 if (info != NULL && info->in_sync) {
1271 seaf_repo_manager_mark_repo_deleted (seaf->repo_mgr, repo);
1272 } else {
1273 seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
1274 }
1275
1276 /* Publish a message, for applet to notify in the system tray */
1277 seaf_mq_manager_publish_notification (seaf->mq_mgr,
1278 "repo.removed",
1279 name);
1280 g_free (name);
1281 }
1282
/*
 * Build the file server variant of @url by forcing its port to 8082.
 *
 * Everything from the last ':' found after the first strlen("http://")
 * characters is replaced with ":8082"; if there is no such ':', ":8082"
 * is simply appended.  A url not longer than "http://" is considered
 * invalid and returned as a plain copy.
 *
 * Returns a newly allocated string that the caller must g_free().
 */
static char *
http_fileserver_url (const char *url)
{
    const size_t scheme_len = strlen ("http://");
    const char *port_sep;
    char *base;
    char *result;

    /* Just hand back a copy if there is nothing after the scheme. */
    if (strlen (url) <= scheme_len)
        return g_strdup (url);

    /* Search only past the protocol scheme, so its ':' is never matched. */
    port_sep = strrchr (url + scheme_len, ':');
    if (port_sep == NULL)
        return g_strconcat (url, ":8082", NULL);

    base = g_strndup (url, port_sep - url);
    result = g_strconcat (base, ":8082", NULL);
    g_free (base);

    return result;
}
1309
1310 static void
check_http_fileserver_protocol_done(HttpProtocolVersion * result,void * user_data)1311 check_http_fileserver_protocol_done (HttpProtocolVersion *result, void *user_data)
1312 {
1313 HttpServerState *state = user_data;
1314
1315 state->checking = FALSE;
1316
1317 if (result->check_success && !result->not_supported) {
1318 state->http_version = MIN(result->version, CURRENT_SYNC_PROTO_VERSION);
1319 state->effective_host = http_fileserver_url(state->testing_host);
1320 state->use_fileserver_port = TRUE;
1321 seaf_message ("File syncing protocol version on server %s is %d. "
1322 "Client file syncing protocol version is %d. Use version %d.\n",
1323 state->effective_host, result->version, CURRENT_SYNC_PROTO_VERSION,
1324 state->http_version);
1325 }
1326 }
1327
1328 static void
check_http_protocol_done(HttpProtocolVersion * result,void * user_data)1329 check_http_protocol_done (HttpProtocolVersion *result, void *user_data)
1330 {
1331 HttpServerState *state = user_data;
1332
1333 if (result->check_success && !result->not_supported) {
1334 state->http_version = MIN(result->version, CURRENT_SYNC_PROTO_VERSION);
1335 state->effective_host = g_strdup(state->testing_host);
1336 state->checking = FALSE;
1337 seaf_message ("File syncing protocol version on server %s is %d. "
1338 "Client file syncing protocol version is %d. Use version %d.\n",
1339 state->effective_host, result->version, CURRENT_SYNC_PROTO_VERSION,
1340 state->http_version);
1341 } else if (strncmp(state->testing_host, "https", 5) != 0) {
1342 char *host_fileserver = http_fileserver_url(state->testing_host);
1343 if (http_tx_manager_check_protocol_version (seaf->http_tx_mgr,
1344 host_fileserver,
1345 TRUE,
1346 check_http_fileserver_protocol_done,
1347 state) < 0)
1348 state->checking = FALSE;
1349 g_free (host_fileserver);
1350 } else {
1351 state->checking = FALSE;
1352 }
1353 }
1354
1355 #define CHECK_HTTP_INTERVAL 10
1356
1357 /*
1358 * Returns TRUE if we're ready to use http-sync; otherwise FALSE.
1359 */
1360 static gboolean
check_http_protocol(SeafSyncManager * mgr,SeafRepo * repo)1361 check_http_protocol (SeafSyncManager *mgr, SeafRepo *repo)
1362 {
1363 /* If a repo was cloned before 4.0, server-url is not set. */
1364 if (!repo->server_url)
1365 return FALSE;
1366
1367 HttpServerState *state = g_hash_table_lookup (mgr->http_server_states,
1368 repo->server_url);
1369 if (!state) {
1370 state = http_server_state_new ();
1371 g_hash_table_insert (mgr->http_server_states,
1372 g_strdup(repo->server_url), state);
1373 }
1374
1375 if (state->checking) {
1376 return FALSE;
1377 }
1378
1379 if (state->http_version > 0) {
1380 if (!repo->effective_host) {
1381 repo->effective_host = g_strdup(state->effective_host);
1382 repo->use_fileserver_port = state->use_fileserver_port;
1383 }
1384 return TRUE;
1385 }
1386
1387 /* If we haven't detected the server url successfully, retry every 10 seconds. */
1388 gint64 now = time(NULL);
1389 if (now - state->last_http_check_time < CHECK_HTTP_INTERVAL)
1390 return FALSE;
1391
1392 /* First try repo->server_url.
1393 * If it fails and https is not used, try server_url:8082 instead.
1394 */
1395 g_free (state->testing_host);
1396 state->testing_host = g_strdup(repo->server_url);
1397
1398 state->last_http_check_time = (gint64)time(NULL);
1399
1400 if (http_tx_manager_check_protocol_version (seaf->http_tx_mgr,
1401 repo->server_url,
1402 FALSE,
1403 check_http_protocol_done,
1404 state) < 0)
1405 return FALSE;
1406
1407 state->checking = TRUE;
1408
1409 return FALSE;
1410 }
1411
1412 gint
cmp_repos_by_sync_time(gconstpointer a,gconstpointer b,gpointer user_data)1413 cmp_repos_by_sync_time (gconstpointer a, gconstpointer b, gpointer user_data)
1414 {
1415 const SeafRepo *repo_a = a;
1416 const SeafRepo *repo_b = b;
1417
1418 return (repo_a->last_sync_time - repo_b->last_sync_time);
1419 }
1420
1421 #if defined WIN32 || defined __APPLE__
1422
1423 static void
cleanup_file_blocks(const char * repo_id,int version,const char * file_id)1424 cleanup_file_blocks (const char *repo_id, int version, const char *file_id)
1425 {
1426 Seafile *file;
1427 int i;
1428
1429 file = seaf_fs_manager_get_seafile (seaf->fs_mgr,
1430 repo_id, version,
1431 file_id);
1432 for (i = 0; i < file->n_blocks; ++i)
1433 seaf_block_manager_remove_block (seaf->block_mgr,
1434 repo_id, version,
1435 file->blk_sha1s[i]);
1436
1437 seafile_unref (file);
1438 }
1439
1440 static gboolean
handle_locked_file_update(SeafRepo * repo,struct index_state * istate,LockedFileSet * fset,const char * path,LockedFile * locked)1441 handle_locked_file_update (SeafRepo *repo, struct index_state *istate,
1442 LockedFileSet *fset, const char *path, LockedFile *locked)
1443 {
1444 gboolean locked_on_server = FALSE;
1445 struct cache_entry *ce;
1446 char file_id[41];
1447 char *fullpath = NULL;
1448 SeafStat st;
1449 gboolean file_exists = TRUE;
1450 SeafileCrypt *crypt = NULL;
1451 SeafBranch *master = NULL;
1452 gboolean ret = TRUE;
1453
1454 locked_on_server = seaf_filelock_manager_is_file_locked (seaf->filelock_mgr,
1455 repo->id,
1456 path);
1457 /* File is still locked, do nothing. */
1458 if (do_check_file_locked (path, repo->worktree, locked_on_server))
1459 return FALSE;
1460
1461 seaf_debug ("Update previously locked file %s in repo %.8s.\n",
1462 path, repo->id);
1463
1464 /* If the file was locked on the last checkout, the worktree file was not
1465 * updated, but the index has been updated. So the ce in the index should
1466 * contain the information for the file to be updated.
1467 */
1468 ce = index_name_exists (istate, path, strlen(path), 0);
1469 if (!ce) {
1470 seaf_warning ("Cache entry for %s in repo %s(%.8s) is not found "
1471 "when update locked file.",
1472 path, repo->name, repo->id);
1473 goto remove_from_db;
1474 }
1475
1476 rawdata_to_hex (ce->sha1, file_id, 20);
1477
1478 fullpath = g_build_filename (repo->worktree, path, NULL);
1479
1480 file_exists = seaf_util_exists (fullpath);
1481
1482 if (file_exists && seaf_stat (fullpath, &st) < 0) {
1483 seaf_warning ("Failed to stat %s: %s.\n", fullpath, strerror(errno));
1484 goto out;
1485 }
1486
1487 if (repo->encrypted)
1488 crypt = seafile_crypt_new (repo->enc_version,
1489 repo->enc_key,
1490 repo->enc_iv);
1491
1492 master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
1493 if (!master) {
1494 seaf_warning ("No master branch found for repo %s(%.8s).\n",
1495 repo->name, repo->id);
1496 goto out;
1497 }
1498
1499 gboolean conflicted;
1500 gboolean force_conflict = (file_exists && st.st_mtime != locked->old_mtime);
1501 if (seaf_fs_manager_checkout_file (seaf->fs_mgr,
1502 repo->id, repo->version,
1503 file_id, fullpath,
1504 ce->ce_mode, ce->ce_mtime.sec,
1505 crypt,
1506 path,
1507 master->commit_id,
1508 force_conflict,
1509 &conflicted,
1510 repo->email) < 0) {
1511 seaf_warning ("Failed to checkout previously locked file %s in repo "
1512 "%s(%.8s).\n",
1513 path, repo->name, repo->id);
1514 }
1515
1516 seaf_sync_manager_update_active_path (seaf->sync_mgr,
1517 repo->id,
1518 path,
1519 S_IFREG,
1520 SYNC_STATUS_SYNCED,
1521 TRUE);
1522
1523 /* In checkout, the file was overwritten by rename, so the file attributes
1524 are gone. We have to set read-only state again.
1525 */
1526 if (locked_on_server)
1527 seaf_filelock_manager_lock_wt_file (seaf->filelock_mgr,
1528 repo->id,
1529 path);
1530
1531 out:
1532 cleanup_file_blocks (repo->id, repo->version, file_id);
1533
1534 remove_from_db:
1535 /* Remove the locked file record from db. */
1536 locked_file_set_remove (fset, path, TRUE);
1537
1538 g_free (fullpath);
1539 g_free (crypt);
1540 seaf_branch_unref (master);
1541 return ret;
1542 }
1543
1544 static gboolean
handle_locked_file_delete(SeafRepo * repo,struct index_state * istate,LockedFileSet * fset,const char * path,LockedFile * locked)1545 handle_locked_file_delete (SeafRepo *repo, struct index_state *istate,
1546 LockedFileSet *fset, const char *path, LockedFile *locked)
1547 {
1548 gboolean locked_on_server = FALSE;
1549 char *fullpath = NULL;
1550 SeafStat st;
1551 gboolean file_exists = TRUE;
1552 gboolean ret = TRUE;
1553
1554 locked_on_server = seaf_filelock_manager_is_file_locked (seaf->filelock_mgr,
1555 repo->id,
1556 path);
1557
1558 /* File is still locked, do nothing. */
1559 if (do_check_file_locked (path, repo->worktree, locked_on_server))
1560 return FALSE;
1561
1562 seaf_debug ("Delete previously locked file %s in repo %.8s.\n",
1563 path, repo->id);
1564
1565 fullpath = g_build_filename (repo->worktree, path, NULL);
1566
1567 file_exists = seaf_util_exists (fullpath);
1568
1569 if (file_exists && seaf_stat (fullpath, &st) < 0) {
1570 seaf_warning ("Failed to stat %s: %s.\n", fullpath, strerror(errno));
1571 goto out;
1572 }
1573
1574 if (file_exists && st.st_mtime == locked->old_mtime)
1575 seaf_util_unlink (fullpath);
1576
1577 out:
1578 /* Remove the locked file record from db. */
1579 locked_file_set_remove (fset, path, TRUE);
1580
1581 g_free (fullpath);
1582 return ret;
1583 }
1584
1585 static void *
check_locked_files(void * vdata)1586 check_locked_files (void *vdata)
1587 {
1588 SeafRepo *repo = vdata;
1589 LockedFileSet *fset;
1590 GHashTableIter iter;
1591 gpointer key, value;
1592 char *path;
1593 LockedFile *locked;
1594 char index_path[SEAF_PATH_MAX];
1595 struct index_state istate;
1596
1597 fset = seaf_repo_manager_get_locked_file_set (seaf->repo_mgr, repo->id);
1598
1599 if (g_hash_table_size (fset->locked_files) == 0) {
1600 locked_file_set_free (fset);
1601 return vdata;
1602 }
1603
1604 memset (&istate, 0, sizeof(istate));
1605 snprintf (index_path, SEAF_PATH_MAX, "%s/%s", repo->manager->index_dir, repo->id);
1606 if (read_index_from (&istate, index_path, repo->version) < 0) {
1607 seaf_warning ("Failed to load index.\n");
1608 return vdata;
1609 }
1610
1611 gboolean success;
1612 g_hash_table_iter_init (&iter, fset->locked_files);
1613 while (g_hash_table_iter_next (&iter, &key, &value)) {
1614 path = key;
1615 locked = value;
1616
1617 success = FALSE;
1618 if (strcmp (locked->operation, LOCKED_OP_UPDATE) == 0)
1619 success = handle_locked_file_update (repo, &istate, fset, path, locked);
1620 else if (strcmp (locked->operation, LOCKED_OP_DELETE) == 0)
1621 success = handle_locked_file_delete (repo, &istate, fset, path, locked);
1622
1623 if (success)
1624 g_hash_table_iter_remove (&iter);
1625 }
1626
1627 discard_index (&istate);
1628 locked_file_set_free (fset);
1629
1630 return vdata;
1631 }
1632
1633 static void
check_locked_files_done(void * vdata)1634 check_locked_files_done (void *vdata)
1635 {
1636 SeafRepo *repo = vdata;
1637 repo->checking_locked_files = FALSE;
1638 }
1639
1640 #endif
1641
1642 static void
check_folder_perms_done(HttpFolderPerms * result,void * user_data)1643 check_folder_perms_done (HttpFolderPerms *result, void *user_data)
1644 {
1645 HttpServerState *server_state = user_data;
1646 GList *ptr;
1647 HttpFolderPermRes *res;
1648 gint64 now = (gint64)time(NULL);
1649
1650 server_state->checking_folder_perms = FALSE;
1651
1652 if (!result->success) {
1653 /* If on star-up we find that checking folder perms fails,
1654 * we assume the server doesn't support it.
1655 */
1656 if (server_state->last_check_perms_time == 0)
1657 server_state->folder_perms_not_supported = TRUE;
1658 server_state->last_check_perms_time = now;
1659 return;
1660 }
1661
1662 SyncInfo *info;
1663 for (ptr = result->results; ptr; ptr = ptr->next) {
1664 res = ptr->data;
1665
1666 info = get_sync_info (seaf->sync_mgr, res->repo_id);
1667 if (info->in_sync)
1668 continue;
1669
1670 seaf_repo_manager_update_folder_perms (seaf->repo_mgr, res->repo_id,
1671 FOLDER_PERM_TYPE_USER,
1672 res->user_perms);
1673 seaf_repo_manager_update_folder_perms (seaf->repo_mgr, res->repo_id,
1674 FOLDER_PERM_TYPE_GROUP,
1675 res->group_perms);
1676 seaf_repo_manager_update_folder_perm_timestamp (seaf->repo_mgr,
1677 res->repo_id,
1678 res->timestamp);
1679 }
1680
1681 server_state->last_check_perms_time = now;
1682 }
1683
1684 static void
check_folder_permissions_one_server(SeafSyncManager * mgr,const char * host,HttpServerState * server_state,GList * repos)1685 check_folder_permissions_one_server (SeafSyncManager *mgr,
1686 const char *host,
1687 HttpServerState *server_state,
1688 GList *repos)
1689 {
1690 GList *ptr;
1691 SeafRepo *repo;
1692 char *token;
1693 gint64 timestamp;
1694 HttpFolderPermReq *req;
1695 GList *requests = NULL;
1696
1697 if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, host))
1698 return;
1699
1700 gint64 now = (gint64)time(NULL);
1701
1702 if (server_state->http_version == 0 ||
1703 server_state->folder_perms_not_supported ||
1704 server_state->checking_folder_perms)
1705 return;
1706
1707 if (server_state->last_check_perms_time > 0 &&
1708 now - server_state->last_check_perms_time < CHECK_FOLDER_PERMS_INTERVAL)
1709 return;
1710
1711 for (ptr = repos; ptr; ptr = ptr->next) {
1712 repo = ptr->data;
1713
1714 if (!repo->head)
1715 continue;
1716
1717 if (g_strcmp0 (host, repo->server_url) != 0)
1718 continue;
1719
1720 token = seaf_repo_manager_get_repo_property (seaf->repo_mgr,
1721 repo->id, REPO_PROP_TOKEN);
1722 if (!token)
1723 continue;
1724
1725 timestamp = seaf_repo_manager_get_folder_perm_timestamp (seaf->repo_mgr,
1726 repo->id);
1727 if (timestamp < 0)
1728 timestamp = 0;
1729
1730 req = g_new0 (HttpFolderPermReq, 1);
1731 memcpy (req->repo_id, repo->id, 36);
1732 req->token = g_strdup(token);
1733 req->timestamp = timestamp;
1734
1735 requests = g_list_append (requests, req);
1736
1737 g_free (token);
1738 }
1739
1740 if (!requests)
1741 return;
1742
1743 server_state->checking_folder_perms = TRUE;
1744
1745 /* The requests list will be freed in http tx manager. */
1746 if (http_tx_manager_get_folder_perms (seaf->http_tx_mgr,
1747 server_state->effective_host,
1748 server_state->use_fileserver_port,
1749 requests,
1750 check_folder_perms_done,
1751 server_state) < 0) {
1752 seaf_warning ("Failed to schedule check folder permissions\n");
1753 server_state->checking_folder_perms = FALSE;
1754 }
1755 }
1756
1757 static void
check_folder_permissions(SeafSyncManager * mgr,GList * repos)1758 check_folder_permissions (SeafSyncManager *mgr, GList *repos)
1759 {
1760 GHashTableIter iter;
1761 gpointer key, value;
1762 char *host;
1763 HttpServerState *state;
1764
1765 g_hash_table_iter_init (&iter, mgr->http_server_states);
1766 while (g_hash_table_iter_next (&iter, &key, &value)) {
1767 host = key;
1768 state = value;
1769 check_folder_permissions_one_server (mgr, host, state, repos);
1770 }
1771 }
1772
1773 static void
check_server_locked_files_done(HttpLockedFiles * result,void * user_data)1774 check_server_locked_files_done (HttpLockedFiles *result, void *user_data)
1775 {
1776 HttpServerState *server_state = user_data;
1777 GList *ptr;
1778 HttpLockedFilesRes *locked_res;
1779 gint64 now = (gint64)time(NULL);
1780
1781 server_state->checking_locked_files = FALSE;
1782
1783 if (!result->success) {
1784 /* If on star-up we find that checking locked files fails,
1785 * we assume the server doesn't support it.
1786 */
1787 if (server_state->last_check_locked_files_time == 0)
1788 server_state->locked_files_not_supported = TRUE;
1789 server_state->last_check_locked_files_time = now;
1790 return;
1791 }
1792
1793 SyncInfo *info;
1794 for (ptr = result->results; ptr; ptr = ptr->next) {
1795 locked_res = ptr->data;
1796
1797 info = get_sync_info (seaf->sync_mgr, locked_res->repo_id);
1798 if (info->in_sync)
1799 continue;
1800
1801 seaf_filelock_manager_update (seaf->filelock_mgr,
1802 locked_res->repo_id,
1803 locked_res->locked_files);
1804
1805 seaf_filelock_manager_update_timestamp (seaf->filelock_mgr,
1806 locked_res->repo_id,
1807 locked_res->timestamp);
1808 }
1809
1810 server_state->last_check_locked_files_time = now;
1811 }
1812
1813 static void
check_locked_files_one_server(SeafSyncManager * mgr,const char * host,HttpServerState * server_state,GList * repos)1814 check_locked_files_one_server (SeafSyncManager *mgr,
1815 const char *host,
1816 HttpServerState *server_state,
1817 GList *repos)
1818 {
1819 GList *ptr;
1820 SeafRepo *repo;
1821 char *token;
1822 gint64 timestamp;
1823 HttpLockedFilesReq *req;
1824 GList *requests = NULL;
1825
1826 if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, host))
1827 return;
1828
1829 gint64 now = (gint64)time(NULL);
1830
1831 if (server_state->http_version == 0 ||
1832 server_state->locked_files_not_supported ||
1833 server_state->checking_locked_files)
1834 return;
1835
1836 if (server_state->last_check_locked_files_time > 0 &&
1837 now - server_state->last_check_locked_files_time < CHECK_FOLDER_PERMS_INTERVAL)
1838 return;
1839
1840 for (ptr = repos; ptr; ptr = ptr->next) {
1841 repo = ptr->data;
1842
1843 if (!repo->head)
1844 continue;
1845
1846 if (g_strcmp0 (host, repo->server_url) != 0)
1847 continue;
1848
1849 token = seaf_repo_manager_get_repo_property (seaf->repo_mgr,
1850 repo->id, REPO_PROP_TOKEN);
1851 if (!token)
1852 continue;
1853
1854 timestamp = seaf_filelock_manager_get_timestamp (seaf->filelock_mgr,
1855 repo->id);
1856 if (timestamp < 0)
1857 timestamp = 0;
1858
1859 req = g_new0 (HttpLockedFilesReq, 1);
1860 memcpy (req->repo_id, repo->id, 36);
1861 req->token = g_strdup(token);
1862 req->timestamp = timestamp;
1863
1864 requests = g_list_append (requests, req);
1865
1866 g_free (token);
1867 }
1868
1869 if (!requests)
1870 return;
1871
1872 server_state->checking_locked_files = TRUE;
1873
1874 /* The requests list will be freed in http tx manager. */
1875 if (http_tx_manager_get_locked_files (seaf->http_tx_mgr,
1876 server_state->effective_host,
1877 server_state->use_fileserver_port,
1878 requests,
1879 check_server_locked_files_done,
1880 server_state) < 0) {
1881 seaf_warning ("Failed to schedule check server locked files\n");
1882 server_state->checking_locked_files = FALSE;
1883 }
1884 }
1885
1886 static void
check_server_locked_files(SeafSyncManager * mgr,GList * repos)1887 check_server_locked_files (SeafSyncManager *mgr, GList *repos)
1888 {
1889 GHashTableIter iter;
1890 gpointer key, value;
1891 char *host;
1892 HttpServerState *state;
1893
1894 g_hash_table_iter_init (&iter, mgr->http_server_states);
1895 while (g_hash_table_iter_next (&iter, &key, &value)) {
1896 host = key;
1897 state = value;
1898 check_locked_files_one_server (mgr, host, state, repos);
1899 }
1900 }
1901
1902 #if 0
1903 static void
1904 print_active_paths (SeafSyncManager *mgr)
1905 {
1906 int n = seaf_sync_manager_active_paths_number(mgr);
1907 seaf_message ("%d active paths\n\n", n);
1908 if (n < 10) {
1909 char *paths_json = seaf_sync_manager_list_active_paths_json (mgr);
1910 seaf_message ("%s\n", paths_json);
1911 g_free (paths_json);
1912 }
1913 }
1914 #endif
1915
1916 inline static gboolean
periodic_sync_due(SeafRepo * repo)1917 periodic_sync_due (SeafRepo *repo)
1918 {
1919 int now = (int)time(NULL);
1920 return (now > (repo->last_sync_time + repo->sync_interval));
1921 }
1922
1923 static int
auto_sync_pulse(void * vmanager)1924 auto_sync_pulse (void *vmanager)
1925 {
1926 SeafSyncManager *manager = vmanager;
1927 GList *repos, *ptr;
1928 SeafRepo *repo;
1929
1930 repos = seaf_repo_manager_get_repo_list (manager->seaf->repo_mgr, -1, -1);
1931
1932 check_folder_permissions (manager, repos);
1933
1934 check_server_locked_files (manager, repos);
1935
1936 /* Sort repos by last_sync_time, so that we don't "starve" any repo. */
1937 repos = g_list_sort_with_data (repos, cmp_repos_by_sync_time, NULL);
1938
1939 for (ptr = repos; ptr != NULL; ptr = ptr->next) {
1940 repo = ptr->data;
1941
1942 /* Every second, we'll check the worktree to see if it still exists.
1943 * We'll invalidate worktree if it gets moved or deleted.
1944 * But there is a hole here: If the user delete the worktree dir and
1945 * recreate a dir with the same name within a second, we'll falsely
1946 * see the worktree as valid. What's worse, the new worktree dir won't
1947 * be monitored.
1948 * This problem can only be solved by restart.
1949 */
1950 /* If repo has been checked out and the worktree doesn't exist,
1951 * we'll delete the repo automatically.
1952 */
1953
1954 if (repo->head != NULL) {
1955 if (seaf_repo_check_worktree (repo) < 0) {
1956 if (!repo->worktree_invalid) {
1957 // The repo worktree was valid, but now it's invalid
1958 seaf_repo_manager_invalidate_repo_worktree (seaf->repo_mgr, repo);
1959 if (!seafile_session_config_get_allow_invalid_worktree(seaf)) {
1960 auto_delete_repo (manager, repo);
1961 }
1962 }
1963 continue;
1964 } else {
1965 if (repo->worktree_invalid) {
1966 // The repo worktree was invalid, but now it's valid again,
1967 // so we start watch it
1968 seaf_repo_manager_validate_repo_worktree (seaf->repo_mgr, repo);
1969 continue;
1970 }
1971 }
1972 }
1973
1974 repo->worktree_invalid = FALSE;
1975
1976 #ifdef USE_GPL_CRYPTO
1977 if (repo->version == 0 || (repo->encrypted && repo->enc_version < 2)) {
1978 continue;
1979 }
1980 #endif
1981
1982 if (!repo->token) {
1983 /* If the user has logged out of the account, the repo token would
1984 * be null */
1985 seaf_debug ("repo token of %s (%.8s) is null, would not sync it\n", repo->name, repo->id);
1986 continue;
1987 }
1988
1989 /* Don't sync repos not checked out yet. */
1990 if (!repo->head)
1991 continue;
1992
1993 if (!manager->priv->auto_sync_enabled || !repo->auto_sync)
1994 continue;
1995
1996 #if defined WIN32 || defined __APPLE__
1997 if (repo->version > 0) {
1998 if (repo->checking_locked_files)
1999 continue;
2000
2001 gint64 now = (gint64)time(NULL);
2002 if (repo->last_check_locked_time == 0 ||
2003 now - repo->last_check_locked_time >= CHECK_LOCKED_FILES_INTERVAL)
2004 {
2005 repo->checking_locked_files = TRUE;
2006 if (seaf_job_manager_schedule_job (seaf->job_mgr,
2007 check_locked_files,
2008 check_locked_files_done,
2009 repo) < 0) {
2010 seaf_warning ("Failed to schedule check local locked files\n");
2011 repo->checking_locked_files = FALSE;
2012 } else {
2013 repo->last_check_locked_time = now;
2014 }
2015
2016 }
2017 }
2018 #endif
2019
2020 SyncInfo *info = get_sync_info (manager, repo->id);
2021
2022 if (info->in_sync)
2023 continue;
2024
2025 if (info->sync_perm_err_cnt > SYNC_PERM_ERROR_RETRY_TIME)
2026 continue;
2027
2028 if (repo->version > 0) {
2029 /* For repo version > 0, only use http sync. */
2030 if (check_http_protocol (manager, repo)) {
2031 if (repo->sync_interval == 0)
2032 sync_repo_v2 (manager, repo, FALSE);
2033 else if (periodic_sync_due (repo))
2034 sync_repo_v2 (manager, repo, TRUE);
2035 }
2036 } else {
2037 seaf_warning ("Repo %s(%s) is version 0 library. Syncing is no longer supported.\n",
2038 repo->name, repo->id);
2039 }
2040 }
2041
2042 g_list_free (repos);
2043 return TRUE;
2044 }
2045
2046 static void
on_repo_http_fetched(SeafileSession * seaf,HttpTxTask * tx_task,SeafSyncManager * manager)2047 on_repo_http_fetched (SeafileSession *seaf,
2048 HttpTxTask *tx_task,
2049 SeafSyncManager *manager)
2050 {
2051 SyncInfo *info = get_sync_info (manager, tx_task->repo_id);
2052 SyncTask *task = info->current_task;
2053
2054 /* Clone tasks are handled by clone manager. */
2055 if (tx_task->is_clone)
2056 return;
2057
2058 if (task->repo->delete_pending) {
2059 transition_sync_state (task, SYNC_STATE_CANCELED);
2060 seaf_repo_manager_del_repo (seaf->repo_mgr, task->repo);
2061 return;
2062 }
2063
2064 if (tx_task->state == HTTP_TASK_STATE_FINISHED) {
2065 memcpy (info->head_commit, tx_task->head, 41);
2066 transition_sync_state (task, SYNC_STATE_DONE);
2067 } else if (tx_task->state == HTTP_TASK_STATE_CANCELED) {
2068 transition_sync_state (task, SYNC_STATE_CANCELED);
2069 } else if (tx_task->state == HTTP_TASK_STATE_ERROR) {
2070 if (tx_task->error == SYNC_ERROR_ID_SERVER_REPO_DELETED) {
2071 on_repo_deleted_on_server (task, task->repo);
2072 } else {
2073 set_task_error (task, tx_task->error);
2074 }
2075 }
2076 }
2077
2078 static void
on_repo_http_uploaded(SeafileSession * seaf,HttpTxTask * tx_task,SeafSyncManager * manager)2079 on_repo_http_uploaded (SeafileSession *seaf,
2080 HttpTxTask *tx_task,
2081 SeafSyncManager *manager)
2082 {
2083 SyncInfo *info = get_sync_info (manager, tx_task->repo_id);
2084 SyncTask *task = info->current_task;
2085
2086 g_return_if_fail (task != NULL && info->in_sync);
2087
2088 if (task->repo->delete_pending) {
2089 transition_sync_state (task, SYNC_STATE_CANCELED);
2090 seaf_repo_manager_del_repo (seaf->repo_mgr, task->repo);
2091 return;
2092 }
2093
2094 if (tx_task->state == HTTP_TASK_STATE_FINISHED) {
2095 memcpy (info->head_commit, tx_task->head, 41);
2096
2097 /* Save current head commit id for GC. */
2098 seaf_repo_manager_set_repo_property (seaf->repo_mgr,
2099 task->repo->id,
2100 REPO_LOCAL_HEAD,
2101 task->repo->head->commit_id);
2102 task->uploaded = TRUE;
2103 check_head_commit_http (task);
2104 } else if (tx_task->state == HTTP_TASK_STATE_CANCELED) {
2105 transition_sync_state (task, SYNC_STATE_CANCELED);
2106 } else if (tx_task->state == HTTP_TASK_STATE_ERROR) {
2107 if (tx_task->error == SYNC_ERROR_ID_SERVER_REPO_DELETED) {
2108 on_repo_deleted_on_server (task, task->repo);
2109 } else {
2110 set_task_error (task, tx_task->error);
2111 }
2112 }
2113 }
2114
2115 const char *
sync_state_to_str(int state)2116 sync_state_to_str (int state)
2117 {
2118 if (state < 0 || state >= SYNC_STATE_NUM) {
2119 seaf_warning ("illegal sync state: %d\n", state);
2120 return NULL;
2121 }
2122
2123 return sync_state_str[state];
2124 }
2125
2126 static void
disable_auto_sync_for_repos(SeafSyncManager * mgr)2127 disable_auto_sync_for_repos (SeafSyncManager *mgr)
2128 {
2129 GList *repos;
2130 GList *ptr;
2131 SeafRepo *repo;
2132
2133 repos = seaf_repo_manager_get_repo_list (seaf->repo_mgr, -1, -1);
2134 for (ptr = repos; ptr; ptr = ptr->next) {
2135 repo = ptr->data;
2136 if (repo->sync_interval == 0)
2137 seaf_wt_monitor_unwatch_repo (seaf->wt_monitor, repo->id);
2138 seaf_sync_manager_cancel_sync_task (mgr, repo->id);
2139 seaf_sync_manager_remove_active_path_info (mgr, repo->id);
2140 }
2141
2142 g_list_free (repos);
2143 }
2144
2145 int
seaf_sync_manager_disable_auto_sync(SeafSyncManager * mgr)2146 seaf_sync_manager_disable_auto_sync (SeafSyncManager *mgr)
2147 {
2148 if (!seaf->started) {
2149 seaf_message ("sync manager is not started, skip disable auto sync.\n");
2150 return -1;
2151 }
2152
2153 disable_auto_sync_for_repos (mgr);
2154
2155 mgr->priv->auto_sync_enabled = FALSE;
2156 g_debug ("[sync mgr] auto sync is disabled\n");
2157 return 0;
2158 }
2159
2160 static void
enable_auto_sync_for_repos(SeafSyncManager * mgr)2161 enable_auto_sync_for_repos (SeafSyncManager *mgr)
2162 {
2163 GList *repos;
2164 GList *ptr;
2165 SeafRepo *repo;
2166
2167 repos = seaf_repo_manager_get_repo_list (seaf->repo_mgr, -1, -1);
2168 for (ptr = repos; ptr; ptr = ptr->next) {
2169 repo = ptr->data;
2170 if (repo->sync_interval == 0)
2171 seaf_wt_monitor_watch_repo (seaf->wt_monitor, repo->id, repo->worktree);
2172 }
2173
2174 g_list_free (repos);
2175 }
2176
2177 int
seaf_sync_manager_enable_auto_sync(SeafSyncManager * mgr)2178 seaf_sync_manager_enable_auto_sync (SeafSyncManager *mgr)
2179 {
2180 if (!seaf->started) {
2181 seaf_message ("sync manager is not started, skip enable auto sync.\n");
2182 return -1;
2183 }
2184
2185 enable_auto_sync_for_repos (mgr);
2186
2187 mgr->priv->auto_sync_enabled = TRUE;
2188 g_debug ("[sync mgr] auto sync is enabled\n");
2189 return 0;
2190 }
2191
2192 int
seaf_sync_manager_is_auto_sync_enabled(SeafSyncManager * mgr)2193 seaf_sync_manager_is_auto_sync_enabled (SeafSyncManager *mgr)
2194 {
2195 if (mgr->priv->auto_sync_enabled)
2196 return 1;
2197 else
2198 return 0;
2199 }
2200
2201 static ActivePathsInfo *
active_paths_info_new(SeafRepo * repo)2202 active_paths_info_new (SeafRepo *repo)
2203 {
2204 ActivePathsInfo *info = g_new0 (ActivePathsInfo, 1);
2205
2206 info->paths = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
2207 info->syncing_tree = sync_status_tree_new (repo->worktree);
2208 info->synced_tree = sync_status_tree_new (repo->worktree);
2209
2210 return info;
2211 }
2212
2213 static void
active_paths_info_free(ActivePathsInfo * info)2214 active_paths_info_free (ActivePathsInfo *info)
2215 {
2216 if (!info)
2217 return;
2218 g_hash_table_destroy (info->paths);
2219 sync_status_tree_free (info->syncing_tree);
2220 sync_status_tree_free (info->synced_tree);
2221 g_free (info);
2222 }
2223
2224 void
seaf_sync_manager_update_active_path(SeafSyncManager * mgr,const char * repo_id,const char * path,int mode,SyncStatus status,gboolean refresh)2225 seaf_sync_manager_update_active_path (SeafSyncManager *mgr,
2226 const char *repo_id,
2227 const char *path,
2228 int mode,
2229 SyncStatus status,
2230 gboolean refresh)
2231 {
2232 ActivePathsInfo *info;
2233 SeafRepo *repo;
2234
2235 if (!repo_id || !path) {
2236 seaf_warning ("BUG: empty repo_id or path.\n");
2237 return;
2238 }
2239
2240 if (status <= SYNC_STATUS_NONE || status >= N_SYNC_STATUS) {
2241 seaf_warning ("BUG: invalid sync status %d.\n", status);
2242 return;
2243 }
2244
2245 pthread_mutex_lock (&mgr->priv->paths_lock);
2246
2247 info = g_hash_table_lookup (mgr->priv->active_paths, repo_id);
2248 if (!info) {
2249 repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
2250 if (!repo) {
2251 pthread_mutex_unlock (&mgr->priv->paths_lock);
2252 return;
2253 }
2254 info = active_paths_info_new (repo);
2255 g_hash_table_insert (mgr->priv->active_paths, g_strdup(repo_id), info);
2256 }
2257
2258 SyncStatus existing = (SyncStatus) g_hash_table_lookup (info->paths, path);
2259 if (!existing) {
2260 g_hash_table_insert (info->paths, g_strdup(path), (void*)status);
2261 if (status == SYNC_STATUS_SYNCING)
2262 sync_status_tree_add (info->syncing_tree, path, mode, refresh);
2263 else if (status == SYNC_STATUS_SYNCED)
2264 sync_status_tree_add (info->synced_tree, path, mode, refresh);
2265 else {
2266 #ifdef WIN32
2267 seaf_sync_manager_add_refresh_path (mgr, path);
2268 #endif
2269 }
2270 } else if (existing != status) {
2271 g_hash_table_replace (info->paths, g_strdup(path), (void*)status);
2272
2273 if (existing == SYNC_STATUS_SYNCING)
2274 sync_status_tree_del (info->syncing_tree, path);
2275 else if (existing == SYNC_STATUS_SYNCED)
2276 sync_status_tree_del (info->synced_tree, path);
2277
2278 if (status == SYNC_STATUS_SYNCING)
2279 sync_status_tree_add (info->syncing_tree, path, mode, refresh);
2280 else if (status == SYNC_STATUS_SYNCED)
2281 sync_status_tree_add (info->synced_tree, path, mode, refresh);
2282
2283 #ifdef WIN32
2284 seaf_sync_manager_add_refresh_path (mgr, path);
2285 #endif
2286 }
2287
2288 pthread_mutex_unlock (&mgr->priv->paths_lock);
2289 }
2290
2291 void
seaf_sync_manager_delete_active_path(SeafSyncManager * mgr,const char * repo_id,const char * path)2292 seaf_sync_manager_delete_active_path (SeafSyncManager *mgr,
2293 const char *repo_id,
2294 const char *path)
2295 {
2296 ActivePathsInfo *info;
2297
2298 if (!repo_id || !path) {
2299 seaf_warning ("BUG: empty repo_id or path.\n");
2300 return;
2301 }
2302
2303 pthread_mutex_lock (&mgr->priv->paths_lock);
2304
2305 info = g_hash_table_lookup (mgr->priv->active_paths, repo_id);
2306 if (!info) {
2307 pthread_mutex_unlock (&mgr->priv->paths_lock);
2308 return;
2309 }
2310
2311 g_hash_table_remove (info->paths, path);
2312 sync_status_tree_del (info->syncing_tree, path);
2313 sync_status_tree_del (info->synced_tree, path);
2314
2315 pthread_mutex_unlock (&mgr->priv->paths_lock);
2316 }
2317
/*
 * Human readable names of path sync statuses, indexed by SyncStatus value
 * (e.g. path_status_tbl[SYNC_STATUS_ERROR]).
 *
 * NOTE(review): the entries presumably have to stay in the same order as the
 * SyncStatus enum, which is declared outside this file section -- confirm
 * before adding or reordering entries.
 *
 * The strings are literals and must never be written to, hence the const
 * qualifiers; callers hand out copies made with g_strdup(). The table is
 * terminated by a NULL entry.
 */
static const char *const path_status_tbl[] = {
    "none",
    "syncing",
    "error",
    "ignored",
    "synced",
    "paused",
    "readonly",
    "locked",
    "locked_by_me",
    NULL,
};
2330
2331 static char *
get_repo_sync_status(SeafSyncManager * mgr,const char * repo_id)2332 get_repo_sync_status (SeafSyncManager *mgr, const char *repo_id)
2333 {
2334 SyncInfo *info = get_sync_info (mgr, repo_id);
2335 SeafRepo *repo;
2336
2337 if (info->in_error)
2338 return g_strdup(path_status_tbl[SYNC_STATUS_ERROR]);
2339
2340 repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
2341 if (!repo)
2342 return g_strdup(path_status_tbl[SYNC_STATUS_NONE]);
2343
2344 if (!repo->auto_sync || !mgr->priv->auto_sync_enabled)
2345 return g_strdup(path_status_tbl[SYNC_STATUS_PAUSED]);
2346
2347 char allzeros[41] = {0};
2348 if (!info->in_sync && memcmp(allzeros, info->head_commit, 41) == 0)
2349 return g_strdup(path_status_tbl[SYNC_STATUS_NONE]);
2350
2351 if (info->in_sync &&
2352 (info->current_task->state == SYNC_STATE_COMMIT ||
2353 info->current_task->state == SYNC_STATE_FETCH ||
2354 info->current_task->state == SYNC_STATE_UPLOAD ||
2355 info->current_task->state == SYNC_STATE_MERGE))
2356 return g_strdup(path_status_tbl[SYNC_STATUS_SYNCING]);
2357 else if (!repo->is_readonly)
2358 return g_strdup(path_status_tbl[SYNC_STATUS_SYNCED]);
2359 else
2360 return g_strdup(path_status_tbl[SYNC_STATUS_READONLY]);
2361 }
2362
2363 char *
seaf_sync_manager_get_path_sync_status(SeafSyncManager * mgr,const char * repo_id,const char * path,gboolean is_dir)2364 seaf_sync_manager_get_path_sync_status (SeafSyncManager *mgr,
2365 const char *repo_id,
2366 const char *path,
2367 gboolean is_dir)
2368 {
2369 ActivePathsInfo *info;
2370 SyncInfo *sync_info;
2371 SyncStatus ret = SYNC_STATUS_NONE;
2372
2373 if (!repo_id || !path) {
2374 seaf_warning ("BUG: empty repo_id or path.\n");
2375 return NULL;
2376 }
2377
2378 if (path[0] == 0) {
2379 return get_repo_sync_status (mgr, repo_id);
2380 }
2381
2382 /* If the repo is in error, all files in it should show no sync status. */
2383 sync_info = get_sync_info (mgr, repo_id);
2384 if (sync_info && sync_info->in_error) {
2385 ret = SYNC_STATUS_NONE;
2386 goto out;
2387 }
2388
2389 pthread_mutex_lock (&mgr->priv->paths_lock);
2390
2391 info = g_hash_table_lookup (mgr->priv->active_paths, repo_id);
2392 if (!info) {
2393 pthread_mutex_unlock (&mgr->priv->paths_lock);
2394 ret = SYNC_STATUS_NONE;
2395 goto out;
2396 }
2397
2398 ret = (SyncStatus) g_hash_table_lookup (info->paths, path);
2399 if (is_dir && (ret == SYNC_STATUS_NONE)) {
2400 /* If a dir is not in the syncing tree but in the synced tree,
2401 * it's synced. Otherwise if it's in the syncing tree, some files
2402 * under it must be syncing, so it should be in syncing status too.
2403 */
2404 if (sync_status_tree_exists (info->syncing_tree, path))
2405 ret = SYNC_STATUS_SYNCING;
2406 else if (sync_status_tree_exists (info->synced_tree, path))
2407 ret = SYNC_STATUS_SYNCED;
2408 }
2409
2410 pthread_mutex_unlock (&mgr->priv->paths_lock);
2411
2412 if (ret == SYNC_STATUS_SYNCED) {
2413 if (!seaf_repo_manager_is_path_writable(seaf->repo_mgr, repo_id, path))
2414 ret = SYNC_STATUS_READONLY;
2415 else if (seaf_filelock_manager_is_file_locked_by_me (seaf->filelock_mgr,
2416 repo_id, path))
2417 ret = SYNC_STATUS_LOCKED_BY_ME;
2418 else if (seaf_filelock_manager_is_file_locked (seaf->filelock_mgr,
2419 repo_id, path))
2420 ret = SYNC_STATUS_LOCKED;
2421 }
2422
2423 out:
2424 return g_strdup(path_status_tbl[ret]);
2425 }
2426
2427 static json_t *
active_paths_to_json(GHashTable * paths)2428 active_paths_to_json (GHashTable *paths)
2429 {
2430 json_t *array = NULL, *obj = NULL;
2431 GHashTableIter iter;
2432 gpointer key, value;
2433 char *path;
2434 SyncStatus status;
2435
2436 array = json_array ();
2437
2438 g_hash_table_iter_init (&iter, paths);
2439 while (g_hash_table_iter_next (&iter, &key, &value)) {
2440 path = key;
2441 status = (SyncStatus)value;
2442
2443 obj = json_object ();
2444 json_object_set (obj, "path", json_string(path));
2445 json_object_set (obj, "status", json_string(path_status_tbl[status]));
2446
2447 json_array_append (array, obj);
2448 }
2449
2450 return array;
2451 }
2452
2453 char *
seaf_sync_manager_list_active_paths_json(SeafSyncManager * mgr)2454 seaf_sync_manager_list_active_paths_json (SeafSyncManager *mgr)
2455 {
2456 json_t *array = NULL, *obj = NULL, *path_array = NULL;
2457 GHashTableIter iter;
2458 gpointer key, value;
2459 char *repo_id;
2460 ActivePathsInfo *info;
2461 char *ret = NULL;
2462
2463 pthread_mutex_lock (&mgr->priv->paths_lock);
2464
2465 array = json_array ();
2466
2467 g_hash_table_iter_init (&iter, mgr->priv->active_paths);
2468 while (g_hash_table_iter_next (&iter, &key, &value)) {
2469 repo_id = key;
2470 info = value;
2471
2472 obj = json_object();
2473 path_array = active_paths_to_json (info->paths);
2474 json_object_set (obj, "repo_id", json_string(repo_id));
2475 json_object_set (obj, "paths", path_array);
2476
2477 json_array_append (array, obj);
2478 }
2479
2480 pthread_mutex_unlock (&mgr->priv->paths_lock);
2481
2482 ret = json_dumps (array, JSON_INDENT(4));
2483 if (!ret) {
2484 seaf_warning ("Failed to convert active paths to json\n");
2485 }
2486
2487 json_decref (array);
2488
2489 return ret;
2490 }
2491
2492 int
seaf_sync_manager_active_paths_number(SeafSyncManager * mgr)2493 seaf_sync_manager_active_paths_number (SeafSyncManager *mgr)
2494 {
2495 GHashTableIter iter;
2496 gpointer key, value;
2497 ActivePathsInfo *info;
2498 int ret = 0;
2499
2500 g_hash_table_iter_init (&iter, mgr->priv->active_paths);
2501 while (g_hash_table_iter_next (&iter, &key, &value)) {
2502 info = value;
2503 ret += g_hash_table_size(info->paths);
2504 }
2505
2506 return ret;
2507 }
2508
2509 void
seaf_sync_manager_remove_active_path_info(SeafSyncManager * mgr,const char * repo_id)2510 seaf_sync_manager_remove_active_path_info (SeafSyncManager *mgr, const char *repo_id)
2511 {
2512 pthread_mutex_lock (&mgr->priv->paths_lock);
2513
2514 g_hash_table_remove (mgr->priv->active_paths, repo_id);
2515
2516 pthread_mutex_unlock (&mgr->priv->paths_lock);
2517
2518 #ifdef WIN32
2519 /* This is a hack to tell Windows Explorer to refresh all open windows. */
2520 SHChangeNotify (SHCNE_ASSOCCHANGED, SHCNF_IDLIST, NULL, NULL);
2521 #endif
2522 }
2523
2524 #ifdef WIN32
2525
/*
 * Convert a UTF-8 path using '/' separators into a newly allocated UTF-16
 * string using '\\' separators. The input string is not modified; the
 * result is whatever g_utf8_to_utf16() returns and is released by the
 * callers in this file with g_free().
 */
static wchar_t *
win_path (const char *path)
{
    char *converted = g_strdup(path);
    wchar_t *wide;
    size_t i;

    for (i = 0; converted[i] != 0; i++) {
        if (converted[i] == '/')
            converted[i] = '\\';
    }

    wide = g_utf8_to_utf16 (converted, -1, NULL, NULL, NULL);
    g_free (converted);

    return wide;
}
2542
2543 static void *
refresh_windows_explorer_thread(void * vdata)2544 refresh_windows_explorer_thread (void *vdata)
2545 {
2546 GAsyncQueue *q = vdata;
2547 char *path;
2548 wchar_t *wpath;
2549 int count = 0;
2550
2551 while (1) {
2552 path = g_async_queue_pop (q);
2553 wpath = win_path (path);
2554
2555 SHChangeNotify (SHCNE_ATTRIBUTES, SHCNF_PATHW, wpath, NULL);
2556
2557 g_free (path);
2558 g_free (wpath);
2559
2560 if (++count >= 100) {
2561 g_usleep (G_USEC_PER_SEC);
2562 count = 0;
2563 }
2564 }
2565
2566 return NULL;
2567 }
2568
2569 void
seaf_sync_manager_add_refresh_path(SeafSyncManager * mgr,const char * path)2570 seaf_sync_manager_add_refresh_path (SeafSyncManager *mgr, const char *path)
2571 {
2572 g_async_queue_push (mgr->priv->refresh_paths, g_strdup(path));
2573 }
2574
2575 void
seaf_sync_manager_refresh_path(SeafSyncManager * mgr,const char * path)2576 seaf_sync_manager_refresh_path (SeafSyncManager *mgr, const char *path)
2577 {
2578 wchar_t *wpath;
2579
2580 wpath = win_path (path);
2581 SHChangeNotify (SHCNE_ATTRIBUTES, SHCNF_PATHW, wpath, NULL);
2582 g_free (wpath);
2583 }
2584
2585 #endif
2586
2587 static void
update_head_commit_ids_for_server(gpointer key,gpointer value,gpointer user_data)2588 update_head_commit_ids_for_server (gpointer key, gpointer value, gpointer user_data)
2589 {
2590 char *server_url = key;
2591 HttpServerState *state = value;
2592
2593 /* Only get head commit ids from server if:
2594 * 1. syncing protocol version has been checked, and
2595 * 2. protocol version is at least 2.
2596 */
2597 if (state->http_version >= 2) {
2598 seaf_debug ("Updating repo head commit ids for server %s.\n", server_url);
2599 GList *repo_id_list = seaf_repo_manager_get_repo_id_list_by_server (seaf->repo_mgr,
2600 server_url);
2601 if (!repo_id_list) {
2602 return;
2603 }
2604
2605 GHashTable *new_map = http_tx_manager_get_head_commit_ids (seaf->http_tx_mgr,
2606 state->effective_host,
2607 state->use_fileserver_port,
2608 repo_id_list);
2609 if (new_map) {
2610 pthread_mutex_lock (&state->head_commit_map_lock);
2611 g_hash_table_destroy (state->head_commit_map);
2612 state->head_commit_map = new_map;
2613 if (!state->head_commit_map_init)
2614 state->head_commit_map_init = TRUE;
2615 state->last_update_head_commit_map_time = (gint64)time(NULL);
2616 pthread_mutex_unlock (&state->head_commit_map_lock);
2617 }
2618
2619 g_list_free_full (repo_id_list, g_free);
2620 }
2621 }
2622
2623 static void *
update_cached_head_commit_ids(void * arg)2624 update_cached_head_commit_ids (void *arg)
2625 {
2626 SeafSyncManager *mgr = (SeafSyncManager *)arg;
2627
2628 while (1) {
2629 g_usleep (30 * G_USEC_PER_SEC);
2630
2631 g_hash_table_foreach (mgr->http_server_states, update_head_commit_ids_for_server, mgr);
2632 }
2633
2634 return NULL;
2635 }
2636