1 #include "common.h"
2 #include "log.h"
3
4 #include <pthread.h>
5
6 #include "seafile-session.h"
7 #include "seafile-object.h"
8 #include "seafile-error.h"
9
10 #include "copy-mgr.h"
11
12 #include "utils.h"
13
14 #include "log.h"
15
16 #define DEFAULT_MAX_THREADS 5
17
18 struct _SeafCopyManagerPriv {
19 GHashTable *copy_tasks;
20 pthread_mutex_t lock;
21 CcnetJobManager *job_mgr;
22 };
23
24 static void
copy_task_free(CopyTask * task)25 copy_task_free (CopyTask *task)
26 {
27 if (!task) return;
28
29 g_free (task->failed_reason);
30 g_free (task);
31 }
32
33 SeafCopyManager *
seaf_copy_manager_new(struct _SeafileSession * session)34 seaf_copy_manager_new (struct _SeafileSession *session)
35 {
36 SeafCopyManager *mgr = g_new0 (SeafCopyManager, 1);
37
38 mgr->session = session;
39 mgr->priv = g_new0 (struct _SeafCopyManagerPriv, 1);
40 mgr->priv->copy_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
41 g_free,
42 (GDestroyNotify)copy_task_free);
43 pthread_mutex_init (&mgr->priv->lock, NULL);
44
45 mgr->max_files = g_key_file_get_int64 (session->config,
46 "web_copy", "max_files", NULL);
47 mgr->max_size = g_key_file_get_int64 (session->config,
48 "web_copy", "max_size", NULL);
49 /* size is given in MB */
50 mgr->max_size <<= 20;
51
52 return mgr;
53 }
54
55 int
seaf_copy_manager_start(SeafCopyManager * mgr)56 seaf_copy_manager_start (SeafCopyManager *mgr)
57 {
58 mgr->priv->job_mgr = ccnet_job_manager_new (DEFAULT_MAX_THREADS);
59
60 return 1;
61 }
62
63 SeafileCopyTask *
seaf_copy_manager_get_task(SeafCopyManager * mgr,const char * task_id)64 seaf_copy_manager_get_task (SeafCopyManager *mgr,
65 const char *task_id)
66 {
67 SeafCopyManagerPriv *priv = mgr->priv;
68 CopyTask *task;
69 SeafileCopyTask *t = NULL;
70
71 pthread_mutex_lock (&priv->lock);
72
73 task = g_hash_table_lookup (priv->copy_tasks, task_id);
74
75 if (task) {
76 t = seafile_copy_task_new ();
77 g_object_set (t, "done", task->done, "total", task->total,
78 "canceled", task->canceled, "failed", task->failed,
79 "failed_reason", task->failed_reason, "successful", task->successful,
80 NULL);
81 if (task->canceled || task->failed || task->successful)
82 g_hash_table_remove(priv->copy_tasks, task_id);
83 }
84
85 pthread_mutex_unlock (&priv->lock);
86
87 return t;
88 }
89
90 struct CopyThreadData {
91 SeafCopyManager *mgr;
92 char src_repo_id[37];
93 char *src_path;
94 char *src_filename;
95 char dst_repo_id[37];
96 char *dst_path;
97 char *dst_filename;
98 int replace;
99 char *modifier;
100 CopyTask *task;
101 CopyTaskFunc func;
102 };
103 typedef struct CopyThreadData CopyThreadData;
104
105 static void *
copy_thread(void * vdata)106 copy_thread (void *vdata)
107 {
108 CopyThreadData *data = vdata;
109
110 data->func (data->src_repo_id, data->src_path, data->src_filename,
111 data->dst_repo_id, data->dst_path, data->dst_filename,
112 data->replace, data->modifier, data->task);
113
114 return vdata;
115 }
116
117 static void
copy_done(void * vdata)118 copy_done (void *vdata)
119 {
120 CopyThreadData *data = vdata;
121
122 g_free (data->src_path);
123 g_free (data->src_filename);
124 g_free (data->dst_path);
125 g_free (data->dst_filename);
126 g_free (data->modifier);
127 g_free (data);
128 }
129
130 char *
seaf_copy_manager_add_task(SeafCopyManager * mgr,const char * src_repo_id,const char * src_path,const char * src_filename,const char * dst_repo_id,const char * dst_path,const char * dst_filename,int replace,const char * modifier,CopyTaskFunc function,gboolean need_progress)131 seaf_copy_manager_add_task (SeafCopyManager *mgr,
132 const char *src_repo_id,
133 const char *src_path,
134 const char *src_filename,
135 const char *dst_repo_id,
136 const char *dst_path,
137 const char *dst_filename,
138 int replace,
139 const char *modifier,
140 CopyTaskFunc function,
141 gboolean need_progress)
142 {
143 SeafCopyManagerPriv *priv = mgr->priv;
144 char *task_id = NULL;
145 CopyTask *task = NULL;
146 struct CopyThreadData *data;
147
148 if (need_progress) {
149 task_id = gen_uuid();
150 task = g_new0 (CopyTask, 1);
151 memcpy (task->task_id, task_id, 36);
152
153 pthread_mutex_lock (&priv->lock);
154 g_hash_table_insert (priv->copy_tasks, g_strdup(task_id), task);
155 pthread_mutex_unlock (&priv->lock);
156 }
157
158 data = g_new0 (CopyThreadData, 1);
159 data->mgr = mgr;
160 memcpy (data->src_repo_id, src_repo_id, 36);
161 data->src_path = g_strdup(src_path);
162 data->src_filename = g_strdup(src_filename);
163 memcpy (data->dst_repo_id, dst_repo_id, 36);
164 data->dst_path = g_strdup(dst_path);
165 data->dst_filename = g_strdup(dst_filename);
166 data->replace = replace;
167 data->modifier = g_strdup(modifier);
168 data->task = task;
169 data->func = function;
170
171 ccnet_job_manager_schedule_job (mgr->priv->job_mgr,
172 copy_thread,
173 copy_done,
174 data);
175 return task_id;
176 }
177
178 int
seaf_copy_manager_cancel_task(SeafCopyManager * mgr,const char * task_id)179 seaf_copy_manager_cancel_task (SeafCopyManager *mgr, const char *task_id)
180 {
181 SeafCopyManagerPriv *priv = mgr->priv;
182 CopyTask *task;
183
184 pthread_mutex_lock (&priv->lock);
185
186 task = g_hash_table_lookup (priv->copy_tasks, task_id);
187
188 pthread_mutex_unlock (&priv->lock);
189
190 if (task) {
191 if (task->canceled || task->failed || task->successful)
192 return -1;
193 g_atomic_int_set (&task->canceled, 1);
194 }
195
196 return 0;
197 }
198