1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 #include "common.h"
4 
5 #include "utils.h"
6 
7 #define DEBUG_FLAG SEAFILE_DEBUG_OTHER
8 #include "log.h"
9 
10 #include <timer.h>
11 #include <pthread.h>
12 
13 #include "seafile-session.h"
14 #include "commit-mgr.h"
15 #include "branch-mgr.h"
16 #include "repo-mgr.h"
17 #include "fs-mgr.h"
18 #include "seafile-error.h"
19 #include "seafile-crypt.h"
20 #include "merge-new.h"
21 #include "seafile-error.h"
22 
23 #include "seaf-db.h"
24 #include "diff-simple.h"
25 
26 #define MAX_RUNNING_TASKS 5
27 #define SCHEDULE_INTERVAL 1000  /* 1s */
28 
29 typedef struct MergeTask {
30     char repo_id[37];
31 } MergeTask;
32 
33 typedef struct MergeScheduler {
34     pthread_mutex_t q_lock;
35     GQueue *queue;
36     GHashTable *running;
37     CcnetJobManager *tpool;
38     CcnetTimer *timer;
39 } MergeScheduler;
40 
41 static MergeScheduler *scheduler = NULL;
42 
43 static void
44 add_merge_task (const char *repo_id);
45 
46 static int
save_virtual_repo_info(SeafRepoManager * mgr,const char * repo_id,const char * origin_repo_id,const char * path,const char * base_commit)47 save_virtual_repo_info (SeafRepoManager *mgr,
48                         const char *repo_id,
49                         const char *origin_repo_id,
50                         const char *path,
51                         const char *base_commit)
52 {
53     int ret = 0;
54 
55     if (seaf_db_statement_query (mgr->seaf->db,
56                        "INSERT INTO VirtualRepo (repo_id, origin_repo, path, base_commit) VALUES (?, ?, ?, ?)",
57                        4, "string", repo_id, "string", origin_repo_id,
58                        "string", path, "string", base_commit) < 0)
59         ret = -1;
60 
61     return ret;
62 }
63 
64 static int
do_create_virtual_repo(SeafRepoManager * mgr,SeafRepo * origin_repo,const char * repo_id,const char * repo_name,const char * repo_desc,const char * root_id,const char * user,const char * passwd,GError ** error)65 do_create_virtual_repo (SeafRepoManager *mgr,
66                         SeafRepo *origin_repo,
67                         const char *repo_id,
68                         const char *repo_name,
69                         const char *repo_desc,
70                         const char *root_id,
71                         const char *user,
72                         const char *passwd,
73                         GError **error)
74 {
75     SeafRepo *repo = NULL;
76     SeafCommit *commit = NULL;
77     SeafBranch *master = NULL;
78     int ret = 0;
79 
80     repo = seaf_repo_new (repo_id, repo_name, repo_desc);
81 
82     repo->no_local_history = TRUE;
83     if (passwd != NULL && passwd[0] != '\0') {
84         repo->encrypted = TRUE;
85         repo->enc_version = origin_repo->enc_version;
86         if (repo->enc_version >= 3)
87             memcpy (repo->salt, origin_repo->salt, 64);
88         seafile_generate_magic (repo->enc_version, repo_id, passwd, repo->salt, repo->magic);
89         if (repo->enc_version >= 2)
90             memcpy (repo->random_key, origin_repo->random_key, 96);
91     }
92 
93     /* Virtual repos share fs and block store with origin repo and
94      * have the same version as the origin.
95      */
96     repo->version = origin_repo->version;
97     memcpy (repo->store_id, origin_repo->id, 36);
98 
99     commit = seaf_commit_new (NULL, repo->id,
100                               root_id, /* root id */
101                               user, /* creator */
102                               EMPTY_SHA1, /* creator id */
103                               repo_desc,  /* description */
104                               0);         /* ctime */
105 
106     seaf_repo_to_commit (repo, commit);
107     if (seaf_commit_manager_add_commit (seaf->commit_mgr, commit) < 0) {
108         seaf_warning ("Failed to add commit.\n");
109         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
110                      "Failed to add commit");
111         ret = -1;
112         goto out;
113     }
114 
115     master = seaf_branch_new ("master", repo->id, commit->commit_id);
116     if (seaf_branch_manager_add_branch (seaf->branch_mgr, master) < 0) {
117         seaf_warning ("Failed to add branch.\n");
118         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
119                      "Failed to add branch");
120         ret = -1;
121         goto out;
122     }
123 
124     if (seaf_repo_set_head (repo, master) < 0) {
125         seaf_warning ("Failed to set repo head.\n");
126         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
127                      "Failed to set repo head.");
128         ret = -1;
129         goto out;
130     }
131 
132     if (seaf_repo_manager_add_repo (mgr, repo) < 0) {
133         seaf_warning ("Failed to add repo.\n");
134         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
135                      "Failed to add repo.");
136         ret = -1;
137         goto out;
138     }
139 
140     if (set_repo_commit_to_db (repo_id, repo_name, commit->ctime,
141                                repo->version, repo->encrypted, user) < 0) {
142         seaf_warning("Failed to add repo info.\n");
143         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
144                      "Failed to add repo info");
145         ret = -1;
146         goto out;
147     }
148 
149 out:
150     if (repo)
151         seaf_repo_unref (repo);
152     if (commit)
153         seaf_commit_unref (commit);
154     if (master)
155         seaf_branch_unref (master);
156 
157     return ret;
158 }
159 
160 static void
update_repo_size(const char * repo_id)161 update_repo_size(const char *repo_id)
162 {
163     schedule_repo_size_computation (seaf->size_sched, repo_id);
164 }
165 
166 static char *
get_existing_virtual_repo(SeafRepoManager * mgr,const char * origin_repo_id,const char * path)167 get_existing_virtual_repo (SeafRepoManager *mgr,
168                            const char *origin_repo_id,
169                            const char *path)
170 {
171     char *sql = "SELECT repo_id FROM VirtualRepo WHERE origin_repo = ? AND path = ?";
172 
173     return seaf_db_statement_get_string (mgr->seaf->db, sql, 2,
174                                          "string", origin_repo_id, "string", path);
175 }
176 
177 static char *
create_virtual_repo_common(SeafRepoManager * mgr,const char * origin_repo_id,const char * path,const char * repo_name,const char * repo_desc,const char * owner,const char * passwd,GError ** error)178 create_virtual_repo_common (SeafRepoManager *mgr,
179                             const char *origin_repo_id,
180                             const char *path,
181                             const char *repo_name,
182                             const char *repo_desc,
183                             const char *owner,
184                             const char *passwd,
185                             GError **error)
186 {
187     SeafRepo *origin_repo = NULL;
188     SeafCommit *origin_head = NULL;
189     char *repo_id = NULL;
190     char *dir_id = NULL;
191 
192     origin_repo = seaf_repo_manager_get_repo (mgr, origin_repo_id);
193     if (!origin_repo) {
194         seaf_warning ("Failed to get origin repo %.10s\n", origin_repo_id);
195         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
196                      "Origin library not exists");
197         return NULL;
198     }
199     if (origin_repo->status != REPO_STATUS_NORMAL) {
200         seaf_warning("Status of repo %.8s is %d, can't create VirtualRepo\n",
201                      origin_repo_id, origin_repo->status);
202         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
203                      "Unnormal repo status");
204         seaf_repo_unref (origin_repo);
205         return NULL;
206     }
207 
208     if (origin_repo->encrypted) {
209         if (origin_repo->enc_version < 2) {
210             g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS,
211                          "Library encryption version must be higher than 2");
212             seaf_repo_unref (origin_repo);
213             return NULL;
214         }
215 
216         if (!passwd || passwd[0] == 0) {
217             g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS,
218                          "Password is not set");
219             seaf_repo_unref (origin_repo);
220             return NULL;
221         }
222 
223         if (seafile_verify_repo_passwd (origin_repo_id,
224                                         passwd,
225                                         origin_repo->magic,
226                                         origin_repo->enc_version,
227                                         origin_repo->salt) < 0) {
228             g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
229                          "Incorrect password");
230             seaf_repo_unref (origin_repo);
231             return NULL;
232         }
233     }
234 
235     origin_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
236                                                   origin_repo->id,
237                                                   origin_repo->version,
238                                                   origin_repo->head->commit_id);
239     if (!origin_head) {
240         seaf_warning ("Failed to get head commit %.8s of repo %s.\n",
241                       origin_repo->head->commit_id, origin_repo->id);
242         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
243                      "Bad origin repo head");
244         goto error;
245     }
246 
247     dir_id = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
248                                                      origin_repo->store_id,
249                                                      origin_repo->version,
250                                                      origin_head->root_id,
251                                                      path, NULL);
252     if (!dir_id) {
253         seaf_warning ("Path %s doesn't exist or is not a dir in repo %.10s.\n",
254                       path, origin_repo_id);
255         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Bad path");
256         goto error;
257     }
258 
259     repo_id = gen_uuid();
260 
261     /* Save virtual repo info before actually create the repo.
262      */
263     if (save_virtual_repo_info (mgr, repo_id, origin_repo_id,
264                                 path, origin_head->commit_id) < 0) {
265         seaf_warning ("Failed to save virtual repo info for %.10s:%s",
266                       origin_repo_id, path);
267         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "Internal error");
268         goto error;
269     }
270 
271     if (do_create_virtual_repo (mgr, origin_repo, repo_id, repo_name, repo_desc,
272                                 dir_id, owner, passwd, error) < 0)
273         goto error;
274 
275     /* The size of virtual repo is non-zero at the beginning. */
276     update_repo_size (repo_id);
277 
278     seaf_repo_unref (origin_repo);
279     seaf_commit_unref (origin_head);
280     g_free (dir_id);
281     return repo_id;
282 
283 error:
284     seaf_repo_unref (origin_repo);
285     seaf_commit_unref (origin_head);
286     g_free (repo_id);
287     g_free (dir_id);
288     return NULL;
289 }
290 
291 static char *
canonical_vrepo_path(const char * path)292 canonical_vrepo_path (const char *path)
293 {
294     char *ret = NULL;
295 
296     if (path[0] != '/')
297         ret = g_strconcat ("/", path, NULL);
298     else
299         ret = g_strdup(path);
300 
301     int len = strlen(ret);
302     int i = len - 1;
303     while (i >= 0 && ret[i] == '/')
304         ret[i--] = 0;
305 
306     return ret;
307 }
308 
309 char *
seaf_repo_manager_create_virtual_repo(SeafRepoManager * mgr,const char * origin_repo_id,const char * path,const char * repo_name,const char * repo_desc,const char * owner,const char * passwd,GError ** error)310 seaf_repo_manager_create_virtual_repo (SeafRepoManager *mgr,
311                                        const char *origin_repo_id,
312                                        const char *path,
313                                        const char *repo_name,
314                                        const char *repo_desc,
315                                        const char *owner,
316                                        const char *passwd,
317                                        GError **error)
318 {
319     char *repo_id = NULL;
320     char *orig_owner = NULL;
321     char *canon_path = NULL;
322     SeafVirtRepo *vrepo = NULL;
323     char *r_origin_repo_id = NULL;
324     char *r_path = NULL;
325 
326     if (g_strcmp0 (path, "/") == 0) {
327         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
328                      "Invalid path");
329         return NULL;
330     }
331 
332     canon_path = canonical_vrepo_path (path);
333     vrepo = seaf_repo_manager_get_virtual_repo_info (mgr, origin_repo_id);
334     if (vrepo) {
335         // virtual repo
336         r_path = g_strconcat(vrepo->path, canon_path, NULL);
337         r_origin_repo_id = g_strdup (vrepo->origin_repo_id);
338         seaf_virtual_repo_info_free (vrepo);
339         repo_id = get_existing_virtual_repo (mgr, r_origin_repo_id, r_path);
340         if (repo_id) {
341             g_free (r_origin_repo_id);
342             g_free (r_path);
343             g_free (canon_path);
344             return repo_id;
345         }
346     } else {
347         r_path = g_strdup (canon_path);
348         r_origin_repo_id = g_strdup (origin_repo_id);
349         repo_id = get_existing_virtual_repo (mgr, r_origin_repo_id, r_path);
350         if (repo_id) {
351             g_free (r_origin_repo_id);
352             g_free (r_path);
353             g_free (canon_path);
354             return repo_id;
355         }
356      }
357 
358     orig_owner = seaf_repo_manager_get_repo_owner (mgr, r_origin_repo_id);
359 
360     repo_id = create_virtual_repo_common (mgr, r_origin_repo_id, r_path,
361                                           repo_name, repo_desc, orig_owner,
362                                           passwd, error);
363     if (!repo_id) {
364         goto out;
365     }
366 
367     if (seaf_repo_manager_set_repo_owner (mgr, repo_id, orig_owner) < 0) {
368         seaf_warning ("Failed to set repo owner for %.10s.\n", repo_id);
369         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL,
370                      "Failed to set repo owner.");
371         g_free (repo_id);
372         repo_id = NULL;
373     }
374 
375 out:
376     g_free (orig_owner);
377     g_free (r_origin_repo_id);
378     g_free (r_path);
379     g_free (canon_path);
380     return repo_id;
381 }
382 
383 static gboolean
load_virtual_info(SeafDBRow * row,void * p_vinfo)384 load_virtual_info (SeafDBRow *row, void *p_vinfo)
385 {
386     SeafVirtRepo *vinfo;
387     const char *repo_id, *origin_repo_id, *path, *base_commit;
388 
389     repo_id = seaf_db_row_get_column_text (row, 0);
390     origin_repo_id = seaf_db_row_get_column_text (row, 1);
391     path = seaf_db_row_get_column_text (row, 2);
392     base_commit = seaf_db_row_get_column_text (row, 3);
393 
394     vinfo = g_new0 (SeafVirtRepo, 1);
395     memcpy (vinfo->repo_id, repo_id, 36);
396     memcpy (vinfo->origin_repo_id, origin_repo_id, 36);
397     vinfo->path = g_strdup(path);
398     memcpy (vinfo->base_commit, base_commit, 40);
399 
400     *((SeafVirtRepo **)p_vinfo) = vinfo;
401 
402     return FALSE;
403 }
404 
405 SeafVirtRepo *
seaf_repo_manager_get_virtual_repo_info(SeafRepoManager * mgr,const char * repo_id)406 seaf_repo_manager_get_virtual_repo_info (SeafRepoManager *mgr,
407                                          const char *repo_id)
408 {
409     char *sql;
410     SeafVirtRepo *vinfo = NULL;
411 
412     sql = "SELECT repo_id, origin_repo, path, base_commit FROM VirtualRepo "
413         "WHERE repo_id = ?";
414     seaf_db_statement_foreach_row (seaf->db, sql, load_virtual_info, &vinfo,
415                                    1, "string", repo_id);
416 
417     return vinfo;
418 }
419 
420 void
seaf_virtual_repo_info_free(SeafVirtRepo * vinfo)421 seaf_virtual_repo_info_free (SeafVirtRepo *vinfo)
422 {
423     if (!vinfo) return;
424 
425     g_free (vinfo->path);
426     g_free (vinfo);
427 }
428 
429 gboolean
seaf_repo_manager_is_virtual_repo(SeafRepoManager * mgr,const char * repo_id)430 seaf_repo_manager_is_virtual_repo (SeafRepoManager *mgr, const char *repo_id)
431 {
432     gboolean db_err;
433 
434     char *sql = "SELECT 1 FROM VirtualRepo WHERE repo_id = ?";
435     return seaf_db_statement_exists (seaf->db, sql, &db_err,
436                                      1, "string", repo_id);
437 }
438 
439 char *
seaf_repo_manager_get_virtual_repo_id(SeafRepoManager * mgr,const char * origin_repo,const char * path,const char * owner)440 seaf_repo_manager_get_virtual_repo_id (SeafRepoManager *mgr,
441                                        const char *origin_repo,
442                                        const char *path,
443                                        const char *owner)
444 {
445     char *sql;
446     char *ret;
447 
448     if (owner) {
449         sql = "SELECT RepoOwner.repo_id FROM RepoOwner, VirtualRepo "
450               "WHERE owner_id=? AND origin_repo=? AND path=? "
451               "AND RepoOwner.repo_id = VirtualRepo.repo_id";
452         ret = seaf_db_statement_get_string (mgr->seaf->db, sql,
453                                             3, "string", owner,
454                                             "string", origin_repo, "string", path);
455     } else {
456         sql = "SELECT repo_id FROM VirtualRepo "
457               "WHERE origin_repo=? AND path=? ";
458         ret = seaf_db_statement_get_string (mgr->seaf->db, sql,
459                                             2, "string", origin_repo, "string", path);
460     }
461 
462     return ret;
463 }
464 
465 static gboolean
collect_virtual_repo_ids(SeafDBRow * row,void * data)466 collect_virtual_repo_ids (SeafDBRow *row, void *data)
467 {
468     GList **p_ids = data;
469     const char *repo_id;
470 
471     repo_id = seaf_db_row_get_column_text (row, 0);
472     *p_ids = g_list_prepend (*p_ids, g_strdup(repo_id));
473 
474     return TRUE;
475 }
476 
477 GList *
seaf_repo_manager_get_virtual_repos_by_owner(SeafRepoManager * mgr,const char * owner,GError ** error)478 seaf_repo_manager_get_virtual_repos_by_owner (SeafRepoManager *mgr,
479                                               const char *owner,
480                                               GError **error)
481 {
482     GList *id_list = NULL, *ptr;
483     GList *ret = NULL;
484     char *sql;
485 
486     sql = "SELECT RepoOwner.repo_id FROM RepoOwner, VirtualRepo "
487         "WHERE owner_id=? "
488         "AND RepoOwner.repo_id = VirtualRepo.repo_id";
489 
490     if (seaf_db_statement_foreach_row (mgr->seaf->db, sql,
491                                        collect_virtual_repo_ids, &id_list,
492                                        1, "string", owner) < 0) {
493         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "DB error");
494         return NULL;
495     }
496 
497     char *repo_id;
498     SeafRepo *repo;
499     for (ptr = id_list; ptr; ptr = ptr->next) {
500         repo_id = ptr->data;
501         repo = seaf_repo_manager_get_repo (mgr, repo_id);
502         if (repo != NULL)
503             ret = g_list_prepend (ret, repo);
504     }
505 
506     string_list_free (id_list);
507     return ret;
508 }
509 
510 GList *
seaf_repo_manager_get_virtual_repo_ids_by_origin(SeafRepoManager * mgr,const char * origin_repo)511 seaf_repo_manager_get_virtual_repo_ids_by_origin (SeafRepoManager *mgr,
512                                                   const char *origin_repo)
513 {
514     GList *ret = NULL;
515     char *sql;
516 
517     sql = "SELECT repo_id FROM VirtualRepo WHERE origin_repo=?";
518     if (seaf_db_statement_foreach_row (mgr->seaf->db, sql,
519                                        collect_virtual_repo_ids, &ret,
520                                        1, "string", origin_repo) < 0) {
521         return NULL;
522     }
523 
524     return g_list_reverse (ret);
525 }
526 
527 static gboolean
collect_virtual_info(SeafDBRow * row,void * plist)528 collect_virtual_info (SeafDBRow *row, void *plist)
529 {
530     GList **pret = plist;
531     SeafVirtRepo *vinfo;
532     const char *repo_id, *origin_repo_id, *path, *base_commit;
533 
534     repo_id = seaf_db_row_get_column_text (row, 0);
535     origin_repo_id = seaf_db_row_get_column_text (row, 1);
536     path = seaf_db_row_get_column_text (row, 2);
537     base_commit = seaf_db_row_get_column_text (row, 3);
538 
539     vinfo = g_new0 (SeafVirtRepo, 1);
540     memcpy (vinfo->repo_id, repo_id, 36);
541     memcpy (vinfo->origin_repo_id, origin_repo_id, 36);
542     vinfo->path = g_strdup(path);
543     memcpy (vinfo->base_commit, base_commit, 40);
544 
545     *pret = g_list_prepend (*pret, vinfo);
546 
547     return TRUE;
548 }
549 
550 GList *
seaf_repo_manager_get_virtual_info_by_origin(SeafRepoManager * mgr,const char * origin_repo)551 seaf_repo_manager_get_virtual_info_by_origin (SeafRepoManager *mgr,
552                                               const char *origin_repo)
553 {
554     GList *ret = NULL;
555     char *sql;
556 
557     sql = "SELECT repo_id, origin_repo, path, base_commit "
558         "FROM VirtualRepo WHERE origin_repo=?";
559     if (seaf_db_statement_foreach_row (mgr->seaf->db, sql,
560                                        collect_virtual_info, &ret,
561                                        1, "string", origin_repo) < 0) {
562         return NULL;
563     }
564 
565     return g_list_reverse (ret);
566 }
567 
568 static void
set_virtual_repo_base_commit_path(const char * vrepo_id,const char * base_commit_id,const char * new_path)569 set_virtual_repo_base_commit_path (const char *vrepo_id, const char *base_commit_id,
570                                    const char *new_path)
571 {
572     seaf_db_statement_query (seaf->db,
573                              "UPDATE VirtualRepo SET base_commit=?, path=? WHERE repo_id=?",
574                              3, "string", base_commit_id, "string", new_path,
575                              "string", vrepo_id);
576 }
577 
578 int
seaf_repo_manager_merge_virtual_repo(SeafRepoManager * mgr,const char * repo_id,const char * exclude_repo)579 seaf_repo_manager_merge_virtual_repo (SeafRepoManager *mgr,
580                                       const char *repo_id,
581                                       const char *exclude_repo)
582 {
583     GList *vrepos = NULL, *ptr;
584     char *vrepo_id;
585     int ret = 0;
586 
587     if (seaf_repo_manager_is_virtual_repo (mgr, repo_id)) {
588         add_merge_task (repo_id);
589         return 0;
590     }
591 
592     vrepos = seaf_repo_manager_get_virtual_repo_ids_by_origin (mgr, repo_id);
593     for (ptr = vrepos; ptr; ptr = ptr->next) {
594         vrepo_id = ptr->data;
595 
596         if (g_strcmp0 (exclude_repo, vrepo_id) == 0)
597             continue;
598 
599         add_merge_task (vrepo_id);
600     }
601 
602     string_list_free (vrepos);
603     return ret;
604 }
605 
606 /*
607  * If the missing virtual repo is renamed, update database entry;
608  * otherwise delete the virtual repo.
609  */
610 static void
handle_missing_virtual_repo(SeafRepoManager * mgr,SeafRepo * repo,SeafCommit * head,SeafVirtRepo * vinfo,char ** return_new_path)611 handle_missing_virtual_repo (SeafRepoManager *mgr,
612                              SeafRepo *repo, SeafCommit *head, SeafVirtRepo *vinfo,
613                              char **return_new_path)
614 {
615     SeafCommit *parent = NULL;
616     char *old_dir_id = NULL;
617     GList *diff_res = NULL, *ptr;
618     DiffEntry *de;
619 
620     parent = seaf_commit_manager_get_commit (seaf->commit_mgr,
621                                              head->repo_id, head->version,
622                                              head->parent_id);
623     if (!parent) {
624         seaf_warning ("Failed to find commit %s:%s.\n", head->repo_id, head->parent_id);
625         return;
626     }
627 
628     int rc = diff_commits (parent, head, &diff_res, TRUE);
629     if (rc < 0) {
630         seaf_warning ("Failed to diff commit %s to %s.\n",
631                       parent->commit_id, head->commit_id);
632         seaf_commit_unref (parent);
633         return;
634     }
635 
636     char *path = vinfo->path, *sub_path, *p, *par_path;
637     gboolean is_renamed = FALSE;
638     p = &path[strlen(path)];
639     par_path = g_strdup(path);
640     sub_path = NULL;
641 
642     while (1) {
643         GError *error = NULL;
644         old_dir_id = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
645                                                              repo->store_id,
646                                                              repo->version,
647                                                              parent->root_id,
648                                                              par_path, &error);
649         if (!old_dir_id) {
650             if (error && error->code == SEAF_ERR_PATH_NO_EXIST) {
651                 seaf_warning ("Failed to find %s under commit %s in repo %s.\n",
652                               par_path, parent->commit_id, repo->store_id);
653                 seaf_debug ("Delete virtual repo %.10s.\n", vinfo->repo_id);
654                 seaf_repo_manager_del_virtual_repo (mgr, vinfo->repo_id);
655                 g_clear_error (&error);
656             }
657             goto out;
658         }
659 
660         char de_id[41];
661         char *new_path, *new_name;
662 
663         for (ptr = diff_res; ptr; ptr = ptr->next) {
664             de = ptr->data;
665             if (de->status == DIFF_STATUS_DIR_RENAMED) {
666                 rawdata_to_hex (de->sha1, de_id, 20);
667                 if (strcmp (de_id, old_dir_id) == 0) {
668                     if (sub_path != NULL)
669                         new_path = g_strconcat ("/", de->new_name, "/", sub_path, NULL);
670                     else
671                         new_path = g_strconcat ("/", de->new_name, NULL);
672                     seaf_debug ("Updating path of virtual repo %s to %s.\n",
673                                 vinfo->repo_id, new_path);
674                     set_virtual_repo_base_commit_path (vinfo->repo_id,
675                                                        head->commit_id, new_path);
676                     if (return_new_path)
677                         *return_new_path = g_strdup(new_path);
678                     /* 'sub_path = NUll' means the virtual dir itself has been renamed,
679                      *  we need to make a new commit for the virtual repo
680                      */
681                     if (sub_path == NULL) {
682                         new_name = g_path_get_basename(new_path);
683                         seaf_repo_manager_edit_repo (vinfo->repo_id,
684                                                      new_name,
685                                                      "Changed library name",
686                                                      NULL,
687                                                      &error);
688                         if (error) {
689                             seaf_warning ("Failed to rename repo %s", new_name);
690                             g_clear_error (&error);
691                         }
692                         g_free(new_name);
693                     }
694                     is_renamed = TRUE;
695                     g_free (new_path);
696                     break;
697                 }
698             }
699         }
700         g_free (old_dir_id);
701 
702         if (is_renamed)
703             break;
704 
705         while (--p != path && *p != '/');
706 
707         if (p == path)
708             break;
709 
710         g_free (par_path);
711         g_free (sub_path);
712         par_path = g_strndup (path, p - path);
713         sub_path = g_strdup (p + 1);
714     }
715 
716     if (!is_renamed) {
717         seaf_debug ("Delete virtual repo %.10s.\n", vinfo->repo_id);
718         seaf_repo_manager_del_virtual_repo (mgr, vinfo->repo_id);
719     }
720 
721 out:
722     g_free (par_path);
723     g_free (sub_path);
724 
725     for (ptr = diff_res; ptr; ptr = ptr->next)
726         diff_entry_free ((DiffEntry *)ptr->data);
727     g_list_free (diff_res);
728 
729     seaf_commit_unref (parent);
730 }
731 
732 void
seaf_repo_manager_cleanup_virtual_repos(SeafRepoManager * mgr,const char * origin_repo_id)733 seaf_repo_manager_cleanup_virtual_repos (SeafRepoManager *mgr,
734                                          const char *origin_repo_id)
735 {
736     SeafRepo *repo = NULL;
737     SeafCommit *head = NULL;
738     GList *vinfo_list = NULL, *ptr;
739     SeafVirtRepo *vinfo;
740     SeafDir *dir;
741     GError *error = NULL;
742 
743     repo = seaf_repo_manager_get_repo (mgr, origin_repo_id);
744     if (!repo) {
745         seaf_warning ("Failed to get repo %.10s.\n", origin_repo_id);
746         goto out;
747     }
748 
749     head = seaf_commit_manager_get_commit (seaf->commit_mgr,
750                                            repo->id,
751                                            repo->version,
752                                            repo->head->commit_id);
753     if (!head) {
754         seaf_warning ("Failed to get commit %s:%.8s.\n",
755                       repo->id, repo->head->commit_id);
756         goto out;
757     }
758 
759     vinfo_list = seaf_repo_manager_get_virtual_info_by_origin (mgr,
760                                                                origin_repo_id);
761     for (ptr = vinfo_list; ptr; ptr = ptr->next) {
762         vinfo = ptr->data;
763         dir = seaf_fs_manager_get_seafdir_by_path (seaf->fs_mgr,
764                                                    repo->store_id,
765                                                    repo->version,
766                                                    head->root_id,
767                                                    vinfo->path,
768                                                    &error);
769         if (error) {
770             if (error->code == SEAF_ERR_PATH_NO_EXIST) {
771                 handle_missing_virtual_repo (mgr, repo, head, vinfo, NULL);
772             }
773             g_clear_error (&error);
774         } else
775             seaf_dir_free (dir);
776         seaf_virtual_repo_info_free (vinfo);
777     }
778 
779 out:
780     seaf_repo_unref (repo);
781     seaf_commit_unref (head);
782     g_list_free (vinfo_list);
783 }
784 
merge_virtual_repo(void * vtask)785 static void *merge_virtual_repo (void *vtask)
786 {
787     MergeTask *task = vtask;
788     SeafRepoManager *mgr = seaf->repo_mgr;
789     char *repo_id = task->repo_id;
790     SeafVirtRepo *vinfo;
791     SeafRepo *repo = NULL, *orig_repo = NULL;
792     SeafCommit *head = NULL, *orig_head = NULL, *base = NULL;
793     char *root = NULL, *orig_root = NULL, *base_root = NULL;
794     char new_base_commit[41] = {0};
795     int ret = 0;
796 
797     /* repos */
798     repo = seaf_repo_manager_get_repo (mgr, repo_id);
799     if (!repo) {
800         seaf_warning ("Failed to get virt repo %.10s.\n", repo_id);
801         ret = -1;
802         goto out;
803     }
804 
805     vinfo = repo->virtual_info;
806 
807     orig_repo = seaf_repo_manager_get_repo (mgr, vinfo->origin_repo_id);
808     if (!orig_repo) {
809         seaf_warning ("Failed to get orig repo %.10s.\n", vinfo->origin_repo_id);
810         ret = -1;
811         goto out;
812     }
813 
814     /* commits */
815     head = seaf_commit_manager_get_commit (seaf->commit_mgr,
816                                            repo->id, repo->version,
817                                            repo->head->commit_id);
818     if (!head) {
819         seaf_warning ("Failed to get commit %s:%.8s.\n",
820                       repo->id, repo->head->commit_id);
821         ret = -1;
822         goto out;
823     }
824 
825     orig_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
826                                                 orig_repo->id, orig_repo->version,
827                                                 orig_repo->head->commit_id);
828     if (!orig_head) {
829         seaf_warning ("Failed to get commit %s:%.8s.\n",
830                       orig_repo->id, orig_repo->head->commit_id);
831         ret = -1;
832         goto out;
833     }
834 
835     orig_root = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
836                                                         orig_repo->store_id,
837                                                         orig_repo->version,
838                                                         orig_head->root_id,
839                                                         vinfo->path,
840                                                         NULL);
841     if (!orig_root) {
842         seaf_debug("Path %s not found in origin repo %.8s, delete or rename virtual repo %.8s\n",
843                     vinfo->path, vinfo->origin_repo_id, repo_id);
844 
845         char *new_path = NULL;
846         handle_missing_virtual_repo (mgr, orig_repo, orig_head, vinfo, &new_path);
847         if (new_path != NULL) {
848             orig_root = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
849                                                         orig_repo->store_id,
850                                                         orig_repo->version,
851                                                         orig_head->root_id,
852                                                         new_path,
853                                                         NULL);
854             g_free (new_path);
855         }
856         if (!orig_root)
857             goto out;
858     }
859 
860     base = seaf_commit_manager_get_commit (seaf->commit_mgr,
861                                            orig_repo->id, orig_repo->version,
862                                            vinfo->base_commit);
863     if (!base) {
864         seaf_warning ("Failed to get commit %s:%.8s.\n",
865                       orig_repo->id, vinfo->base_commit);
866         ret = -1;
867         goto out;
868     }
869 
870     /* fs roots */
871     root = head->root_id;
872 
873     base_root = seaf_fs_manager_get_seafdir_id_by_path (seaf->fs_mgr,
874                                                         orig_repo->store_id,
875                                                         orig_repo->version,
876                                                         base->root_id,
877                                                         vinfo->path,
878                                                         NULL);
879     if (!base_root) {
880         seaf_warning ("Cannot find seafdir for repo %.10s path %s.\n",
881                       vinfo->origin_repo_id, vinfo->path);
882         ret = -1;
883         goto out;
884     }
885 
886     if (strcmp (root, orig_root) == 0) {
887         /* Nothing to merge. */
888         seaf_debug ("Nothing to merge.\n");
889     } else if (strcmp (base_root, root) == 0) {
890         /* Origin changed, virtual repo not changed. */
891         seaf_debug ("Origin changed, virtual repo not changed.\n");
892         ret = seaf_repo_manager_update_dir (mgr,
893                                             repo_id,
894                                             "/",
895                                             orig_root,
896                                             orig_head->creator_name,
897                                             head->commit_id,
898                                             NULL,
899                                             NULL);
900         if (ret < 0) {
901             seaf_warning ("Failed to update root of virtual repo %.10s.\n",
902                           repo_id);
903             goto out;
904         }
905 
906         set_virtual_repo_base_commit_path (repo->id, orig_repo->head->commit_id,
907                                            vinfo->path);
908     } else if (strcmp (base_root, orig_root) == 0) {
909         /* Origin not changed, virutal repo changed. */
910         seaf_debug ("Origin not changed, virutal repo changed.\n");
911         ret = seaf_repo_manager_update_dir (mgr,
912                                             vinfo->origin_repo_id,
913                                             vinfo->path,
914                                             root,
915                                             head->creator_name,
916                                             orig_head->commit_id,
917                                             new_base_commit,
918                                             NULL);
919         if (ret < 0) {
920             seaf_warning ("Failed to update origin repo %.10s path %s.\n",
921                           vinfo->origin_repo_id, vinfo->path);
922             goto out;
923         }
924 
925         set_virtual_repo_base_commit_path (repo->id, new_base_commit, vinfo->path);
926 
927         /* Since origin repo is updated, we have to merge it with other
928          * virtual repos if necessary. But we don't need to merge with
929          * the current virtual repo again.
930          */
931         seaf_repo_manager_cleanup_virtual_repos (mgr, vinfo->origin_repo_id);
932         seaf_repo_manager_merge_virtual_repo (mgr,
933                                               vinfo->origin_repo_id,
934                                               repo_id);
935     } else {
936         /* Both origin and virtual repo are changed. */
937         seaf_debug ("Both origin and virtual repo are changed.\n");
938         MergeOptions opt;
939         const char *roots[3];
940 
941         memset (&opt, 0, sizeof(opt));
942         opt.n_ways = 3;
943         memcpy (opt.remote_repo_id, repo_id, 36);
944         memcpy (opt.remote_head, head->commit_id, 40);
945         opt.do_merge = TRUE;
946 
947         roots[0] = base_root; /* base */
948         roots[1] = orig_root; /* head */
949         roots[2] = root;  /* remote */
950 
951         /* Merge virtual into origin */
952         if (seaf_merge_trees (orig_repo->store_id, orig_repo->version,
953                               3, roots, &opt) < 0) {
954             seaf_warning ("Failed to merge virtual repo %.10s.\n", repo_id);
955             ret = -1;
956             goto out;
957         }
958 
959         seaf_debug ("Number of dirs visted in merge: %d.\n", opt.visit_dirs);
960 
961         /* Update virtual repo root. */
962         ret = seaf_repo_manager_update_dir (mgr,
963                                             repo_id,
964                                             "/",
965                                             opt.merged_tree_root,
966                                             orig_head->creator_name,
967                                             head->commit_id,
968                                             NULL,
969                                             NULL);
970         if (ret < 0) {
971             seaf_warning ("Failed to update root of virtual repo %.10s.\n",
972                           repo_id);
973             goto out;
974         }
975 
976         /* Update origin repo path. */
977         ret = seaf_repo_manager_update_dir (mgr,
978                                             vinfo->origin_repo_id,
979                                             vinfo->path,
980                                             opt.merged_tree_root,
981                                             head->creator_name,
982                                             orig_head->commit_id,
983                                             new_base_commit,
984                                             NULL);
985         if (ret < 0) {
986             seaf_warning ("Failed to update origin repo %.10s path %s.\n",
987                           vinfo->origin_repo_id, vinfo->path);
988             goto out;
989         }
990 
991         set_virtual_repo_base_commit_path (repo->id, new_base_commit, vinfo->path);
992 
993         seaf_repo_manager_cleanup_virtual_repos (mgr, vinfo->origin_repo_id);
994         seaf_repo_manager_merge_virtual_repo (mgr,
995                                               vinfo->origin_repo_id,
996                                               repo_id);
997     }
998 
999 out:
1000     seaf_repo_unref (repo);
1001     seaf_repo_unref (orig_repo);
1002     seaf_commit_unref (head);
1003     seaf_commit_unref (orig_head);
1004     seaf_commit_unref (base);
1005     g_free (base_root);
1006     g_free (orig_root);
1007     return vtask;
1008 }
1009 
merge_virtual_repo_done(void * vtask)1010 static void merge_virtual_repo_done (void *vtask)
1011 {
1012     MergeTask *task = vtask;
1013 
1014     seaf_debug ("Task %.8s done.\n", task->repo_id);
1015 
1016     g_hash_table_remove (scheduler->running, task->repo_id);
1017 }
1018 
1019 static int
schedule_merge_tasks(void * vscheduler)1020 schedule_merge_tasks (void *vscheduler)
1021 {
1022     MergeScheduler *scheduler = vscheduler;
1023     int n_running = g_hash_table_size (scheduler->running);
1024     MergeTask *task;
1025 
1026     /* seaf_debug ("Waiting tasks %d, running tasks %d.\n", */
1027     /*             g_queue_get_length (scheduler->queue), n_running); */
1028 
1029     if (n_running >= MAX_RUNNING_TASKS)
1030         return TRUE;
1031 
1032     pthread_mutex_lock (&scheduler->q_lock);
1033 
1034     while (n_running < MAX_RUNNING_TASKS) {
1035         task = g_queue_pop_head (scheduler->queue);
1036         if (!task)
1037             break;
1038 
1039         if (!g_hash_table_lookup (scheduler->running, task->repo_id)) {
1040             int ret = ccnet_job_manager_schedule_job (scheduler->tpool,
1041                                                       merge_virtual_repo,
1042                                                       merge_virtual_repo_done,
1043                                                       task);
1044             if (ret < 0) {
1045                 g_queue_push_tail (scheduler->queue, task);
1046                 break;
1047             }
1048 
1049             g_hash_table_insert (scheduler->running,
1050                                  g_strdup(task->repo_id),
1051                                  task);
1052             n_running++;
1053 
1054             seaf_debug ("Run task for repo %.8s.\n", task->repo_id);
1055         } else {
1056             seaf_debug ("A task for repo %.8s is already running.\n", task->repo_id);
1057 
1058             g_queue_push_tail (scheduler->queue, task);
1059             break;
1060         }
1061     }
1062 
1063     pthread_mutex_unlock (&scheduler->q_lock);
1064 
1065     return TRUE;
1066 }
1067 
task_cmp(gconstpointer a,gconstpointer b)1068 static gint task_cmp (gconstpointer a, gconstpointer b)
1069 {
1070     const MergeTask *task_a = a;
1071     const MergeTask *task_b = b;
1072 
1073     return strcmp (task_a->repo_id, task_b->repo_id);
1074 }
1075 
1076 static void
add_merge_task(const char * repo_id)1077 add_merge_task (const char *repo_id)
1078 {
1079     MergeTask *task = g_new0 (MergeTask, 1);
1080 
1081     seaf_debug ("Add merge task for repo %.8s.\n", repo_id);
1082 
1083     memcpy (task->repo_id, repo_id, 36);
1084 
1085     pthread_mutex_lock (&scheduler->q_lock);
1086 
1087     if (g_queue_find_custom (scheduler->queue, task, task_cmp) != NULL) {
1088         seaf_debug ("Task for repo %.8s is already queued.\n", repo_id);
1089         g_free (task);
1090     } else
1091         g_queue_push_tail (scheduler->queue, task);
1092 
1093     pthread_mutex_unlock (&scheduler->q_lock);
1094 }
1095 
1096 int
seaf_repo_manager_init_merge_scheduler()1097 seaf_repo_manager_init_merge_scheduler ()
1098 {
1099     scheduler = g_new0 (MergeScheduler, 1);
1100     if (!scheduler)
1101         return -1;
1102 
1103     pthread_mutex_init (&scheduler->q_lock, NULL);
1104 
1105     scheduler->queue = g_queue_new ();
1106     scheduler->running = g_hash_table_new_full (g_str_hash, g_str_equal,
1107                                                 g_free, g_free);
1108 
1109     scheduler->tpool = ccnet_job_manager_new (MAX_RUNNING_TASKS);
1110     scheduler->timer = ccnet_timer_new (schedule_merge_tasks,
1111                                         scheduler,
1112                                         SCHEDULE_INTERVAL);
1113     return 0;
1114 }
1115