1 #include "common.h"
2 #include "log.h"
3 
4 #include <pthread.h>
5 
6 #include "seafile-session.h"
7 #include "seafile-object.h"
8 #include "seafile-error.h"
9 
10 #include "copy-mgr.h"
11 
12 #include "utils.h"
13 
14 #include "log.h"
15 
/*
 * Value passed to ccnet_job_manager_new() by seaf_copy_manager_start();
 * presumably the cap on concurrent copy worker threads (confirm against
 * the job manager's documentation).
 */
#define DEFAULT_MAX_THREADS 5
17 
18 struct _SeafCopyManagerPriv {
19     GHashTable *copy_tasks;
20     pthread_mutex_t lock;
21     CcnetJobManager *job_mgr;
22 };
23 
24 static void
copy_task_free(CopyTask * task)25 copy_task_free (CopyTask *task)
26 {
27     if (!task) return;
28 
29     g_free (task->failed_reason);
30     g_free (task);
31 }
32 
33 SeafCopyManager *
seaf_copy_manager_new(struct _SeafileSession * session)34 seaf_copy_manager_new (struct _SeafileSession *session)
35 {
36     SeafCopyManager *mgr = g_new0 (SeafCopyManager, 1);
37 
38     mgr->session = session;
39     mgr->priv = g_new0 (struct _SeafCopyManagerPriv, 1);
40     mgr->priv->copy_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
41                                                    g_free,
42                                                    (GDestroyNotify)copy_task_free);
43     pthread_mutex_init (&mgr->priv->lock, NULL);
44 
45     mgr->max_files = g_key_file_get_int64 (session->config,
46                                            "web_copy", "max_files", NULL);
47     mgr->max_size = g_key_file_get_int64 (session->config,
48                                           "web_copy", "max_size", NULL);
49     /* size is given in MB */
50     mgr->max_size <<= 20;
51 
52     return mgr;
53 }
54 
55 int
seaf_copy_manager_start(SeafCopyManager * mgr)56 seaf_copy_manager_start (SeafCopyManager *mgr)
57 {
58     mgr->priv->job_mgr = ccnet_job_manager_new (DEFAULT_MAX_THREADS);
59 
60     return 1;
61 }
62 
63 SeafileCopyTask *
seaf_copy_manager_get_task(SeafCopyManager * mgr,const char * task_id)64 seaf_copy_manager_get_task (SeafCopyManager *mgr,
65                             const char *task_id)
66 {
67     SeafCopyManagerPriv *priv = mgr->priv;
68     CopyTask *task;
69     SeafileCopyTask *t = NULL;
70 
71     pthread_mutex_lock (&priv->lock);
72 
73     task = g_hash_table_lookup (priv->copy_tasks, task_id);
74 
75     if (task) {
76         t = seafile_copy_task_new ();
77         g_object_set (t, "done", task->done, "total", task->total,
78                       "canceled", task->canceled, "failed", task->failed,
79                       "failed_reason", task->failed_reason, "successful", task->successful,
80                       NULL);
81         if (task->canceled || task->failed || task->successful)
82             g_hash_table_remove(priv->copy_tasks, task_id);
83     }
84 
85     pthread_mutex_unlock (&priv->lock);
86 
87     return t;
88 }
89 
90 struct CopyThreadData {
91     SeafCopyManager *mgr;
92     char src_repo_id[37];
93     char *src_path;
94     char *src_filename;
95     char dst_repo_id[37];
96     char *dst_path;
97     char *dst_filename;
98     int replace;
99     char *modifier;
100     CopyTask *task;
101     CopyTaskFunc func;
102 };
103 typedef struct CopyThreadData CopyThreadData;
104 
105 static void *
copy_thread(void * vdata)106 copy_thread (void *vdata)
107 {
108     CopyThreadData *data = vdata;
109 
110     data->func (data->src_repo_id, data->src_path, data->src_filename,
111                 data->dst_repo_id, data->dst_path, data->dst_filename,
112                 data->replace, data->modifier, data->task);
113 
114     return vdata;
115 }
116 
117 static void
copy_done(void * vdata)118 copy_done (void *vdata)
119 {
120     CopyThreadData *data = vdata;
121 
122     g_free (data->src_path);
123     g_free (data->src_filename);
124     g_free (data->dst_path);
125     g_free (data->dst_filename);
126     g_free (data->modifier);
127     g_free (data);
128 }
129 
130 char *
seaf_copy_manager_add_task(SeafCopyManager * mgr,const char * src_repo_id,const char * src_path,const char * src_filename,const char * dst_repo_id,const char * dst_path,const char * dst_filename,int replace,const char * modifier,CopyTaskFunc function,gboolean need_progress)131 seaf_copy_manager_add_task (SeafCopyManager *mgr,
132                             const char *src_repo_id,
133                             const char *src_path,
134                             const char *src_filename,
135                             const char *dst_repo_id,
136                             const char *dst_path,
137                             const char *dst_filename,
138                             int replace,
139                             const char *modifier,
140                             CopyTaskFunc function,
141                             gboolean need_progress)
142 {
143     SeafCopyManagerPriv *priv = mgr->priv;
144     char *task_id = NULL;
145     CopyTask *task = NULL;
146     struct CopyThreadData *data;
147 
148     if (need_progress) {
149         task_id = gen_uuid();
150         task = g_new0 (CopyTask, 1);
151         memcpy (task->task_id, task_id, 36);
152 
153         pthread_mutex_lock (&priv->lock);
154         g_hash_table_insert (priv->copy_tasks, g_strdup(task_id), task);
155         pthread_mutex_unlock (&priv->lock);
156     }
157 
158     data = g_new0 (CopyThreadData, 1);
159     data->mgr = mgr;
160     memcpy (data->src_repo_id, src_repo_id, 36);
161     data->src_path = g_strdup(src_path);
162     data->src_filename = g_strdup(src_filename);
163     memcpy (data->dst_repo_id, dst_repo_id, 36);
164     data->dst_path = g_strdup(dst_path);
165     data->dst_filename = g_strdup(dst_filename);
166     data->replace = replace;
167     data->modifier = g_strdup(modifier);
168     data->task = task;
169     data->func = function;
170 
171     ccnet_job_manager_schedule_job (mgr->priv->job_mgr,
172                                     copy_thread,
173                                     copy_done,
174                                     data);
175     return task_id;
176 }
177 
178 int
seaf_copy_manager_cancel_task(SeafCopyManager * mgr,const char * task_id)179 seaf_copy_manager_cancel_task (SeafCopyManager *mgr, const char *task_id)
180 {
181     SeafCopyManagerPriv *priv = mgr->priv;
182     CopyTask *task;
183 
184     pthread_mutex_lock (&priv->lock);
185 
186     task = g_hash_table_lookup (priv->copy_tasks, task_id);
187 
188     pthread_mutex_unlock (&priv->lock);
189 
190     if (task) {
191         if (task->canceled || task->failed || task->successful)
192             return -1;
193         g_atomic_int_set (&task->canceled, 1);
194     }
195 
196     return 0;
197 }
198