1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 #include "common.h"
4
5 #include "utils.h"
6
7 #define DEBUG_FLAG SEAFILE_DEBUG_OTHER
8 #include "log.h"
9
10 #include <timer.h>
11 #include <pthread.h>
12
13 #include "seafile-session.h"
14 #include "commit-mgr.h"
15 #include "branch-mgr.h"
16 #include "repo-mgr.h"
17 #include "fs-mgr.h"
18 #include "seafile-error.h"
19 #include "seafile-crypt.h"
20 #include "merge-new.h"
21 #include "seafile-error.h"
22
23 #include "seaf-db.h"
24 #include "diff-simple.h"
25
/* Upper bound on merge jobs in flight; also the value handed to
 * ccnet_job_manager_new() when the scheduler's job pool is created. */
#define MAX_RUNNING_TASKS 5
/* Period of the scheduling timer, in milliseconds. */
#define SCHEDULE_INTERVAL 1000 /* 1s */
28
/* One pending or running request to merge a virtual repo with its origin. */
typedef struct MergeTask {
    char repo_id[37];   /* id of the virtual repo: 36 characters plus NUL */
} MergeTask;
32
33 typedef struct MergeScheduler {
34 pthread_mutex_t q_lock;
35 GQueue *queue;
36 GHashTable *running;
37 CcnetJobManager *tpool;
38 CcnetTimer *timer;
39 } MergeScheduler;
40
41 static MergeScheduler *scheduler = NULL;
42
43 static void
44 add_merge_task (const char *repo_id);
45
46 static int
save_virtual_repo_info(SeafRepoManager * mgr,const char * repo_id,const char * origin_repo_id,const char * path,const char * base_commit)47 save_virtual_repo_info (SeafRepoManager *mgr,
48 const char *repo_id,
49 const char *origin_repo_id,
50 const char *path,
51 const char *base_commit)
52 {
53 int ret = 0;
54
55 if (seaf_db_statement_query (mgr->seaf->db,
56 "INSERT INTO VirtualRepo (repo_id, origin_repo, path, base_commit) VALUES (?, ?, ?, ?)",
57 4, "string", repo_id, "string", origin_repo_id,
58 "string", path, "string", base_commit) < 0)
59 ret = -1;
60
61 return ret;
62 }
63
64 static int
do_create_virtual_repo(SeafRepoManager * mgr,SeafRepo * origin_repo,const char * repo_id,const char * repo_name,const char * repo_desc,const char * root_id,const char * user,const char * passwd,GError ** error)65 do_create_virtual_repo (SeafRepoManager *mgr,
66 SeafRepo *origin_repo,
67 const char *repo_id,
68 const char *repo_name,
69 const char *repo_desc,
70 const char *root_id,
71 const char *user,
72 const char *passwd,
73 GError **error)
74 {
75 SeafRepo *repo = NULL;
76 SeafCommit *commit = NULL;
77 SeafBranch *master = NULL;
78 int ret = 0;
79
80 repo = seaf_repo_new (repo_id, repo_name, repo_desc);
81
82 repo->no_local_history = TRUE;
83 if (passwd != NULL && passwd[0] != '\0') {
84 repo->encrypted = TRUE;
85 repo->enc_version = origin_repo->enc_version;
86 if (repo->enc_version >= 3)
87 memcpy (repo->salt, origin_repo->salt, 64);
88 seafile_generate_magic (repo->enc_version, repo_id, passwd, repo->salt, repo->magic);
89 if (repo->enc_version >= 2)
90 memcpy (repo->random_key, origin_repo->random_key, 96);
91 }
92
93 /* Virtual repos share fs and block store with origin repo and
94 * have the same version as the origin.
95 */
96 repo->version = origin_repo->version;
97 memcpy (repo->store_id, origin_repo->id, 36);
98
99 commit = seaf_commit_new (NULL, repo->id,
100 root_id, /* root id */
101 user, /* creator */
102 EMPTY_SHA1, /* creator id */
103 repo_desc, /* description */
104 0); /* ctime */
105
106 seaf_repo_to_commit (repo, commit);
107 if (seaf_commit_manager_add_commit (seaf->commit_mgr, commit) < 0) {
108 seaf_warning ("Failed to add commit.\n");
109 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
110 "Failed to add commit");
111 ret = -1;
112 goto out;
113 }
114
115 master = seaf_branch_new ("master", repo->id, commit->commit_id);
116 if (seaf_branch_manager_add_branch (seaf->branch_mgr, master) < 0) {
117 seaf_warning ("Failed to add branch.\n");
118 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
119 "Failed to add branch");
120 ret = -1;
121 goto out;
122 }
123
124 if (seaf_repo_set_head (repo, master) < 0) {
125 seaf_warning ("Failed to set repo head.\n");
126 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
127 "Failed to set repo head.");
128 ret = -1;
129 goto out;
130 }
131
132 if (seaf_repo_manager_add_repo (mgr, repo) < 0) {
133 seaf_warning ("Failed to add repo.\n");
134 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
135 "Failed to add repo.");
136 ret = -1;
137 goto out;
138 }
139
140 if (set_repo_commit_to_db (repo_id, repo_name, commit->ctime,
141 repo->version, repo->encrypted, user) < 0) {
142 seaf_warning("Failed to add repo info.\n");
143 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
144 "Failed to add repo info");
145 ret = -1;
146 goto out;
147 }
148
149 out:
150 if (repo)
151 seaf_repo_unref (repo);
152 if (commit)
153 seaf_commit_unref (commit);
154 if (master)
155 seaf_branch_unref (master);
156
157 return ret;
158 }
159
160 static void
update_repo_size(const char * repo_id)161 update_repo_size(const char *repo_id)
162 {
163 schedule_repo_size_computation (seaf->size_sched, repo_id);
164 }
165
166 static char *
get_existing_virtual_repo(SeafRepoManager * mgr,const char * origin_repo_id,const char * path)167 get_existing_virtual_repo (SeafRepoManager *mgr,
168 const char *origin_repo_id,
169 const char *path)
170 {
171 char *sql = "SELECT repo_id FROM VirtualRepo WHERE origin_repo = ? AND path = ?";
172
173 return seaf_db_statement_get_string (mgr->seaf->db, sql, 2,
174 "string", origin_repo_id, "string", path);
175 }
176
177 static char *
create_virtual_repo_common(SeafRepoManager * mgr,const char * origin_repo_id,const char * path,const char * repo_name,const char * repo_desc,const char * owner,const char * passwd,GError ** error)178 create_virtual_repo_common (SeafRepoManager *mgr,
179 const char *origin_repo_id,
180 const char *path,
181 const char *repo_name,
182 const char *repo_desc,
183 const char *owner,
184 const char *passwd,
185 GError **error)
186 {
187 SeafRepo *origin_repo = NULL;
188 SeafCommit *origin_head = NULL;
189 char *repo_id = NULL;
190 char *dir_id = NULL;
191
192 origin_repo = seaf_repo_manager_get_repo (mgr, origin_repo_id);
193 if (!origin_repo) {
194 seaf_warning ("Failed to get origin repo %.10s\n", origin_repo_id);
195 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
196 "Origin library not exists");
197 return NULL;
198 }
199 if (origin_repo->status != REPO_STATUS_NORMAL) {
200 seaf_warning("Status of repo %.8s is %d, can't create VirtualRepo\n",
201 origin_repo_id, origin_repo->status);
202 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
203 "Unnormal repo status");
204 seaf_repo_unref (origin_repo);
205 return NULL;
206 }
207
208 if (origin_repo->encrypted) {
209 if (origin_repo->enc_version < 2) {
210 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS,
211 "Library encryption version must be higher than 2");
212 seaf_repo_unref (origin_repo);
213 return NULL;
214 }
215
216 if (!passwd || passwd[0] == 0) {
217 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS,
218 "Password is not set");
219 seaf_repo_unref (origin_repo);
220 return NULL;
221 }
222
223 if (seafile_verify_repo_passwd (origin_repo_id,
224 passwd,
225 origin_repo->magic,
226 origin_repo->enc_version,
227 origin_repo->salt) < 0) {
228 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
229 "Incorrect password");
230 seaf_repo_unref (origin_repo);
231 return NULL;
232 }
233 }
234
235 origin_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
236 origin_repo->id,
237 origin_repo->version,
238 origin_repo->head->commit_id);
239 if (!origin_head) {
240 seaf_warning ("Failed to get head commit %.8s of repo %s.\n",
241 origin_repo->head->commit_id, origin_repo->id);
242 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
243 "Bad origin repo head");
244 goto error;
245 }
246
247 dir_id = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
248 origin_repo->store_id,
249 origin_repo->version,
250 origin_head->root_id,
251 path, NULL);
252 if (!dir_id) {
253 seaf_warning ("Path %s doesn't exist or is not a dir in repo %.10s.\n",
254 path, origin_repo_id);
255 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Bad path");
256 goto error;
257 }
258
259 repo_id = gen_uuid();
260
261 /* Save virtual repo info before actually create the repo.
262 */
263 if (save_virtual_repo_info (mgr, repo_id, origin_repo_id,
264 path, origin_head->commit_id) < 0) {
265 seaf_warning ("Failed to save virtual repo info for %.10s:%s",
266 origin_repo_id, path);
267 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "Internal error");
268 goto error;
269 }
270
271 if (do_create_virtual_repo (mgr, origin_repo, repo_id, repo_name, repo_desc,
272 dir_id, owner, passwd, error) < 0)
273 goto error;
274
275 /* The size of virtual repo is non-zero at the beginning. */
276 update_repo_size (repo_id);
277
278 seaf_repo_unref (origin_repo);
279 seaf_commit_unref (origin_head);
280 g_free (dir_id);
281 return repo_id;
282
283 error:
284 seaf_repo_unref (origin_repo);
285 seaf_commit_unref (origin_head);
286 g_free (repo_id);
287 g_free (dir_id);
288 return NULL;
289 }
290
/*
 * Return a newly allocated copy of 'path' that starts with '/' and has every
 * trailing '/' removed.  A path consisting only of slashes (or an empty one)
 * yields the empty string.  'path' must not be NULL.
 *
 * The caller releases the result with g_free().
 */
static char *
canonical_vrepo_path (const char *path)
{
    char *canon;
    int end;

    canon = (path[0] == '/') ? g_strdup(path)
                             : g_strconcat ("/", path, NULL);

    /* 'end' is the length of the part kept so far. */
    end = strlen(canon);
    while (end > 0 && canon[end - 1] == '/')
        canon[--end] = 0;

    return canon;
}
308
309 char *
seaf_repo_manager_create_virtual_repo(SeafRepoManager * mgr,const char * origin_repo_id,const char * path,const char * repo_name,const char * repo_desc,const char * owner,const char * passwd,GError ** error)310 seaf_repo_manager_create_virtual_repo (SeafRepoManager *mgr,
311 const char *origin_repo_id,
312 const char *path,
313 const char *repo_name,
314 const char *repo_desc,
315 const char *owner,
316 const char *passwd,
317 GError **error)
318 {
319 char *repo_id = NULL;
320 char *orig_owner = NULL;
321 char *canon_path = NULL;
322 SeafVirtRepo *vrepo = NULL;
323 char *r_origin_repo_id = NULL;
324 char *r_path = NULL;
325
326 if (g_strcmp0 (path, "/") == 0) {
327 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
328 "Invalid path");
329 return NULL;
330 }
331
332 canon_path = canonical_vrepo_path (path);
333 vrepo = seaf_repo_manager_get_virtual_repo_info (mgr, origin_repo_id);
334 if (vrepo) {
335 // virtual repo
336 r_path = g_strconcat(vrepo->path, canon_path, NULL);
337 r_origin_repo_id = g_strdup (vrepo->origin_repo_id);
338 seaf_virtual_repo_info_free (vrepo);
339 repo_id = get_existing_virtual_repo (mgr, r_origin_repo_id, r_path);
340 if (repo_id) {
341 g_free (r_origin_repo_id);
342 g_free (r_path);
343 g_free (canon_path);
344 return repo_id;
345 }
346 } else {
347 r_path = g_strdup (canon_path);
348 r_origin_repo_id = g_strdup (origin_repo_id);
349 repo_id = get_existing_virtual_repo (mgr, r_origin_repo_id, r_path);
350 if (repo_id) {
351 g_free (r_origin_repo_id);
352 g_free (r_path);
353 g_free (canon_path);
354 return repo_id;
355 }
356 }
357
358 orig_owner = seaf_repo_manager_get_repo_owner (mgr, r_origin_repo_id);
359
360 repo_id = create_virtual_repo_common (mgr, r_origin_repo_id, r_path,
361 repo_name, repo_desc, orig_owner,
362 passwd, error);
363 if (!repo_id) {
364 goto out;
365 }
366
367 if (seaf_repo_manager_set_repo_owner (mgr, repo_id, orig_owner) < 0) {
368 seaf_warning ("Failed to set repo owner for %.10s.\n", repo_id);
369 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
370 "Failed to set repo owner.");
371 g_free (repo_id);
372 repo_id = NULL;
373 }
374
375 out:
376 g_free (orig_owner);
377 g_free (r_origin_repo_id);
378 g_free (r_path);
379 g_free (canon_path);
380 return repo_id;
381 }
382
383 static gboolean
load_virtual_info(SeafDBRow * row,void * p_vinfo)384 load_virtual_info (SeafDBRow *row, void *p_vinfo)
385 {
386 SeafVirtRepo *vinfo;
387 const char *repo_id, *origin_repo_id, *path, *base_commit;
388
389 repo_id = seaf_db_row_get_column_text (row, 0);
390 origin_repo_id = seaf_db_row_get_column_text (row, 1);
391 path = seaf_db_row_get_column_text (row, 2);
392 base_commit = seaf_db_row_get_column_text (row, 3);
393
394 vinfo = g_new0 (SeafVirtRepo, 1);
395 memcpy (vinfo->repo_id, repo_id, 36);
396 memcpy (vinfo->origin_repo_id, origin_repo_id, 36);
397 vinfo->path = g_strdup(path);
398 memcpy (vinfo->base_commit, base_commit, 40);
399
400 *((SeafVirtRepo **)p_vinfo) = vinfo;
401
402 return FALSE;
403 }
404
405 SeafVirtRepo *
seaf_repo_manager_get_virtual_repo_info(SeafRepoManager * mgr,const char * repo_id)406 seaf_repo_manager_get_virtual_repo_info (SeafRepoManager *mgr,
407 const char *repo_id)
408 {
409 char *sql;
410 SeafVirtRepo *vinfo = NULL;
411
412 sql = "SELECT repo_id, origin_repo, path, base_commit FROM VirtualRepo "
413 "WHERE repo_id = ?";
414 seaf_db_statement_foreach_row (seaf->db, sql, load_virtual_info, &vinfo,
415 1, "string", repo_id);
416
417 return vinfo;
418 }
419
420 void
seaf_virtual_repo_info_free(SeafVirtRepo * vinfo)421 seaf_virtual_repo_info_free (SeafVirtRepo *vinfo)
422 {
423 if (!vinfo) return;
424
425 g_free (vinfo->path);
426 g_free (vinfo);
427 }
428
429 gboolean
seaf_repo_manager_is_virtual_repo(SeafRepoManager * mgr,const char * repo_id)430 seaf_repo_manager_is_virtual_repo (SeafRepoManager *mgr, const char *repo_id)
431 {
432 gboolean db_err;
433
434 char *sql = "SELECT 1 FROM VirtualRepo WHERE repo_id = ?";
435 return seaf_db_statement_exists (seaf->db, sql, &db_err,
436 1, "string", repo_id);
437 }
438
439 char *
seaf_repo_manager_get_virtual_repo_id(SeafRepoManager * mgr,const char * origin_repo,const char * path,const char * owner)440 seaf_repo_manager_get_virtual_repo_id (SeafRepoManager *mgr,
441 const char *origin_repo,
442 const char *path,
443 const char *owner)
444 {
445 char *sql;
446 char *ret;
447
448 if (owner) {
449 sql = "SELECT RepoOwner.repo_id FROM RepoOwner, VirtualRepo "
450 "WHERE owner_id=? AND origin_repo=? AND path=? "
451 "AND RepoOwner.repo_id = VirtualRepo.repo_id";
452 ret = seaf_db_statement_get_string (mgr->seaf->db, sql,
453 3, "string", owner,
454 "string", origin_repo, "string", path);
455 } else {
456 sql = "SELECT repo_id FROM VirtualRepo "
457 "WHERE origin_repo=? AND path=? ";
458 ret = seaf_db_statement_get_string (mgr->seaf->db, sql,
459 2, "string", origin_repo, "string", path);
460 }
461
462 return ret;
463 }
464
465 static gboolean
collect_virtual_repo_ids(SeafDBRow * row,void * data)466 collect_virtual_repo_ids (SeafDBRow *row, void *data)
467 {
468 GList **p_ids = data;
469 const char *repo_id;
470
471 repo_id = seaf_db_row_get_column_text (row, 0);
472 *p_ids = g_list_prepend (*p_ids, g_strdup(repo_id));
473
474 return TRUE;
475 }
476
477 GList *
seaf_repo_manager_get_virtual_repos_by_owner(SeafRepoManager * mgr,const char * owner,GError ** error)478 seaf_repo_manager_get_virtual_repos_by_owner (SeafRepoManager *mgr,
479 const char *owner,
480 GError **error)
481 {
482 GList *id_list = NULL, *ptr;
483 GList *ret = NULL;
484 char *sql;
485
486 sql = "SELECT RepoOwner.repo_id FROM RepoOwner, VirtualRepo "
487 "WHERE owner_id=? "
488 "AND RepoOwner.repo_id = VirtualRepo.repo_id";
489
490 if (seaf_db_statement_foreach_row (mgr->seaf->db, sql,
491 collect_virtual_repo_ids, &id_list,
492 1, "string", owner) < 0) {
493 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "DB error");
494 return NULL;
495 }
496
497 char *repo_id;
498 SeafRepo *repo;
499 for (ptr = id_list; ptr; ptr = ptr->next) {
500 repo_id = ptr->data;
501 repo = seaf_repo_manager_get_repo (mgr, repo_id);
502 if (repo != NULL)
503 ret = g_list_prepend (ret, repo);
504 }
505
506 string_list_free (id_list);
507 return ret;
508 }
509
510 GList *
seaf_repo_manager_get_virtual_repo_ids_by_origin(SeafRepoManager * mgr,const char * origin_repo)511 seaf_repo_manager_get_virtual_repo_ids_by_origin (SeafRepoManager *mgr,
512 const char *origin_repo)
513 {
514 GList *ret = NULL;
515 char *sql;
516
517 sql = "SELECT repo_id FROM VirtualRepo WHERE origin_repo=?";
518 if (seaf_db_statement_foreach_row (mgr->seaf->db, sql,
519 collect_virtual_repo_ids, &ret,
520 1, "string", origin_repo) < 0) {
521 return NULL;
522 }
523
524 return g_list_reverse (ret);
525 }
526
527 static gboolean
collect_virtual_info(SeafDBRow * row,void * plist)528 collect_virtual_info (SeafDBRow *row, void *plist)
529 {
530 GList **pret = plist;
531 SeafVirtRepo *vinfo;
532 const char *repo_id, *origin_repo_id, *path, *base_commit;
533
534 repo_id = seaf_db_row_get_column_text (row, 0);
535 origin_repo_id = seaf_db_row_get_column_text (row, 1);
536 path = seaf_db_row_get_column_text (row, 2);
537 base_commit = seaf_db_row_get_column_text (row, 3);
538
539 vinfo = g_new0 (SeafVirtRepo, 1);
540 memcpy (vinfo->repo_id, repo_id, 36);
541 memcpy (vinfo->origin_repo_id, origin_repo_id, 36);
542 vinfo->path = g_strdup(path);
543 memcpy (vinfo->base_commit, base_commit, 40);
544
545 *pret = g_list_prepend (*pret, vinfo);
546
547 return TRUE;
548 }
549
550 GList *
seaf_repo_manager_get_virtual_info_by_origin(SeafRepoManager * mgr,const char * origin_repo)551 seaf_repo_manager_get_virtual_info_by_origin (SeafRepoManager *mgr,
552 const char *origin_repo)
553 {
554 GList *ret = NULL;
555 char *sql;
556
557 sql = "SELECT repo_id, origin_repo, path, base_commit "
558 "FROM VirtualRepo WHERE origin_repo=?";
559 if (seaf_db_statement_foreach_row (mgr->seaf->db, sql,
560 collect_virtual_info, &ret,
561 1, "string", origin_repo) < 0) {
562 return NULL;
563 }
564
565 return g_list_reverse (ret);
566 }
567
568 static void
set_virtual_repo_base_commit_path(const char * vrepo_id,const char * base_commit_id,const char * new_path)569 set_virtual_repo_base_commit_path (const char *vrepo_id, const char *base_commit_id,
570 const char *new_path)
571 {
572 seaf_db_statement_query (seaf->db,
573 "UPDATE VirtualRepo SET base_commit=?, path=? WHERE repo_id=?",
574 3, "string", base_commit_id, "string", new_path,
575 "string", vrepo_id);
576 }
577
578 int
seaf_repo_manager_merge_virtual_repo(SeafRepoManager * mgr,const char * repo_id,const char * exclude_repo)579 seaf_repo_manager_merge_virtual_repo (SeafRepoManager *mgr,
580 const char *repo_id,
581 const char *exclude_repo)
582 {
583 GList *vrepos = NULL, *ptr;
584 char *vrepo_id;
585 int ret = 0;
586
587 if (seaf_repo_manager_is_virtual_repo (mgr, repo_id)) {
588 add_merge_task (repo_id);
589 return 0;
590 }
591
592 vrepos = seaf_repo_manager_get_virtual_repo_ids_by_origin (mgr, repo_id);
593 for (ptr = vrepos; ptr; ptr = ptr->next) {
594 vrepo_id = ptr->data;
595
596 if (g_strcmp0 (exclude_repo, vrepo_id) == 0)
597 continue;
598
599 add_merge_task (vrepo_id);
600 }
601
602 string_list_free (vrepos);
603 return ret;
604 }
605
606 /*
607 * If the missing virtual repo is renamed, update database entry;
608 * otherwise delete the virtual repo.
609 */
610 static void
handle_missing_virtual_repo(SeafRepoManager * mgr,SeafRepo * repo,SeafCommit * head,SeafVirtRepo * vinfo,char ** return_new_path)611 handle_missing_virtual_repo (SeafRepoManager *mgr,
612 SeafRepo *repo, SeafCommit *head, SeafVirtRepo *vinfo,
613 char **return_new_path)
614 {
615 SeafCommit *parent = NULL;
616 char *old_dir_id = NULL;
617 GList *diff_res = NULL, *ptr;
618 DiffEntry *de;
619
620 parent = seaf_commit_manager_get_commit (seaf->commit_mgr,
621 head->repo_id, head->version,
622 head->parent_id);
623 if (!parent) {
624 seaf_warning ("Failed to find commit %s:%s.\n", head->repo_id, head->parent_id);
625 return;
626 }
627
628 int rc = diff_commits (parent, head, &diff_res, TRUE);
629 if (rc < 0) {
630 seaf_warning ("Failed to diff commit %s to %s.\n",
631 parent->commit_id, head->commit_id);
632 seaf_commit_unref (parent);
633 return;
634 }
635
636 char *path = vinfo->path, *sub_path, *p, *par_path;
637 gboolean is_renamed = FALSE;
638 p = &path[strlen(path)];
639 par_path = g_strdup(path);
640 sub_path = NULL;
641
642 while (1) {
643 GError *error = NULL;
644 old_dir_id = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
645 repo->store_id,
646 repo->version,
647 parent->root_id,
648 par_path, &error);
649 if (!old_dir_id) {
650 if (error && error->code == SEAF_ERR_PATH_NO_EXIST) {
651 seaf_warning ("Failed to find %s under commit %s in repo %s.\n",
652 par_path, parent->commit_id, repo->store_id);
653 seaf_debug ("Delete virtual repo %.10s.\n", vinfo->repo_id);
654 seaf_repo_manager_del_virtual_repo (mgr, vinfo->repo_id);
655 g_clear_error (&error);
656 }
657 goto out;
658 }
659
660 char de_id[41];
661 char *new_path, *new_name;
662
663 for (ptr = diff_res; ptr; ptr = ptr->next) {
664 de = ptr->data;
665 if (de->status == DIFF_STATUS_DIR_RENAMED) {
666 rawdata_to_hex (de->sha1, de_id, 20);
667 if (strcmp (de_id, old_dir_id) == 0) {
668 if (sub_path != NULL)
669 new_path = g_strconcat ("/", de->new_name, "/", sub_path, NULL);
670 else
671 new_path = g_strconcat ("/", de->new_name, NULL);
672 seaf_debug ("Updating path of virtual repo %s to %s.\n",
673 vinfo->repo_id, new_path);
674 set_virtual_repo_base_commit_path (vinfo->repo_id,
675 head->commit_id, new_path);
676 if (return_new_path)
677 *return_new_path = g_strdup(new_path);
678 /* 'sub_path = NUll' means the virtual dir itself has been renamed,
679 * we need to make a new commit for the virtual repo
680 */
681 if (sub_path == NULL) {
682 new_name = g_path_get_basename(new_path);
683 seaf_repo_manager_edit_repo (vinfo->repo_id,
684 new_name,
685 "Changed library name",
686 NULL,
687 &error);
688 if (error) {
689 seaf_warning ("Failed to rename repo %s", new_name);
690 g_clear_error (&error);
691 }
692 g_free(new_name);
693 }
694 is_renamed = TRUE;
695 g_free (new_path);
696 break;
697 }
698 }
699 }
700 g_free (old_dir_id);
701
702 if (is_renamed)
703 break;
704
705 while (--p != path && *p != '/');
706
707 if (p == path)
708 break;
709
710 g_free (par_path);
711 g_free (sub_path);
712 par_path = g_strndup (path, p - path);
713 sub_path = g_strdup (p + 1);
714 }
715
716 if (!is_renamed) {
717 seaf_debug ("Delete virtual repo %.10s.\n", vinfo->repo_id);
718 seaf_repo_manager_del_virtual_repo (mgr, vinfo->repo_id);
719 }
720
721 out:
722 g_free (par_path);
723 g_free (sub_path);
724
725 for (ptr = diff_res; ptr; ptr = ptr->next)
726 diff_entry_free ((DiffEntry *)ptr->data);
727 g_list_free (diff_res);
728
729 seaf_commit_unref (parent);
730 }
731
732 void
seaf_repo_manager_cleanup_virtual_repos(SeafRepoManager * mgr,const char * origin_repo_id)733 seaf_repo_manager_cleanup_virtual_repos (SeafRepoManager *mgr,
734 const char *origin_repo_id)
735 {
736 SeafRepo *repo = NULL;
737 SeafCommit *head = NULL;
738 GList *vinfo_list = NULL, *ptr;
739 SeafVirtRepo *vinfo;
740 SeafDir *dir;
741 GError *error = NULL;
742
743 repo = seaf_repo_manager_get_repo (mgr, origin_repo_id);
744 if (!repo) {
745 seaf_warning ("Failed to get repo %.10s.\n", origin_repo_id);
746 goto out;
747 }
748
749 head = seaf_commit_manager_get_commit (seaf->commit_mgr,
750 repo->id,
751 repo->version,
752 repo->head->commit_id);
753 if (!head) {
754 seaf_warning ("Failed to get commit %s:%.8s.\n",
755 repo->id, repo->head->commit_id);
756 goto out;
757 }
758
759 vinfo_list = seaf_repo_manager_get_virtual_info_by_origin (mgr,
760 origin_repo_id);
761 for (ptr = vinfo_list; ptr; ptr = ptr->next) {
762 vinfo = ptr->data;
763 dir = seaf_fs_manager_get_seafdir_by_path (seaf->fs_mgr,
764 repo->store_id,
765 repo->version,
766 head->root_id,
767 vinfo->path,
768 &error);
769 if (error) {
770 if (error->code == SEAF_ERR_PATH_NO_EXIST) {
771 handle_missing_virtual_repo (mgr, repo, head, vinfo, NULL);
772 }
773 g_clear_error (&error);
774 } else
775 seaf_dir_free (dir);
776 seaf_virtual_repo_info_free (vinfo);
777 }
778
779 out:
780 seaf_repo_unref (repo);
781 seaf_commit_unref (head);
782 g_list_free (vinfo_list);
783 }
784
merge_virtual_repo(void * vtask)785 static void *merge_virtual_repo (void *vtask)
786 {
787 MergeTask *task = vtask;
788 SeafRepoManager *mgr = seaf->repo_mgr;
789 char *repo_id = task->repo_id;
790 SeafVirtRepo *vinfo;
791 SeafRepo *repo = NULL, *orig_repo = NULL;
792 SeafCommit *head = NULL, *orig_head = NULL, *base = NULL;
793 char *root = NULL, *orig_root = NULL, *base_root = NULL;
794 char new_base_commit[41] = {0};
795 int ret = 0;
796
797 /* repos */
798 repo = seaf_repo_manager_get_repo (mgr, repo_id);
799 if (!repo) {
800 seaf_warning ("Failed to get virt repo %.10s.\n", repo_id);
801 ret = -1;
802 goto out;
803 }
804
805 vinfo = repo->virtual_info;
806
807 orig_repo = seaf_repo_manager_get_repo (mgr, vinfo->origin_repo_id);
808 if (!orig_repo) {
809 seaf_warning ("Failed to get orig repo %.10s.\n", vinfo->origin_repo_id);
810 ret = -1;
811 goto out;
812 }
813
814 /* commits */
815 head = seaf_commit_manager_get_commit (seaf->commit_mgr,
816 repo->id, repo->version,
817 repo->head->commit_id);
818 if (!head) {
819 seaf_warning ("Failed to get commit %s:%.8s.\n",
820 repo->id, repo->head->commit_id);
821 ret = -1;
822 goto out;
823 }
824
825 orig_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
826 orig_repo->id, orig_repo->version,
827 orig_repo->head->commit_id);
828 if (!orig_head) {
829 seaf_warning ("Failed to get commit %s:%.8s.\n",
830 orig_repo->id, orig_repo->head->commit_id);
831 ret = -1;
832 goto out;
833 }
834
835 orig_root = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
836 orig_repo->store_id,
837 orig_repo->version,
838 orig_head->root_id,
839 vinfo->path,
840 NULL);
841 if (!orig_root) {
842 seaf_debug("Path %s not found in origin repo %.8s, delete or rename virtual repo %.8s\n",
843 vinfo->path, vinfo->origin_repo_id, repo_id);
844
845 char *new_path = NULL;
846 handle_missing_virtual_repo (mgr, orig_repo, orig_head, vinfo, &new_path);
847 if (new_path != NULL) {
848 orig_root = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
849 orig_repo->store_id,
850 orig_repo->version,
851 orig_head->root_id,
852 new_path,
853 NULL);
854 g_free (new_path);
855 }
856 if (!orig_root)
857 goto out;
858 }
859
860 base = seaf_commit_manager_get_commit (seaf->commit_mgr,
861 orig_repo->id, orig_repo->version,
862 vinfo->base_commit);
863 if (!base) {
864 seaf_warning ("Failed to get commit %s:%.8s.\n",
865 orig_repo->id, vinfo->base_commit);
866 ret = -1;
867 goto out;
868 }
869
870 /* fs roots */
871 root = head->root_id;
872
873 base_root = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
874 orig_repo->store_id,
875 orig_repo->version,
876 base->root_id,
877 vinfo->path,
878 NULL);
879 if (!base_root) {
880 seaf_warning ("Cannot find seafdir for repo %.10s path %s.\n",
881 vinfo->origin_repo_id, vinfo->path);
882 ret = -1;
883 goto out;
884 }
885
886 if (strcmp (root, orig_root) == 0) {
887 /* Nothing to merge. */
888 seaf_debug ("Nothing to merge.\n");
889 } else if (strcmp (base_root, root) == 0) {
890 /* Origin changed, virtual repo not changed. */
891 seaf_debug ("Origin changed, virtual repo not changed.\n");
892 ret = seaf_repo_manager_update_dir (mgr,
893 repo_id,
894 "/",
895 orig_root,
896 orig_head->creator_name,
897 head->commit_id,
898 NULL,
899 NULL);
900 if (ret < 0) {
901 seaf_warning ("Failed to update root of virtual repo %.10s.\n",
902 repo_id);
903 goto out;
904 }
905
906 set_virtual_repo_base_commit_path (repo->id, orig_repo->head->commit_id,
907 vinfo->path);
908 } else if (strcmp (base_root, orig_root) == 0) {
909 /* Origin not changed, virutal repo changed. */
910 seaf_debug ("Origin not changed, virutal repo changed.\n");
911 ret = seaf_repo_manager_update_dir (mgr,
912 vinfo->origin_repo_id,
913 vinfo->path,
914 root,
915 head->creator_name,
916 orig_head->commit_id,
917 new_base_commit,
918 NULL);
919 if (ret < 0) {
920 seaf_warning ("Failed to update origin repo %.10s path %s.\n",
921 vinfo->origin_repo_id, vinfo->path);
922 goto out;
923 }
924
925 set_virtual_repo_base_commit_path (repo->id, new_base_commit, vinfo->path);
926
927 /* Since origin repo is updated, we have to merge it with other
928 * virtual repos if necessary. But we don't need to merge with
929 * the current virtual repo again.
930 */
931 seaf_repo_manager_cleanup_virtual_repos (mgr, vinfo->origin_repo_id);
932 seaf_repo_manager_merge_virtual_repo (mgr,
933 vinfo->origin_repo_id,
934 repo_id);
935 } else {
936 /* Both origin and virtual repo are changed. */
937 seaf_debug ("Both origin and virtual repo are changed.\n");
938 MergeOptions opt;
939 const char *roots[3];
940
941 memset (&opt, 0, sizeof(opt));
942 opt.n_ways = 3;
943 memcpy (opt.remote_repo_id, repo_id, 36);
944 memcpy (opt.remote_head, head->commit_id, 40);
945 opt.do_merge = TRUE;
946
947 roots[0] = base_root; /* base */
948 roots[1] = orig_root; /* head */
949 roots[2] = root; /* remote */
950
951 /* Merge virtual into origin */
952 if (seaf_merge_trees (orig_repo->store_id, orig_repo->version,
953 3, roots, &opt) < 0) {
954 seaf_warning ("Failed to merge virtual repo %.10s.\n", repo_id);
955 ret = -1;
956 goto out;
957 }
958
959 seaf_debug ("Number of dirs visted in merge: %d.\n", opt.visit_dirs);
960
961 /* Update virtual repo root. */
962 ret = seaf_repo_manager_update_dir (mgr,
963 repo_id,
964 "/",
965 opt.merged_tree_root,
966 orig_head->creator_name,
967 head->commit_id,
968 NULL,
969 NULL);
970 if (ret < 0) {
971 seaf_warning ("Failed to update root of virtual repo %.10s.\n",
972 repo_id);
973 goto out;
974 }
975
976 /* Update origin repo path. */
977 ret = seaf_repo_manager_update_dir (mgr,
978 vinfo->origin_repo_id,
979 vinfo->path,
980 opt.merged_tree_root,
981 head->creator_name,
982 orig_head->commit_id,
983 new_base_commit,
984 NULL);
985 if (ret < 0) {
986 seaf_warning ("Failed to update origin repo %.10s path %s.\n",
987 vinfo->origin_repo_id, vinfo->path);
988 goto out;
989 }
990
991 set_virtual_repo_base_commit_path (repo->id, new_base_commit, vinfo->path);
992
993 seaf_repo_manager_cleanup_virtual_repos (mgr, vinfo->origin_repo_id);
994 seaf_repo_manager_merge_virtual_repo (mgr,
995 vinfo->origin_repo_id,
996 repo_id);
997 }
998
999 out:
1000 seaf_repo_unref (repo);
1001 seaf_repo_unref (orig_repo);
1002 seaf_commit_unref (head);
1003 seaf_commit_unref (orig_head);
1004 seaf_commit_unref (base);
1005 g_free (base_root);
1006 g_free (orig_root);
1007 return vtask;
1008 }
1009
merge_virtual_repo_done(void * vtask)1010 static void merge_virtual_repo_done (void *vtask)
1011 {
1012 MergeTask *task = vtask;
1013
1014 seaf_debug ("Task %.8s done.\n", task->repo_id);
1015
1016 g_hash_table_remove (scheduler->running, task->repo_id);
1017 }
1018
1019 static int
schedule_merge_tasks(void * vscheduler)1020 schedule_merge_tasks (void *vscheduler)
1021 {
1022 MergeScheduler *scheduler = vscheduler;
1023 int n_running = g_hash_table_size (scheduler->running);
1024 MergeTask *task;
1025
1026 /* seaf_debug ("Waiting tasks %d, running tasks %d.\n", */
1027 /* g_queue_get_length (scheduler->queue), n_running); */
1028
1029 if (n_running >= MAX_RUNNING_TASKS)
1030 return TRUE;
1031
1032 pthread_mutex_lock (&scheduler->q_lock);
1033
1034 while (n_running < MAX_RUNNING_TASKS) {
1035 task = g_queue_pop_head (scheduler->queue);
1036 if (!task)
1037 break;
1038
1039 if (!g_hash_table_lookup (scheduler->running, task->repo_id)) {
1040 int ret = ccnet_job_manager_schedule_job (scheduler->tpool,
1041 merge_virtual_repo,
1042 merge_virtual_repo_done,
1043 task);
1044 if (ret < 0) {
1045 g_queue_push_tail (scheduler->queue, task);
1046 break;
1047 }
1048
1049 g_hash_table_insert (scheduler->running,
1050 g_strdup(task->repo_id),
1051 task);
1052 n_running++;
1053
1054 seaf_debug ("Run task for repo %.8s.\n", task->repo_id);
1055 } else {
1056 seaf_debug ("A task for repo %.8s is already running.\n", task->repo_id);
1057
1058 g_queue_push_tail (scheduler->queue, task);
1059 break;
1060 }
1061 }
1062
1063 pthread_mutex_unlock (&scheduler->q_lock);
1064
1065 return TRUE;
1066 }
1067
task_cmp(gconstpointer a,gconstpointer b)1068 static gint task_cmp (gconstpointer a, gconstpointer b)
1069 {
1070 const MergeTask *task_a = a;
1071 const MergeTask *task_b = b;
1072
1073 return strcmp (task_a->repo_id, task_b->repo_id);
1074 }
1075
1076 static void
add_merge_task(const char * repo_id)1077 add_merge_task (const char *repo_id)
1078 {
1079 MergeTask *task = g_new0 (MergeTask, 1);
1080
1081 seaf_debug ("Add merge task for repo %.8s.\n", repo_id);
1082
1083 memcpy (task->repo_id, repo_id, 36);
1084
1085 pthread_mutex_lock (&scheduler->q_lock);
1086
1087 if (g_queue_find_custom (scheduler->queue, task, task_cmp) != NULL) {
1088 seaf_debug ("Task for repo %.8s is already queued.\n", repo_id);
1089 g_free (task);
1090 } else
1091 g_queue_push_tail (scheduler->queue, task);
1092
1093 pthread_mutex_unlock (&scheduler->q_lock);
1094 }
1095
1096 int
seaf_repo_manager_init_merge_scheduler()1097 seaf_repo_manager_init_merge_scheduler ()
1098 {
1099 scheduler = g_new0 (MergeScheduler, 1);
1100 if (!scheduler)
1101 return -1;
1102
1103 pthread_mutex_init (&scheduler->q_lock, NULL);
1104
1105 scheduler->queue = g_queue_new ();
1106 scheduler->running = g_hash_table_new_full (g_str_hash, g_str_equal,
1107 g_free, g_free);
1108
1109 scheduler->tpool = ccnet_job_manager_new (MAX_RUNNING_TASKS);
1110 scheduler->timer = ccnet_timer_new (schedule_merge_tasks,
1111 scheduler,
1112 SCHEDULE_INTERVAL);
1113 return 0;
1114 }
1115