1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 #include "common.h"
4 
5 #include "net.h"
6 
7 #include <pthread.h>
8 #include <curl/curl.h>
9 #include <jansson.h>
10 #include <event2/buffer.h>
11 
12 #ifdef WIN32
13 #include <windows.h>
14 #include <wincrypt.h>
15 #endif
16 
17 #ifndef USE_GPL_CRYPTO
18 #include <openssl/pem.h>
19 #include <openssl/x509.h>
20 #include <openssl/bio.h>
21 #include <openssl/asn1.h>
22 #include <openssl/ssl.h>
23 #endif
24 
25 #include "seafile-config.h"
26 
27 #include "seafile-session.h"
28 #include "http-tx-mgr.h"
29 
30 #include "seafile-error-impl.h"
31 #include "utils.h"
32 #include "diff-simple.h"
33 
34 #define DEBUG_FLAG SEAFILE_DEBUG_TRANSFER
35 #include "log.h"
36 
37 #include "timer.h"
38 
/* HTTP status codes. */
#define HTTP_OK 200
#define HTTP_BAD_REQUEST 400
#define HTTP_FORBIDDEN 403
#define HTTP_NOT_FOUND 404
/* 443-445 are not standard HTTP status codes.
 * NOTE(review): judging by the names they are application-specific codes
 * sent by the server -- confirm against the server implementation. */
#define HTTP_NO_QUOTA 443
#define HTTP_REPO_DELETED 444
#define HTTP_REPO_CORRUPTED 445
#define HTTP_INTERNAL_SERVER_ERROR 500

/* Interval, in milliseconds, passed to the timer that runs reset_bytes(). */
#define RESET_BYTES_INTERVAL_MSEC 1000

/* Once a pool's error count reaches this value its queued connections
 * are freed (see connection_pool_return_connection()). */
#define CLEAR_POOL_ERR_CNT 3

/* Version string placed in the User-Agent header; defaults to
 * PACKAGE_VERSION unless it was already defined. */
#ifndef SEAFILE_CLIENT_VERSION
#define SEAFILE_CLIENT_VERSION PACKAGE_VERSION
#endif
55 
/*
 * Operating system name advertised in the User-Agent header.
 *
 * Exactly one definition is selected.  The previous independent #ifdef
 * blocks defined the macro twice on the BSDs (a generic "BSD" immediately
 * replaced by the specific name), which is an incompatible macro
 * redefinition, and left the macro undefined on any unlisted platform.
 * The order below keeps the precedence of the old "last definition wins"
 * layout; the generic "BSD" value is dropped because every platform that
 * selected it was also given a specific name.
 */
#if defined __OpenBSD__
#define USER_AGENT_OS "OpenBSD"
#elif defined __NetBSD__
#define USER_AGENT_OS "NetBSD"
#elif defined __DragonFly__
#define USER_AGENT_OS "DragonFly"
#elif defined __FreeBSD__
#define USER_AGENT_OS "FreeBSD"
#elif defined __linux__
#define USER_AGENT_OS "Linux"
#elif defined __APPLE__
#define USER_AGENT_OS "Apple OS X"
#elif defined WIN32
#define USER_AGENT_OS "Windows NT"
#else
/* Unlisted platform: keep the User-Agent string literal well-formed. */
#define USER_AGENT_OS "Unknown"
#endif
87 
/* One libcurl easy handle together with its bookkeeping data. */
struct _Connection {
    CURL *curl;                 /* Easy handle created by curl_easy_init(). */
    gint64 ctime;               /* Used to clean up unused connection.
                                 * Set to time(NULL) in connection_new(). */
    gboolean release;           /* If TRUE, the connection will be released
                                 * (freed instead of re-queued) when it is
                                 * returned to its pool. */
};
typedef struct _Connection Connection;
94 
/* Queue of reusable connections for one host. */
struct _ConnectionPool {
    char *host;                 /* Copy of the host string given at creation. */
    GQueue *queue;              /* Connections currently not handed out. */
    pthread_mutex_t lock;       /* Held while touching queue and err_cnt. */
    int err_cnt;                /* Incremented when a connection is returned
                                 * with release set; reset to 0 when one is
                                 * returned normally. */
};
typedef struct _ConnectionPool ConnectionPool;
102 
/* Private state of the HTTP transfer manager. */
struct _HttpTxPriv {
    /* Task tables with string keys; values are released with
     * http_tx_task_free() (see http_tx_manager_new()). */
    GHashTable *download_tasks;
    GHashTable *upload_tasks;

    GHashTable *connection_pools; /* host -> connection pool */
    pthread_mutex_t pools_lock;   /* Held while looking up or inserting
                                   * into connection_pools. */

    SeafTimer *reset_bytes_timer; /* Created in http_tx_manager_start(). */

    char *ca_bundle_path;         /* "ca-bundle.pem" inside seaf_dir. */

    /* Regex to parse error message returned by update-branch. */
    GRegex *locked_error_regex;
    GRegex *folder_perm_error_regex;
};
typedef struct _HttpTxPriv HttpTxPriv;
119 
120 /* Http Tx Task */
121 
122 static HttpTxTask *
http_tx_task_new(HttpTxManager * mgr,const char * repo_id,int repo_version,int type,gboolean is_clone,const char * host,const char * token,const char * passwd,const char * worktree)123 http_tx_task_new (HttpTxManager *mgr,
124                   const char *repo_id,
125                   int repo_version,
126                   int type,
127                   gboolean is_clone,
128                   const char *host,
129                   const char *token,
130                   const char *passwd,
131                   const char *worktree)
132 {
133     HttpTxTask *task = g_new0 (HttpTxTask, 1);
134 
135     task->manager = mgr;
136     memcpy (task->repo_id, repo_id, 36);
137     task->repo_version = repo_version;
138     task->type = type;
139     task->is_clone = is_clone;
140 
141     task->host = g_strdup(host);
142     task->token = g_strdup(token);
143 
144     if (passwd)
145         task->passwd = g_strdup(passwd);
146     if (worktree)
147         task->worktree = g_strdup(worktree);
148 
149     task->error = SYNC_ERROR_ID_NO_ERROR;
150 
151     return task;
152 }
153 
154 static void
http_tx_task_free(HttpTxTask * task)155 http_tx_task_free (HttpTxTask *task)
156 {
157     g_free (task->host);
158     g_free (task->token);
159     g_free (task->passwd);
160     g_free (task->worktree);
161     g_free (task->email);
162     g_free (task->repo_name);
163     if (task->type == HTTP_TASK_TYPE_DOWNLOAD) {
164         g_hash_table_destroy (task->blk_ref_cnts);
165     }
166     g_free (task);
167 }
168 
/* Printable names of task states, in index order.
 * NOTE(review): presumably indexed by the task state constants declared in
 * http-tx-mgr.h -- the order must match them; confirm against the header. */
static const char *http_task_state_str[] = {
    "normal",
    "canceled",
    "finished",
    "error",
};
175 
/* Printable names of task runtime states, in index order.
 * NOTE(review): presumably indexed by the runtime state constants declared
 * in http-tx-mgr.h -- the order must match them; confirm against the header. */
static const char *http_task_rt_state_str[] = {
    "init",
    "check",
    "commit",
    "fs",
    "data",
    "update-branch",
    "finished",
};
185 
186 /* Http connection and connection pool. */
187 
188 static Connection *
connection_new()189 connection_new ()
190 {
191     Connection *conn = g_new0 (Connection, 1);
192 
193     conn->curl = curl_easy_init();
194     conn->ctime = (gint64)time(NULL);
195 
196     return conn;
197 }
198 
199 static void
connection_free(Connection * conn)200 connection_free (Connection *conn)
201 {
202     curl_easy_cleanup (conn->curl);
203     g_free (conn);
204 }
205 
206 static ConnectionPool *
connection_pool_new(const char * host)207 connection_pool_new (const char *host)
208 {
209     ConnectionPool *pool = g_new0 (ConnectionPool, 1);
210     pool->host = g_strdup(host);
211     pool->queue = g_queue_new ();
212     pthread_mutex_init (&pool->lock, NULL);
213     return pool;
214 }
215 
216 static ConnectionPool *
find_connection_pool(HttpTxPriv * priv,const char * host)217 find_connection_pool (HttpTxPriv *priv, const char *host)
218 {
219     ConnectionPool *pool;
220 
221     pthread_mutex_lock (&priv->pools_lock);
222     pool = g_hash_table_lookup (priv->connection_pools, host);
223     if (!pool) {
224         pool = connection_pool_new (host);
225         g_hash_table_insert (priv->connection_pools, g_strdup(host), pool);
226     }
227     pthread_mutex_unlock (&priv->pools_lock);
228 
229     return pool;
230 }
231 
232 static Connection *
connection_pool_get_connection(ConnectionPool * pool)233 connection_pool_get_connection (ConnectionPool *pool)
234 {
235     Connection *conn = NULL;
236 
237     pthread_mutex_lock (&pool->lock);
238     conn = g_queue_pop_head (pool->queue);
239     if (!conn) {
240         conn = connection_new ();
241     }
242     pthread_mutex_unlock (&pool->lock);
243 
244     return conn;
245 }
246 
247 static void
connection_pool_clear(ConnectionPool * pool)248 connection_pool_clear (ConnectionPool *pool)
249 {
250     Connection *conn = NULL;
251 
252     while (1) {
253         conn = g_queue_pop_head (pool->queue);
254         if (!conn)
255             break;
256         connection_free (conn);
257     }
258 }
259 
260 static void
connection_pool_return_connection(ConnectionPool * pool,Connection * conn)261 connection_pool_return_connection (ConnectionPool *pool, Connection *conn)
262 {
263     if (!conn)
264         return;
265 
266     if (conn->release) {
267         connection_free (conn);
268 
269         pthread_mutex_lock (&pool->lock);
270         if (++pool->err_cnt >= CLEAR_POOL_ERR_CNT) {
271             connection_pool_clear (pool);
272         }
273         pthread_mutex_unlock (&pool->lock);
274 
275         return;
276     }
277 
278     curl_easy_reset (conn->curl);
279 
280     /* Reset error count when one connection succeeded. */
281     pthread_mutex_lock (&pool->lock);
282     pool->err_cnt = 0;
283     g_queue_push_tail (pool->queue, conn);
284     pthread_mutex_unlock (&pool->lock);
285 }
286 
/* Regex sources compiled in http_tx_manager_new().  Each pattern has a
 * single capture group, which matches the path named in the message. */
#define LOCKED_ERROR_PATTERN "File (.+) is locked"
#define FOLDER_PERM_ERROR_PATTERN "Update to path (.+) is not allowed by folder permission settings"
289 
290 HttpTxManager *
http_tx_manager_new(struct _SeafileSession * seaf)291 http_tx_manager_new (struct _SeafileSession *seaf)
292 {
293     HttpTxManager *mgr = g_new0 (HttpTxManager, 1);
294     HttpTxPriv *priv = g_new0 (HttpTxPriv, 1);
295 
296     mgr->seaf = seaf;
297 
298     priv->download_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
299                                                   g_free,
300                                                   (GDestroyNotify)http_tx_task_free);
301     priv->upload_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
302                                                 g_free,
303                                                 (GDestroyNotify)http_tx_task_free);
304 
305     priv->connection_pools = g_hash_table_new (g_str_hash, g_str_equal);
306     pthread_mutex_init (&priv->pools_lock, NULL);
307 
308     priv->ca_bundle_path = g_build_filename (seaf->seaf_dir, "ca-bundle.pem", NULL);
309 
310     GError *error = NULL;
311     priv->locked_error_regex = g_regex_new (LOCKED_ERROR_PATTERN, 0, 0, &error);
312     if (error) {
313         seaf_warning ("Failed to create regex '%s': %s\n", LOCKED_ERROR_PATTERN, error->message);
314         g_clear_error (&error);
315     }
316 
317     priv->folder_perm_error_regex = g_regex_new (FOLDER_PERM_ERROR_PATTERN, 0, 0, &error);
318     if (error) {
319         seaf_warning ("Failed to create regex '%s': %s\n", FOLDER_PERM_ERROR_PATTERN, error->message);
320         g_clear_error (&error);
321     }
322 
323     mgr->priv = priv;
324 
325     return mgr;
326 }
327 
328 static int
reset_bytes(void * vdata)329 reset_bytes (void *vdata)
330 {
331     HttpTxManager *mgr = vdata;
332     HttpTxPriv *priv = mgr->priv;
333     GHashTableIter iter;
334     gpointer key, value;
335     HttpTxTask *task;
336 
337     g_hash_table_iter_init (&iter, priv->download_tasks);
338     while (g_hash_table_iter_next (&iter, &key, &value)) {
339         task = value;
340         task->last_tx_bytes = g_atomic_int_get (&task->tx_bytes);
341         g_atomic_int_set (&task->tx_bytes, 0);
342     }
343 
344     g_hash_table_iter_init (&iter, priv->upload_tasks);
345     while (g_hash_table_iter_next (&iter, &key, &value)) {
346         task = value;
347         task->last_tx_bytes = g_atomic_int_get (&task->tx_bytes);
348         g_atomic_int_set (&task->tx_bytes, 0);
349     }
350 
351     return 1;
352 }
353 
354 int
http_tx_manager_start(HttpTxManager * mgr)355 http_tx_manager_start (HttpTxManager *mgr)
356 {
357 #ifdef WIN32
358     /* Remove existing ca-bundle file on start. */
359     g_unlink (mgr->priv->ca_bundle_path);
360 #endif
361 
362     /* TODO: add a timer to clean up unused Http connections. */
363 
364     mgr->priv->reset_bytes_timer = seaf_timer_new (reset_bytes,
365                                                    mgr,
366                                                    RESET_BYTES_INTERVAL_MSEC);
367 
368     return 0;
369 }
370 
371 /* Common Utility Functions. */
372 
373 #ifndef USE_GPL_CRYPTO
374 
375 #ifdef WIN32
376 
377 /* static void */
378 /* write_cert_name_to_pem_file (FILE *f, PCCERT_CONTEXT pc) */
379 /* { */
380 /*     char *name; */
381 /*     DWORD size; */
382 
383 /*     fprintf (f, "\n"); */
384 
385 /*     if (!CertGetCertificateContextProperty(pc, */
386 /*                                            CERT_FRIENDLY_NAME_PROP_ID, */
387 /*                                            NULL, &size)) { */
388 /*         return; */
389 /*     } */
390 
391 /*     name = g_malloc ((gsize)size); */
392 /*     if (!name) { */
393 /*         seaf_warning ("Failed to alloc memory\n"); */
394 /*         return; */
395 /*     } */
396 
397 /*     if (!CertGetCertificateContextProperty(pc, */
398 /*                                            CERT_FRIENDLY_NAME_PROP_ID, */
399 /*                                            name, &size)) { */
400 /*         g_free (name); */
401 /*         return; */
402 /*     } */
403 
404 /*     if (fwrite(name, (size_t)size, 1, f) != 1) { */
405 /*         seaf_warning ("Failed to write pem file.\n"); */
406 /*         g_free (name); */
407 /*         return; */
408 /*     } */
409 /*     fprintf (f, "\n"); */
410 
411 /*     g_free (name); */
412 /* } */
413 
414 static void
write_cert_to_pem_file(FILE * f,PCCERT_CONTEXT pc)415 write_cert_to_pem_file (FILE *f, PCCERT_CONTEXT pc)
416 {
417     const unsigned char *der = pc->pbCertEncoded;
418     X509 *cert;
419 
420     /* write_cert_name_to_pem_file (f, pc); */
421 
422     cert = d2i_X509 (NULL, &der, (int)pc->cbCertEncoded);
423     if (!cert) {
424         seaf_warning ("Failed to parse certificate from DER.\n");
425         return;
426     }
427 
428     /* Don't add expired certs to pem file, otherwise openssl will
429      * complain certificate expired.
430      */
431     if (X509_cmp_current_time (X509_get_notAfter(cert)) < 0)
432         return;
433 
434     if (!PEM_write_X509 (f, cert)) {
435         seaf_warning ("Failed to write certificate.\n");
436         X509_free (cert);
437         return;
438     }
439 
440     X509_free (cert);
441 }
442 
443 static int
load_ca_from_store(FILE * f,const wchar_t * store_name)444 load_ca_from_store (FILE *f, const wchar_t *store_name)
445 {
446     HCERTSTORE store;
447 
448     store = CertOpenSystemStoreW (0, store_name);
449     if (!store) {
450         seaf_warning ("Failed to open system cert store: %lu\n", GetLastError());
451         return -1;
452     }
453 
454     PCCERT_CONTEXT pc = NULL;
455     while (1) {
456         pc = CertFindCertificateInStore (store, X509_ASN_ENCODING, 0, CERT_FIND_ANY, NULL, pc);
457         if (!pc)
458             break;
459         write_cert_to_pem_file (f, pc);
460     }
461 
462     CertCloseStore(store, 0);
463 
464     return 0;
465 }
466 
/*
 * Create (or truncate) @ca_bundle_path and fill it with the certificates
 * of the "ROOT" store followed by those of the "CA" store.  Stops at the
 * first store that fails to load.  Returns 0 on success, -1 on failure.
 */
static int
create_ca_bundle (const char *ca_bundle_path)
{
    int status = 0;
    FILE *bundle = g_fopen (ca_bundle_path, "w+b");

    if (bundle == NULL) {
        seaf_warning ("Failed to open cabundle file %s: %s\n",
                      ca_bundle_path, strerror(errno));
        return -1;
    }

    if (load_ca_from_store (bundle, L"ROOT") < 0) {
        seaf_warning ("Failed to load ca from ROOT store.\n");
        status = -1;
    } else if (load_ca_from_store (bundle, L"CA") < 0) {
        seaf_warning ("Failed to load ca from CA store.\n");
        status = -1;
    }

    fclose (bundle);
    return status;
}
496 
497 #endif	/* WIN32 */
498 
499 #ifndef __linux__
500 static void
load_ca_bundle(CURL * curl)501 load_ca_bundle (CURL *curl)
502 {
503     char *ca_bundle_path = seaf->http_tx_mgr->priv->ca_bundle_path;
504 
505     /* On MacOS the certs are loaded by seafile applet instead of seaf-daemon  */
506     if (!seaf_util_exists (ca_bundle_path)) {
507 #ifdef WIN32
508         if (create_ca_bundle (ca_bundle_path) < 0)
509             return;
510 #else
511         return;
512 #endif
513     }
514 
515     curl_easy_setopt (curl, CURLOPT_CAINFO, ca_bundle_path);
516 }
517 #endif  /* __linux__ */
518 
519 static gboolean
load_certs(sqlite3_stmt * stmt,void * vdata)520 load_certs (sqlite3_stmt *stmt, void *vdata)
521 {
522     X509_STORE *store = vdata;
523     X509 *saved = NULL;
524     const char *pem_b64;
525     char *pem = NULL;
526     BIO *b = NULL;
527     gboolean ret = TRUE;
528 
529     pem_b64 = (const char *)sqlite3_column_text (stmt, 0);
530 
531     gsize len;
532     pem = (char *)g_base64_decode (pem_b64, &len);
533     if (!pem) {
534         seaf_warning ("Failed to decode base64.\n");
535         goto out;
536     }
537 
538     b = BIO_new (BIO_s_mem());
539     if (!b) {
540         seaf_warning ("Failed to alloc BIO\n");
541         goto out;
542     }
543 
544     if (BIO_write (b, pem, len) != len) {
545         seaf_warning ("Failed to write pem to BIO\n");
546         goto out;
547     }
548 
549     saved = PEM_read_bio_X509 (b, NULL, 0, NULL);
550     if (!saved) {
551         seaf_warning ("Failed to read PEM from BIO\n");
552         goto out;
553     }
554 
555     X509_STORE_add_cert (store, saved);
556 
557 out:
558     g_free (pem);
559     if (b)
560         BIO_free (b);
561     if (saved)
562         X509_free (saved);
563 
564     return ret;
565 }
566 
567 static int
load_certs_from_db(X509_STORE * store)568 load_certs_from_db (X509_STORE *store)
569 {
570     char *cert_db_path = NULL;
571     sqlite3 *db = NULL;
572     char *sql;
573     int ret = 0;
574 
575     cert_db_path = g_build_filename (seaf->seaf_dir, "certs.db", NULL);
576     if (sqlite_open_db (cert_db_path, &db) < 0) {
577         seaf_warning ("Failed to open certs.db\n");
578         ret = -1;
579         goto out;
580     }
581 
582     sql = "SELECT cert FROM Certs;";
583 
584     if (sqlite_foreach_selected_row (db, sql, load_certs, store) < 0) {
585         ret = -1;
586         goto out;
587     }
588 
589 out:
590     g_free (cert_db_path);
591     if (db)
592         sqlite_close_db (db);
593 
594     return ret;
595 }
596 
597 static CURLcode
ssl_callback(CURL * curl,void * ssl_ctx,void * userptr)598 ssl_callback (CURL *curl, void *ssl_ctx, void *userptr)
599 {
600     SSL_CTX *ctx = ssl_ctx;
601     X509_STORE *store;
602 
603     store = SSL_CTX_get_cert_store(ctx);
604 
605     /* Add all certs stored in db as trusted CA certs.
606      * This workaround has one limitation though. The self-signed certs cannot
607      * contain chain. It must be the CA itself.
608      */
609     load_certs_from_db (store);
610 
611     return CURLE_OK;
612 }
613 
614 #endif  /* USE_GPL_CRYPTO */
615 
616 static void
set_proxy(CURL * curl,gboolean is_https)617 set_proxy (CURL *curl, gboolean is_https)
618 {
619     /* Disable proxy if proxy options are not set properly. */
620     if (!seaf->use_http_proxy || !seaf->http_proxy_type || !seaf->http_proxy_addr) {
621         curl_easy_setopt (curl, CURLOPT_PROXY, NULL);
622         return;
623     }
624 
625     if (g_strcmp0(seaf->http_proxy_type, PROXY_TYPE_HTTP) == 0) {
626         curl_easy_setopt(curl, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
627         /* Use CONNECT method create a SSL tunnel if https is used. */
628         if (is_https)
629             curl_easy_setopt(curl, CURLOPT_HTTPPROXYTUNNEL, 1L);
630         curl_easy_setopt(curl, CURLOPT_PROXY, seaf->http_proxy_addr);
631         curl_easy_setopt(curl, CURLOPT_PROXYPORT,
632                          seaf->http_proxy_port > 0 ? seaf->http_proxy_port : 80);
633         if (seaf->http_proxy_username && seaf->http_proxy_password) {
634             curl_easy_setopt(curl, CURLOPT_PROXYAUTH,
635                              CURLAUTH_BASIC |
636                              CURLAUTH_DIGEST |
637                              CURLAUTH_DIGEST_IE |
638                              CURLAUTH_GSSNEGOTIATE |
639                              CURLAUTH_NTLM);
640             curl_easy_setopt(curl, CURLOPT_PROXYUSERNAME, seaf->http_proxy_username);
641             curl_easy_setopt(curl, CURLOPT_PROXYPASSWORD, seaf->http_proxy_password);
642         }
643     } else if (g_strcmp0(seaf->http_proxy_type, PROXY_TYPE_SOCKS) == 0) {
644         if (seaf->http_proxy_port < 0)
645             return;
646         curl_easy_setopt(curl, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
647         curl_easy_setopt(curl, CURLOPT_PROXY, seaf->http_proxy_addr);
648         curl_easy_setopt(curl, CURLOPT_PROXYPORT, seaf->http_proxy_port);
649     }
650 }
651 
#ifdef WIN32
/*
 * CURLOPT_SOCKOPTFUNCTION callback: raise the send buffer of @curlfd to at
 * least DEFAULT_SNDBUF_SIZE and disable Nagle's algorithm.
 * Always returns CURL_SOCKOPT_OK; socket option failures are not fatal.
 */
static int
sockopt_callback (void *clientp, curl_socket_t curlfd, curlsocktype purpose)
{
    /* Set large enough TCP buffer size.
     * This greatly enhances sync speed for high latency network.
     * Windows by default use 8KB buffers, which is too small for such case.
     * Linux has auto-tuning for TCP buffers, so don't need to set manually.
     * OSX is TBD.
     */

#define DEFAULT_SNDBUF_SIZE (1 << 17) /* 128KB */

    /* Set send buffer size. */
    int sndbuf_size = 0;
    socklen_t optlen;

    optlen = sizeof(int);
    /* If the current size cannot be read, treat it as too small instead of
     * comparing an uninitialized value. */
    if (getsockopt (curlfd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, &optlen) != 0)
        sndbuf_size = 0;

    if (sndbuf_size < DEFAULT_SNDBUF_SIZE) {
        sndbuf_size = DEFAULT_SNDBUF_SIZE;
        optlen = sizeof(int);
        setsockopt (curlfd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, optlen);
    }

    /* Disable Nagle's algorithm. */
    int val = 1;
    optlen = sizeof(int);
    setsockopt (curlfd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, optlen);

    return CURL_SOCKOPT_OK;
}
#endif  /* WIN32 */
686 
/* Response body accumulated in memory by recv_response(). */
typedef struct _HttpResponse {
    char *content;              /* Heap buffer, grown as data arrives. */
    size_t size;                /* Number of bytes stored in content. */
} HttpResponse;

/*
 * libcurl write callback: append @size * @nmemb bytes from @contents to the
 * HttpResponse passed in @userp.
 *
 * Returns the number of bytes consumed.  On allocation failure 0 is
 * returned, which differs from the amount offered and makes libcurl abort
 * the transfer; the buffer collected so far stays valid for the caller to
 * free.
 */
static size_t
recv_response (void *contents, size_t size, size_t nmemb, void *userp)
{
    size_t realsize = size * nmemb;
    HttpResponse *rsp = userp;
    char *grown;

    /* Nothing to append.  g_realloc() returns NULL for a zero-byte request,
     * which used to be reported as an out-of-memory condition. */
    if (realsize == 0)
        return 0;

    /* g_realloc() aborts the process on failure; g_try_realloc() returns
     * NULL and leaves the old buffer intact, so the error path is real. */
    grown = g_try_realloc (rsp->content, rsp->size + realsize);
    if (!grown) {
        seaf_warning ("Not enough memory.\n");
        /* return a value other than realsize to signify an error. */
        return 0;
    }
    rsp->content = grown;

    memcpy (rsp->content + rsp->size, contents, realsize);
    rsp->size += realsize;

    return realsize;
}
710 
/* Declared here and defined outside this chunk; its result is passed to
 * CURLOPT_STDERR when curl debugging is enabled. */
extern FILE *seafile_get_log_fp ();

/* Passed as CURLOPT_LOW_SPEED_TIME when a request is made with the
 * timeout flag set. */
#define HTTP_TIMEOUT_SEC 300

/* Signature of a libcurl write callback (same as recv_response()). */
typedef size_t (*HttpRecvCallback) (void *, size_t, size_t, void *);
716 
717 /*
718  * The @timeout parameter is for detecting network connection problems.
719  * The @timeout parameter should be set to TRUE for data-transfer-only operations,
720  * such as getting objects, blocks. For operations that requires calculations
721  * on the server side, the timeout should be set to FALSE. Otherwise when
722  * the server sometimes takes more than 45 seconds to calculate the result,
723  * the client will time out.
724  */
725 static int
http_get(CURL * curl,const char * url,const char * token,int * rsp_status,char ** rsp_content,gint64 * rsp_size,HttpRecvCallback callback,void * cb_data,gboolean timeout,int * pcurl_error)726 http_get (CURL *curl, const char *url, const char *token,
727           int *rsp_status, char **rsp_content, gint64 *rsp_size,
728           HttpRecvCallback callback, void *cb_data,
729           gboolean timeout, int *pcurl_error)
730 {
731     char *token_header;
732     struct curl_slist *headers = NULL;
733     int ret = 0;
734 
735     if (seafile_debug_flag_is_set (SEAFILE_DEBUG_CURL)) {
736         curl_easy_setopt (curl, CURLOPT_VERBOSE, 1);
737         curl_easy_setopt (curl, CURLOPT_STDERR, seafile_get_log_fp());
738     }
739 
740     headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
741 
742     if (token) {
743         token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
744         headers = curl_slist_append (headers, token_header);
745         g_free (token_header);
746     }
747 
748     curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
749 
750     curl_easy_setopt(curl, CURLOPT_URL, url);
751     curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
752 
753     if (timeout) {
754         /* Set low speed limit to 1 bytes. This effectively means no data. */
755         curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
756         curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
757     }
758 
759     if (seaf->disable_verify_certificate) {
760         curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
761         curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
762     }
763 
764     HttpResponse rsp;
765     memset (&rsp, 0, sizeof(rsp));
766     if (rsp_content) {
767         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
768         curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
769     } else if (callback) {
770         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
771         curl_easy_setopt(curl, CURLOPT_WRITEDATA, cb_data);
772     }
773 
774     gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
775     set_proxy (curl, is_https);
776 
777     curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
778 
779 #ifndef USE_GPL_CRYPTO
780 #if defined WIN32 || defined __APPLE__
781     load_ca_bundle (curl);
782 #endif
783 #endif
784 
785 #ifndef USE_GPL_CRYPTO
786     if (!seaf->disable_verify_certificate) {
787         curl_easy_setopt (curl, CURLOPT_SSL_CTX_FUNCTION, ssl_callback);
788         curl_easy_setopt (curl, CURLOPT_SSL_CTX_DATA, url);
789     }
790 #endif
791 
792 #ifdef WIN32
793     curl_easy_setopt (curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
794 #endif
795 
796     int rc = curl_easy_perform (curl);
797     if (rc != 0) {
798         seaf_warning ("libcurl failed to GET %s: %s.\n",
799                       url, curl_easy_strerror(rc));
800         if (pcurl_error)
801             *pcurl_error = rc;
802         ret = -1;
803         goto out;
804     }
805 
806     long status;
807     rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
808     if (rc != CURLE_OK) {
809         seaf_warning ("Failed to get status code for GET %s.\n", url);
810         ret = -1;
811         goto out;
812     }
813 
814     *rsp_status = status;
815 
816     if (rsp_content) {
817         *rsp_content = rsp.content;
818         *rsp_size = rsp.size;
819     }
820 
821 out:
822     if (ret < 0)
823         g_free (rsp.content);
824     curl_slist_free_all (headers);
825     return ret;
826 }
827 
/* Cursor over an in-memory request body consumed by send_request(). */
typedef struct _HttpRequest {
    const char *content;        /* Next byte still to be sent. */
    size_t size;                /* Number of bytes still to be sent. */
} HttpRequest;

/*
 * libcurl read callback: copy up to @size * @nmemb of the remaining request
 * bytes into @ptr and advance the HttpRequest cursor passed in @userp.
 * Returns the number of bytes copied; 0 once nothing remains.
 */
static size_t
send_request (void *ptr, size_t size, size_t nmemb, void *userp)
{
    HttpRequest *pending = userp;
    size_t room = size * nmemb;
    size_t chunk;

    if (pending->size == 0)
        return 0;

    /* Send whichever is smaller: what is left or what fits. */
    if (pending->size < room)
        chunk = pending->size;
    else
        chunk = room;

    memcpy (ptr, pending->content, chunk);
    pending->content += chunk;
    pending->size -= chunk;

    return chunk;
}

/* Signature of a libcurl read callback (same as send_request()). */
typedef size_t (*HttpSendCallback) (void *, size_t, size_t, void *);
852 
853 static int
http_put(CURL * curl,const char * url,const char * token,const char * req_content,gint64 req_size,HttpSendCallback callback,void * cb_data,int * rsp_status,char ** rsp_content,gint64 * rsp_size,gboolean timeout,int * pcurl_error)854 http_put (CURL *curl, const char *url, const char *token,
855           const char *req_content, gint64 req_size,
856           HttpSendCallback callback, void *cb_data,
857           int *rsp_status, char **rsp_content, gint64 *rsp_size,
858           gboolean timeout, int *pcurl_error)
859 {
860     char *token_header;
861     struct curl_slist *headers = NULL;
862     int ret = 0;
863 
864     if (seafile_debug_flag_is_set (SEAFILE_DEBUG_CURL)) {
865         curl_easy_setopt (curl, CURLOPT_VERBOSE, 1);
866         curl_easy_setopt (curl, CURLOPT_STDERR, seafile_get_log_fp());
867     }
868 
869     headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
870     /* Disable the default "Expect: 100-continue" header */
871     headers = curl_slist_append (headers, "Expect:");
872 
873     if (token) {
874         token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
875         headers = curl_slist_append (headers, token_header);
876         g_free (token_header);
877     }
878 
879     curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
880 
881     curl_easy_setopt(curl, CURLOPT_URL, url);
882     curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
883 
884     if (timeout) {
885         /* Set low speed limit to 1 bytes. This effectively means no data. */
886         curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
887         curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
888     }
889 
890     if (seaf->disable_verify_certificate) {
891         curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
892         curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
893     }
894 
895     HttpRequest req;
896     if (req_content) {
897         memset (&req, 0, sizeof(req));
898         req.content = req_content;
899         req.size = req_size;
900         curl_easy_setopt(curl, CURLOPT_READFUNCTION, send_request);
901         curl_easy_setopt(curl, CURLOPT_READDATA, &req);
902         curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)req_size);
903     } else if (callback != NULL) {
904         curl_easy_setopt(curl, CURLOPT_READFUNCTION, callback);
905         curl_easy_setopt(curl, CURLOPT_READDATA, cb_data);
906         curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)req_size);
907     } else {
908         curl_easy_setopt (curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)0);
909     }
910 
911     curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
912 
913     HttpResponse rsp;
914     memset (&rsp, 0, sizeof(rsp));
915     if (rsp_content) {
916         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
917         curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
918     }
919 
920     gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
921     set_proxy (curl, is_https);
922 
923     curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
924 
925 #ifndef USE_GPL_CRYPTO
926 #if defined WIN32 || defined __APPLE__
927     load_ca_bundle (curl);
928 #endif
929 #endif
930 
931 #ifndef USE_GPL_CRYPTO
932     if (!seaf->disable_verify_certificate) {
933         curl_easy_setopt (curl, CURLOPT_SSL_CTX_FUNCTION, ssl_callback);
934         curl_easy_setopt (curl, CURLOPT_SSL_CTX_DATA, url);
935     }
936 #endif
937 
938 #ifdef WIN32
939     curl_easy_setopt (curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
940 #endif
941 
942     int rc = curl_easy_perform (curl);
943     if (rc != 0) {
944         seaf_warning ("libcurl failed to PUT %s: %s.\n",
945                       url, curl_easy_strerror(rc));
946         if (pcurl_error)
947             *pcurl_error = rc;
948         ret = -1;
949         goto out;
950     }
951 
952     long status;
953     rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
954     if (rc != CURLE_OK) {
955         seaf_warning ("Failed to get status code for PUT %s.\n", url);
956         ret = -1;
957         goto out;
958     }
959 
960     *rsp_status = status;
961 
962     if (rsp_content) {
963         *rsp_content = rsp.content;
964         *rsp_size = rsp.size;
965     }
966 
967 out:
968     if (ret < 0)
969         g_free (rsp.content);
970     curl_slist_free_all (headers);
971     return ret;
972 }
973 
974 static int
http_post(CURL * curl,const char * url,const char * token,const char * req_content,gint64 req_size,int * rsp_status,char ** rsp_content,gint64 * rsp_size,gboolean timeout,int * pcurl_error)975 http_post (CURL *curl, const char *url, const char *token,
976            const char *req_content, gint64 req_size,
977            int *rsp_status, char **rsp_content, gint64 *rsp_size,
978            gboolean timeout, int *pcurl_error)
979 {
980     char *token_header;
981     struct curl_slist *headers = NULL;
982     int ret = 0;
983 
984     g_return_val_if_fail (req_content != NULL, -1);
985 
986     if (seafile_debug_flag_is_set (SEAFILE_DEBUG_CURL)) {
987         curl_easy_setopt (curl, CURLOPT_VERBOSE, 1);
988         curl_easy_setopt (curl, CURLOPT_STDERR, seafile_get_log_fp());
989     }
990 
991     headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
992     /* Disable the default "Expect: 100-continue" header */
993     headers = curl_slist_append (headers, "Expect:");
994 
995     if (token) {
996         token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
997         headers = curl_slist_append (headers, token_header);
998         g_free (token_header);
999     }
1000 
1001     curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1002 
1003     curl_easy_setopt(curl, CURLOPT_URL, url);
1004     curl_easy_setopt(curl, CURLOPT_POST, 1L);
1005 
1006     if (timeout) {
1007         /* Set low speed limit to 1 bytes. This effectively means no data. */
1008         curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
1009         curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
1010     }
1011 
1012     if (seaf->disable_verify_certificate) {
1013         curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
1014         curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
1015     }
1016 
1017     HttpRequest req;
1018     memset (&req, 0, sizeof(req));
1019     req.content = req_content;
1020     req.size = req_size;
1021     curl_easy_setopt(curl, CURLOPT_READFUNCTION, send_request);
1022     curl_easy_setopt(curl, CURLOPT_READDATA, &req);
1023     curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)req_size);
1024 
1025     curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
1026 
1027     HttpResponse rsp;
1028     memset (&rsp, 0, sizeof(rsp));
1029     if (rsp_content) {
1030         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
1031         curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
1032     }
1033 
1034 #ifndef USE_GPL_CRYPTO
1035 #if defined WIN32 || defined __APPLE__
1036     load_ca_bundle (curl);
1037 #endif
1038 #endif
1039 
1040 #ifndef USE_GPL_CRYPTO
1041     if (!seaf->disable_verify_certificate) {
1042         curl_easy_setopt (curl, CURLOPT_SSL_CTX_FUNCTION, ssl_callback);
1043         curl_easy_setopt (curl, CURLOPT_SSL_CTX_DATA, url);
1044     }
1045 #endif
1046 
1047     gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
1048     set_proxy (curl, is_https);
1049 
1050     curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1051     /* All POST requests should remain POST after redirect. */
1052     curl_easy_setopt(curl, CURLOPT_POSTREDIR, CURL_REDIR_POST_ALL);
1053 
1054 #ifdef WIN32
1055     curl_easy_setopt (curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
1056 #endif
1057 
1058     int rc = curl_easy_perform (curl);
1059     if (rc != 0) {
1060         seaf_warning ("libcurl failed to POST %s: %s.\n",
1061                       url, curl_easy_strerror(rc));
1062         if (pcurl_error)
1063             *pcurl_error = rc;
1064         ret = -1;
1065         goto out;
1066     }
1067 
1068     long status;
1069     rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
1070     if (rc != CURLE_OK) {
1071         seaf_warning ("Failed to get status code for POST %s.\n", url);
1072         ret = -1;
1073         goto out;
1074     }
1075 
1076     *rsp_status = status;
1077 
1078     if (rsp_content) {
1079         *rsp_content = rsp.content;
1080         *rsp_size = rsp.size;
1081     }
1082 
1083 out:
1084     if (ret < 0)
1085         g_free (rsp.content);
1086     curl_slist_free_all (headers);
1087     return ret;
1088 }
1089 
1090 static int
http_error_to_http_task_error(int status)1091 http_error_to_http_task_error (int status)
1092 {
1093     if (status == HTTP_BAD_REQUEST)
1094         /* This is usually a bug in the client. Set to general error. */
1095         return SYNC_ERROR_ID_GENERAL_ERROR;
1096     else if (status == HTTP_FORBIDDEN)
1097         return SYNC_ERROR_ID_ACCESS_DENIED;
1098     else if (status >= HTTP_INTERNAL_SERVER_ERROR)
1099         return SYNC_ERROR_ID_SERVER;
1100     else if (status == HTTP_NOT_FOUND)
1101         return SYNC_ERROR_ID_SERVER;
1102     else if (status == HTTP_NO_QUOTA)
1103         return SYNC_ERROR_ID_QUOTA_FULL;
1104     else if (status == HTTP_REPO_DELETED)
1105         return SYNC_ERROR_ID_SERVER_REPO_DELETED;
1106     else if (status == HTTP_REPO_CORRUPTED)
1107         return SYNC_ERROR_ID_SERVER_REPO_CORRUPT;
1108     else
1109         return SYNC_ERROR_ID_GENERAL_ERROR;
1110 }
1111 
1112 static void
handle_http_errors(HttpTxTask * task,int status)1113 handle_http_errors (HttpTxTask *task, int status)
1114 {
1115     task->error = http_error_to_http_task_error (status);
1116 }
1117 
1118 static int
curl_error_to_http_task_error(int curl_error)1119 curl_error_to_http_task_error (int curl_error)
1120 {
1121     if (curl_error == CURLE_SSL_CACERT ||
1122         curl_error == CURLE_PEER_FAILED_VERIFICATION)
1123         return SYNC_ERROR_ID_SSL;
1124 
1125     switch (curl_error) {
1126     case CURLE_COULDNT_RESOLVE_PROXY:
1127         return SYNC_ERROR_ID_RESOLVE_PROXY;
1128     case CURLE_COULDNT_RESOLVE_HOST:
1129         return SYNC_ERROR_ID_RESOLVE_HOST;
1130     case CURLE_COULDNT_CONNECT:
1131         return SYNC_ERROR_ID_CONNECT;
1132     case CURLE_OPERATION_TIMEDOUT:
1133         return SYNC_ERROR_ID_TX_TIMEOUT;
1134     case CURLE_SSL_CONNECT_ERROR:
1135     case CURLE_SSL_CERTPROBLEM:
1136     case CURLE_SSL_CACERT_BADFILE:
1137     case CURLE_SSL_ISSUER_ERROR:
1138         return SYNC_ERROR_ID_SSL;
1139     case CURLE_GOT_NOTHING:
1140     case CURLE_SEND_ERROR:
1141     case CURLE_RECV_ERROR:
1142         return SYNC_ERROR_ID_TX;
1143     case CURLE_SEND_FAIL_REWIND:
1144         return SYNC_ERROR_ID_UNHANDLED_REDIRECT;
1145     default:
1146         return SYNC_ERROR_ID_NETWORK;
1147     }
1148 }
1149 
1150 static void
handle_curl_errors(HttpTxTask * task,int curl_error)1151 handle_curl_errors (HttpTxTask *task, int curl_error)
1152 {
1153     task->error = curl_error_to_http_task_error (curl_error);
1154 }
1155 
1156 static void
emit_transfer_done_signal(HttpTxTask * task)1157 emit_transfer_done_signal (HttpTxTask *task)
1158 {
1159     if (task->type == HTTP_TASK_TYPE_DOWNLOAD)
1160         g_signal_emit_by_name (seaf, "repo-http-fetched", task);
1161     else
1162         g_signal_emit_by_name (seaf, "repo-http-uploaded", task);
1163 }
1164 
1165 static void
transition_state(HttpTxTask * task,int state,int rt_state)1166 transition_state (HttpTxTask *task, int state, int rt_state)
1167 {
1168     seaf_message ("Transfer repo '%.8s': ('%s', '%s') --> ('%s', '%s')\n",
1169                   task->repo_id,
1170                   http_task_state_to_str(task->state),
1171                   http_task_rt_state_to_str(task->runtime_state),
1172                   http_task_state_to_str(state),
1173                   http_task_rt_state_to_str(rt_state));
1174 
1175     if (state != task->state)
1176         task->state = state;
1177     task->runtime_state = rt_state;
1178 
1179     if (rt_state == HTTP_TASK_RT_STATE_FINISHED) {
1180         /* Clear download head info. */
1181         if (task->type == HTTP_TASK_TYPE_DOWNLOAD &&
1182             state == HTTP_TASK_STATE_FINISHED)
1183             seaf_repo_manager_set_repo_property (seaf->repo_mgr,
1184                                                  task->repo_id,
1185                                                  REPO_PROP_DOWNLOAD_HEAD,
1186                                                  EMPTY_SHA1);
1187 
1188         emit_transfer_done_signal (task);
1189     }
1190 }
1191 
/*
 * Job state for an asynchronous protocol-version check.
 * The first group of fields is filled in by the scheduling function, the
 * second group by the worker thread and then copied into the callback result.
 */
typedef struct {
    char *host;                     /* Server URL prefix; owned copy, freed when the job is done. */
    gboolean use_fileserver_port;   /* If FALSE, the request path is prefixed with "/seafhttp". */
    HttpProtocolVersionCallback callback;   /* Called with the result when the job is done. */
    void *user_data;                /* Passed through to callback unchanged. */

    gboolean success;               /* TRUE once the HTTP request itself completed. */
    gboolean not_supported;         /* TRUE on a non-200 status or an empty/unparsable body. */
    int version;                    /* Value of the "version" member of the response. */
    int error_code;                 /* SYNC_ERROR_ID_* derived from the curl error or HTTP status. */
} CheckProtocolData;
1203 
1204 static int
parse_protocol_version(const char * rsp_content,int rsp_size,CheckProtocolData * data)1205 parse_protocol_version (const char *rsp_content, int rsp_size, CheckProtocolData *data)
1206 {
1207     json_t *object = NULL;
1208     json_error_t jerror;
1209     int version;
1210 
1211     object = json_loadb (rsp_content, rsp_size, 0, &jerror);
1212     if (!object) {
1213         seaf_warning ("Parse response failed: %s.\n", jerror.text);
1214         return -1;
1215     }
1216 
1217     if (json_object_has_member (object, "version")) {
1218         version = json_object_get_int_member (object, "version");
1219         data->version = version;
1220     } else {
1221         seaf_warning ("Response doesn't contain protocol version.\n");
1222         json_decref (object);
1223         return -1;
1224     }
1225 
1226     json_decref (object);
1227     return 0;
1228 }
1229 
1230 static void *
check_protocol_version_thread(void * vdata)1231 check_protocol_version_thread (void *vdata)
1232 {
1233     CheckProtocolData *data = vdata;
1234     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
1235     ConnectionPool *pool;
1236     Connection *conn;
1237     CURL *curl;
1238     char *url;
1239     int status;
1240     char *rsp_content = NULL;
1241     gint64 rsp_size;
1242 
1243     pool = find_connection_pool (priv, data->host);
1244     if (!pool) {
1245         seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
1246         return vdata;
1247     }
1248 
1249     conn = connection_pool_get_connection (pool);
1250     if (!conn) {
1251         seaf_warning ("Failed to get connection to host %s.\n", data->host);
1252         return vdata;
1253     }
1254 
1255     curl = conn->curl;
1256 
1257     if (!data->use_fileserver_port)
1258         url = g_strdup_printf ("%s/seafhttp/protocol-version", data->host);
1259     else
1260         url = g_strdup_printf ("%s/protocol-version", data->host);
1261 
1262     int curl_error;
1263     if (http_get (curl, url, NULL, &status, &rsp_content, &rsp_size, NULL, NULL, TRUE, &curl_error) < 0) {
1264         conn->release = TRUE;
1265         data->error_code = curl_error_to_http_task_error (curl_error);
1266         goto out;
1267     }
1268 
1269     data->success = TRUE;
1270 
1271     if (status == HTTP_OK) {
1272         if (rsp_size == 0)
1273             data->not_supported = TRUE;
1274         else if (parse_protocol_version (rsp_content, rsp_size, data) < 0)
1275             data->not_supported = TRUE;
1276     } else {
1277         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
1278         data->not_supported = TRUE;
1279         data->error_code = http_error_to_http_task_error (status);
1280     }
1281 
1282 out:
1283     g_free (url);
1284     g_free (rsp_content);
1285     connection_pool_return_connection (pool, conn);
1286 
1287     return vdata;
1288 }
1289 
1290 static void
check_protocol_version_done(void * vdata)1291 check_protocol_version_done (void *vdata)
1292 {
1293     CheckProtocolData *data = vdata;
1294     HttpProtocolVersion result;
1295 
1296     memset (&result, 0, sizeof(result));
1297     result.check_success = data->success;
1298     result.not_supported = data->not_supported;
1299     result.version = data->version;
1300     result.error_code = data->error_code;
1301 
1302     data->callback (&result, data->user_data);
1303 
1304     g_free (data->host);
1305     g_free (data);
1306 }
1307 
1308 int
http_tx_manager_check_protocol_version(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,HttpProtocolVersionCallback callback,void * user_data)1309 http_tx_manager_check_protocol_version (HttpTxManager *manager,
1310                                         const char *host,
1311                                         gboolean use_fileserver_port,
1312                                         HttpProtocolVersionCallback callback,
1313                                         void *user_data)
1314 {
1315     CheckProtocolData *data = g_new0 (CheckProtocolData, 1);
1316 
1317     data->host = g_strdup(host);
1318     data->use_fileserver_port = use_fileserver_port;
1319     data->callback = callback;
1320     data->user_data = user_data;
1321 
1322     int ret = seaf_job_manager_schedule_job (seaf->job_mgr,
1323                                              check_protocol_version_thread,
1324                                              check_protocol_version_done,
1325                                              data);
1326     if (ret < 0) {
1327         g_free (data->host);
1328         g_free (data);
1329     }
1330 
1331     return ret;
1332 }
1333 
1334 /* Check Head Commit. */
1335 
/*
 * Job state for an asynchronous head-commit check.
 * The first group of fields is filled in by the scheduling function, the
 * second group by the worker thread and then copied into the callback result.
 */
typedef struct {
    char repo_id[41];               /* Repo id; 36 bytes are copied in by the scheduling function. */
    int repo_version;
    char *host;                     /* Server URL prefix; owned copy, freed when the job is done. */
    char *token;                    /* Repo token sent with the request; owned copy. */
    gboolean use_fileserver_port;   /* If FALSE, the request path is prefixed with "/seafhttp". */
    HttpHeadCommitCallback callback;    /* Called with the result when the job is done. */
    void *user_data;                /* Passed through to callback unchanged. */

    gboolean success;               /* TRUE if the check produced a usable answer. */
    gboolean is_corrupt;            /* TRUE if the response carries a non-zero "is_corrupted". */
    gboolean is_deleted;            /* TRUE if the server answered HTTP_REPO_DELETED. */
    char head_commit[41];           /* "head_commit_id" from the response (40 bytes copied). */
    int error_code;                 /* SYNC_ERROR_ID_* describing the failure, if any. */
} CheckHeadData;
1351 
1352 static int
parse_head_commit_info(const char * rsp_content,int rsp_size,CheckHeadData * data)1353 parse_head_commit_info (const char *rsp_content, int rsp_size, CheckHeadData *data)
1354 {
1355     json_t *object = NULL;
1356     json_error_t jerror;
1357     const char *head_commit;
1358 
1359     object = json_loadb (rsp_content, rsp_size, 0, &jerror);
1360     if (!object) {
1361         seaf_warning ("Parse response failed: %s.\n", jerror.text);
1362         return -1;
1363     }
1364 
1365     if (json_object_has_member (object, "is_corrupted") &&
1366         json_object_get_int_member (object, "is_corrupted"))
1367         data->is_corrupt = TRUE;
1368 
1369     if (!data->is_corrupt) {
1370         head_commit = json_object_get_string_member (object, "head_commit_id");
1371         if (!head_commit) {
1372             seaf_warning ("Check head commit for repo %s failed. "
1373                           "Response doesn't contain head commit id.\n",
1374                           data->repo_id);
1375             json_decref (object);
1376             return -1;
1377         }
1378         memcpy (data->head_commit, head_commit, 40);
1379     }
1380 
1381     json_decref (object);
1382     return 0;
1383 }
1384 
1385 static void *
check_head_commit_thread(void * vdata)1386 check_head_commit_thread (void *vdata)
1387 {
1388     CheckHeadData *data = vdata;
1389     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
1390     ConnectionPool *pool;
1391     Connection *conn;
1392     CURL *curl;
1393     char *url;
1394     int status;
1395     char *rsp_content = NULL;
1396     gint64 rsp_size;
1397 
1398     pool = find_connection_pool (priv, data->host);
1399     if (!pool) {
1400         seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
1401         return vdata;
1402     }
1403 
1404     conn = connection_pool_get_connection (pool);
1405     if (!conn) {
1406         seaf_warning ("Failed to get connection to host %s.\n", data->host);
1407         return vdata;
1408     }
1409 
1410     curl = conn->curl;
1411 
1412     if (!data->use_fileserver_port)
1413         url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/HEAD",
1414                                data->host, data->repo_id);
1415     else
1416         url = g_strdup_printf ("%s/repo/%s/commit/HEAD",
1417                                data->host, data->repo_id);
1418 
1419     int curl_error;
1420     if (http_get (curl, url, data->token, &status, &rsp_content, &rsp_size,
1421                   NULL, NULL, TRUE, &curl_error) < 0) {
1422         conn->release = TRUE;
1423         data->error_code = curl_error_to_http_task_error (curl_error);
1424         goto out;
1425     }
1426 
1427     if (status == HTTP_OK) {
1428         if (parse_head_commit_info (rsp_content, rsp_size, data) < 0) {
1429             data->error_code = SYNC_ERROR_ID_NETWORK;
1430             goto out;
1431         }
1432         data->success = TRUE;
1433     } else if (status == HTTP_REPO_DELETED) {
1434         data->is_deleted = TRUE;
1435         data->success = TRUE;
1436     } else {
1437         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
1438         data->error_code = http_error_to_http_task_error (status);
1439     }
1440 
1441 out:
1442     g_free (url);
1443     g_free (rsp_content);
1444     connection_pool_return_connection (pool, conn);
1445     return vdata;
1446 }
1447 
1448 static void
check_head_commit_done(void * vdata)1449 check_head_commit_done (void *vdata)
1450 {
1451     CheckHeadData *data = vdata;
1452     HttpHeadCommit result;
1453 
1454     memset (&result, 0, sizeof(result));
1455     result.check_success = data->success;
1456     result.is_corrupt = data->is_corrupt;
1457     result.is_deleted = data->is_deleted;
1458     memcpy (result.head_commit, data->head_commit, 40);
1459     result.error_code = data->error_code;
1460 
1461     data->callback (&result, data->user_data);
1462 
1463     g_free (data->host);
1464     g_free (data->token);
1465     g_free (data);
1466 }
1467 
1468 int
http_tx_manager_check_head_commit(HttpTxManager * manager,const char * repo_id,int repo_version,const char * host,const char * token,gboolean use_fileserver_port,HttpHeadCommitCallback callback,void * user_data)1469 http_tx_manager_check_head_commit (HttpTxManager *manager,
1470                                    const char *repo_id,
1471                                    int repo_version,
1472                                    const char *host,
1473                                    const char *token,
1474                                    gboolean use_fileserver_port,
1475                                    HttpHeadCommitCallback callback,
1476                                    void *user_data)
1477 {
1478     CheckHeadData *data = g_new0 (CheckHeadData, 1);
1479 
1480     memcpy (data->repo_id, repo_id, 36);
1481     data->repo_version = repo_version;
1482     data->host = g_strdup(host);
1483     data->token = g_strdup(token);
1484     data->callback = callback;
1485     data->user_data = user_data;
1486     data->use_fileserver_port = use_fileserver_port;
1487 
1488     if (seaf_job_manager_schedule_job (seaf->job_mgr,
1489                                        check_head_commit_thread,
1490                                        check_head_commit_done,
1491                                        data) < 0) {
1492         g_free (data->host);
1493         g_free (data->token);
1494         g_free (data);
1495         return -1;
1496     }
1497 
1498     return 0;
1499 }
1500 
1501 /* Get folder permissions. */
1502 
1503 void
http_folder_perm_req_free(HttpFolderPermReq * req)1504 http_folder_perm_req_free (HttpFolderPermReq *req)
1505 {
1506     if (!req)
1507         return;
1508     g_free (req->token);
1509     g_free (req);
1510 }
1511 
1512 void
http_folder_perm_res_free(HttpFolderPermRes * res)1513 http_folder_perm_res_free (HttpFolderPermRes *res)
1514 {
1515     GList *ptr;
1516 
1517     if (!res)
1518         return;
1519     for (ptr = res->user_perms; ptr; ptr = ptr->next)
1520         folder_perm_free ((FolderPerm *)ptr->data);
1521     for (ptr = res->group_perms; ptr; ptr = ptr->next)
1522         folder_perm_free ((FolderPerm *)ptr->data);
1523     g_free (res);
1524 }
1525 
/*
 * Job state for an asynchronous folder-permission query.
 * The first group of fields is filled in by the scheduling function, the
 * second group by the worker thread.
 */
typedef struct {
    char *host;                     /* Server URL prefix; owned copy, freed when the job is done. */
    gboolean use_fileserver_port;   /* If FALSE, the request path is prefixed with "/seafhttp". */
    GList *requests;                /* List of HttpFolderPermReq; freed by the worker thread. */
    HttpGetFolderPermsCallback callback;    /* Called with the result when the job is done. */
    void *user_data;                /* Passed through to callback unchanged. */

    gboolean success;               /* TRUE if the response was received and parsed. */
    GList *results;                 /* List of HttpFolderPermRes; freed after the callback returns. */
} GetFolderPermsData;
1536 
/*
 * Return a newly allocated copy of @path that starts with '/' and does not
 * end with '/' (one trailing '/' is stripped). The root path "/" is returned
 * as is, and an empty path is treated as root.
 *
 * The caller releases the result with g_free().
 */
static char *
canonical_perm_path (const char *path)
{
    size_t len = strlen(path);
    char *copy, *ret;

    /* An empty path has no last character to inspect; map it to root. */
    if (len == 0 || strcmp (path, "/") == 0)
        return g_strdup("/");

    /* Already canonical. */
    if (path[0] == '/' && path[len-1] != '/')
        return g_strdup(path);

    copy = g_strdup(path);

    if (copy[len-1] == '/')
        copy[len-1] = 0;

    if (copy[0] == '/')
        return copy;

    /* g_strconcat() allocates a new string, so the working copy must be
     * released here. */
    ret = g_strconcat ("/", copy, NULL);
    g_free (copy);

    return ret;
}
1562 
1563 static GList *
parse_permission_list(json_t * array,gboolean * error)1564 parse_permission_list (json_t *array, gboolean *error)
1565 {
1566     GList *ret = NULL, *ptr;
1567     json_t *object, *member;
1568     size_t n;
1569     int i;
1570     FolderPerm *perm;
1571     const char *str;
1572 
1573     *error = FALSE;
1574 
1575     n = json_array_size (array);
1576     for (i = 0; i < n; ++i) {
1577         object = json_array_get (array, i);
1578 
1579         perm = g_new0 (FolderPerm, 1);
1580 
1581         member = json_object_get (object, "path");
1582         if (!member) {
1583             seaf_warning ("Invalid folder perm response format: no path.\n");
1584             *error = TRUE;
1585             goto out;
1586         }
1587         str = json_string_value(member);
1588         if (!str) {
1589             seaf_warning ("Invalid folder perm response format: invalid path.\n");
1590             *error = TRUE;
1591             goto out;
1592         }
1593         perm->path = canonical_perm_path (str);
1594 
1595         member = json_object_get (object, "permission");
1596         if (!member) {
1597             seaf_warning ("Invalid folder perm response format: no permission.\n");
1598             *error = TRUE;
1599             goto out;
1600         }
1601         str = json_string_value(member);
1602         if (!str) {
1603             seaf_warning ("Invalid folder perm response format: invalid permission.\n");
1604             *error = TRUE;
1605             goto out;
1606         }
1607         perm->permission = g_strdup(str);
1608 
1609         ret = g_list_append (ret, perm);
1610     }
1611 
1612 out:
1613     if (*error) {
1614         for (ptr = ret; ptr; ptr = ptr->next)
1615             folder_perm_free ((FolderPerm *)ptr->data);
1616         g_list_free (ret);
1617         ret = NULL;
1618     }
1619 
1620     return ret;
1621 }
1622 
1623 static int
parse_folder_perms(const char * rsp_content,int rsp_size,GetFolderPermsData * data)1624 parse_folder_perms (const char *rsp_content, int rsp_size, GetFolderPermsData *data)
1625 {
1626     json_t *array = NULL, *object, *member;
1627     json_error_t jerror;
1628     size_t n;
1629     int i;
1630     GList *results = NULL, *ptr;
1631     HttpFolderPermRes *res;
1632     const char *repo_id;
1633     int ret = 0;
1634     gboolean error;
1635 
1636     array = json_loadb (rsp_content, rsp_size, 0, &jerror);
1637     if (!array) {
1638         seaf_warning ("Parse response failed: %s.\n", jerror.text);
1639         return -1;
1640     }
1641 
1642     n = json_array_size (array);
1643     for (i = 0; i < n; ++i) {
1644         object = json_array_get (array, i);
1645 
1646         res = g_new0 (HttpFolderPermRes, 1);
1647 
1648         member = json_object_get (object, "repo_id");
1649         if (!member) {
1650             seaf_warning ("Invalid folder perm response format: no repo_id.\n");
1651             ret = -1;
1652             goto out;
1653         }
1654         repo_id = json_string_value(member);
1655         if (strlen(repo_id) != 36) {
1656             seaf_warning ("Invalid folder perm response format: invalid repo_id.\n");
1657             ret = -1;
1658             goto out;
1659         }
1660         memcpy (res->repo_id, repo_id, 36);
1661 
1662         member = json_object_get (object, "ts");
1663         if (!member) {
1664             seaf_warning ("Invalid folder perm response format: no timestamp.\n");
1665             ret = -1;
1666             goto out;
1667         }
1668         res->timestamp = json_integer_value (member);
1669 
1670         member = json_object_get (object, "user_perms");
1671         if (!member) {
1672             seaf_warning ("Invalid folder perm response format: no user_perms.\n");
1673             ret = -1;
1674             goto out;
1675         }
1676         res->user_perms = parse_permission_list (member, &error);
1677         if (error) {
1678             ret = -1;
1679             goto out;
1680         }
1681 
1682         member = json_object_get (object, "group_perms");
1683         if (!member) {
1684             seaf_warning ("Invalid folder perm response format: no group_perms.\n");
1685             ret = -1;
1686             goto out;
1687         }
1688         res->group_perms = parse_permission_list (member, &error);
1689         if (error) {
1690             ret = -1;
1691             goto out;
1692         }
1693 
1694         results = g_list_append (results, res);
1695     }
1696 
1697 out:
1698     json_decref (array);
1699 
1700     if (ret < 0) {
1701         for (ptr = results; ptr; ptr = ptr->next)
1702             http_folder_perm_res_free ((HttpFolderPermRes *)ptr->data);
1703         g_list_free (results);
1704     } else {
1705         data->results = results;
1706     }
1707 
1708     return ret;
1709 }
1710 
1711 static char *
compose_get_folder_perms_request(GList * requests)1712 compose_get_folder_perms_request (GList *requests)
1713 {
1714     GList *ptr;
1715     HttpFolderPermReq *req;
1716     json_t *object, *array;
1717     char *req_str = NULL;
1718 
1719     array = json_array ();
1720 
1721     for (ptr = requests; ptr; ptr = ptr->next) {
1722         req = ptr->data;
1723 
1724         object = json_object ();
1725         json_object_set_new (object, "repo_id", json_string(req->repo_id));
1726         json_object_set_new (object, "token", json_string(req->token));
1727         json_object_set_new (object, "ts", json_integer(req->timestamp));
1728 
1729         json_array_append_new (array, object);
1730     }
1731 
1732     req_str = json_dumps (array, 0);
1733     if (!req_str) {
1734         seaf_warning ("Faile to json_dumps.\n");
1735     }
1736 
1737     json_decref (array);
1738     return req_str;
1739 }
1740 
1741 static void *
get_folder_perms_thread(void * vdata)1742 get_folder_perms_thread (void *vdata)
1743 {
1744     GetFolderPermsData *data = vdata;
1745     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
1746     ConnectionPool *pool;
1747     Connection *conn;
1748     CURL *curl;
1749     char *url;
1750     char *req_content = NULL;
1751     int status;
1752     char *rsp_content = NULL;
1753     gint64 rsp_size;
1754     GList *ptr;
1755 
1756     pool = find_connection_pool (priv, data->host);
1757     if (!pool) {
1758         seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
1759         return vdata;
1760     }
1761 
1762     conn = connection_pool_get_connection (pool);
1763     if (!conn) {
1764         seaf_warning ("Failed to get connection to host %s.\n", data->host);
1765         return vdata;
1766     }
1767 
1768     curl = conn->curl;
1769 
1770     if (!data->use_fileserver_port)
1771         url = g_strdup_printf ("%s/seafhttp/repo/folder-perm", data->host);
1772     else
1773         url = g_strdup_printf ("%s/repo/folder-perm", data->host);
1774 
1775     req_content = compose_get_folder_perms_request (data->requests);
1776     if (!req_content)
1777         goto out;
1778 
1779     if (http_post (curl, url, NULL, req_content, strlen(req_content),
1780                    &status, &rsp_content, &rsp_size, TRUE, NULL) < 0) {
1781         conn->release = TRUE;
1782         goto out;
1783     }
1784 
1785     if (status == HTTP_OK) {
1786         if (parse_folder_perms (rsp_content, rsp_size, data) < 0)
1787             goto out;
1788         data->success = TRUE;
1789     } else {
1790         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
1791     }
1792 
1793 out:
1794     for (ptr = data->requests; ptr; ptr = ptr->next)
1795         http_folder_perm_req_free ((HttpFolderPermReq *)ptr->data);
1796     g_list_free (data->requests);
1797 
1798     g_free (url);
1799     g_free (req_content);
1800     g_free (rsp_content);
1801     connection_pool_return_connection (pool, conn);
1802     return vdata;
1803 }
1804 
1805 static void
get_folder_perms_done(void * vdata)1806 get_folder_perms_done (void *vdata)
1807 {
1808     GetFolderPermsData *data = vdata;
1809     HttpFolderPerms cb_data;
1810 
1811     memset (&cb_data, 0, sizeof(cb_data));
1812     cb_data.success = data->success;
1813     cb_data.results = data->results;
1814 
1815     data->callback (&cb_data, data->user_data);
1816 
1817     GList *ptr;
1818     for (ptr = data->results; ptr; ptr = ptr->next)
1819         http_folder_perm_res_free ((HttpFolderPermRes *)ptr->data);
1820     g_list_free (data->results);
1821 
1822     g_free (data->host);
1823     g_free (data);
1824 }
1825 
1826 int
http_tx_manager_get_folder_perms(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,GList * folder_perm_requests,HttpGetFolderPermsCallback callback,void * user_data)1827 http_tx_manager_get_folder_perms (HttpTxManager *manager,
1828                                   const char *host,
1829                                   gboolean use_fileserver_port,
1830                                   GList *folder_perm_requests,
1831                                   HttpGetFolderPermsCallback callback,
1832                                   void *user_data)
1833 {
1834     GetFolderPermsData *data = g_new0 (GetFolderPermsData, 1);
1835 
1836     data->host = g_strdup(host);
1837     data->requests = folder_perm_requests;
1838     data->callback = callback;
1839     data->user_data = user_data;
1840     data->use_fileserver_port = use_fileserver_port;
1841 
1842     if (seaf_job_manager_schedule_job (seaf->job_mgr,
1843                                        get_folder_perms_thread,
1844                                        get_folder_perms_done,
1845                                        data) < 0) {
1846         g_free (data->host);
1847         g_free (data);
1848         return -1;
1849     }
1850 
1851     return 0;
1852 }
1853 
1854 /* Get Locked Files. */
1855 
1856 void
http_locked_files_req_free(HttpLockedFilesReq * req)1857 http_locked_files_req_free (HttpLockedFilesReq *req)
1858 {
1859     if (!req)
1860         return;
1861     g_free (req->token);
1862     g_free (req);
1863 }
1864 
1865 void
http_locked_files_res_free(HttpLockedFilesRes * res)1866 http_locked_files_res_free (HttpLockedFilesRes *res)
1867 {
1868     if (!res)
1869         return;
1870 
1871     g_hash_table_destroy (res->locked_files);
1872     g_free (res);
1873 }
1874 
/*
 * Job state for an asynchronous locked-files query.
 * NOTE(review): the field roles below mirror the folder-permission job data;
 * the code that fills and consumes them is outside this part of the file --
 * confirm against it.
 */
typedef struct {
    char *host;                     /* Server URL prefix. */
    gboolean use_fileserver_port;
    GList *requests;                /* Presumably a list of HttpLockedFilesReq. */
    HttpGetLockedFilesCallback callback;
    void *user_data;

    gboolean success;
    GList *results;                 /* List of HttpLockedFilesRes, set by parse_locked_files(). */
} GetLockedFilesData;
1885 
1886 static GHashTable *
parse_locked_file_list(json_t * array)1887 parse_locked_file_list (json_t *array)
1888 {
1889     GHashTable *ret = NULL;
1890     size_t n, i;
1891     json_t *obj, *string, *integer;
1892 
1893     ret = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
1894     if (!ret) {
1895         return NULL;
1896     }
1897 
1898     n = json_array_size (array);
1899     for (i = 0; i < n; ++i) {
1900         obj = json_array_get (array, i);
1901         string = json_object_get (obj, "path");
1902         if (!string) {
1903             g_hash_table_destroy (ret);
1904             return NULL;
1905         }
1906         integer = json_object_get (obj, "by_me");
1907         if (!integer) {
1908             g_hash_table_destroy (ret);
1909             return NULL;
1910         }
1911         g_hash_table_insert (ret,
1912                              g_strdup(json_string_value(string)),
1913                              (void*)(long)json_integer_value(integer));
1914     }
1915 
1916     return ret;
1917 }
1918 
1919 static int
parse_locked_files(const char * rsp_content,int rsp_size,GetLockedFilesData * data)1920 parse_locked_files (const char *rsp_content, int rsp_size, GetLockedFilesData *data)
1921 {
1922     json_t *array = NULL, *object, *member;
1923     json_error_t jerror;
1924     size_t n;
1925     int i;
1926     GList *results = NULL;
1927     HttpLockedFilesRes *res;
1928     const char *repo_id;
1929     int ret = 0;
1930 
1931     array = json_loadb (rsp_content, rsp_size, 0, &jerror);
1932     if (!array) {
1933         seaf_warning ("Parse response failed: %s.\n", jerror.text);
1934         return -1;
1935     }
1936 
1937     n = json_array_size (array);
1938     for (i = 0; i < n; ++i) {
1939         object = json_array_get (array, i);
1940 
1941         res = g_new0 (HttpLockedFilesRes, 1);
1942 
1943         member = json_object_get (object, "repo_id");
1944         if (!member) {
1945             seaf_warning ("Invalid locked files response format: no repo_id.\n");
1946             ret = -1;
1947             goto out;
1948         }
1949         repo_id = json_string_value(member);
1950         if (strlen(repo_id) != 36) {
1951             seaf_warning ("Invalid locked files response format: invalid repo_id.\n");
1952             ret = -1;
1953             goto out;
1954         }
1955         memcpy (res->repo_id, repo_id, 36);
1956 
1957         member = json_object_get (object, "ts");
1958         if (!member) {
1959             seaf_warning ("Invalid locked files response format: no timestamp.\n");
1960             ret = -1;
1961             goto out;
1962         }
1963         res->timestamp = json_integer_value (member);
1964 
1965         member = json_object_get (object, "locked_files");
1966         if (!member) {
1967             seaf_warning ("Invalid locked files response format: no locked_files.\n");
1968             ret = -1;
1969             goto out;
1970         }
1971 
1972         res->locked_files = parse_locked_file_list (member);
1973         if (res->locked_files == NULL) {
1974             ret = -1;
1975             goto out;
1976         }
1977 
1978         results = g_list_append (results, res);
1979     }
1980 
1981 out:
1982     json_decref (array);
1983 
1984     if (ret < 0) {
1985         g_list_free_full (results, (GDestroyNotify)http_locked_files_res_free);
1986     } else {
1987         data->results = results;
1988     }
1989 
1990     return ret;
1991 }
1992 
1993 static char *
compose_get_locked_files_request(GList * requests)1994 compose_get_locked_files_request (GList *requests)
1995 {
1996     GList *ptr;
1997     HttpLockedFilesReq *req;
1998     json_t *object, *array;
1999     char *req_str = NULL;
2000 
2001     array = json_array ();
2002 
2003     for (ptr = requests; ptr; ptr = ptr->next) {
2004         req = ptr->data;
2005 
2006         object = json_object ();
2007         json_object_set_new (object, "repo_id", json_string(req->repo_id));
2008         json_object_set_new (object, "token", json_string(req->token));
2009         json_object_set_new (object, "ts", json_integer(req->timestamp));
2010 
2011         json_array_append_new (array, object);
2012     }
2013 
2014     req_str = json_dumps (array, 0);
2015     if (!req_str) {
2016         seaf_warning ("Faile to json_dumps.\n");
2017     }
2018 
2019     json_decref (array);
2020     return req_str;
2021 }
2022 
2023 static void *
get_locked_files_thread(void * vdata)2024 get_locked_files_thread (void *vdata)
2025 {
2026     GetLockedFilesData *data = vdata;
2027     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2028     ConnectionPool *pool;
2029     Connection *conn;
2030     CURL *curl;
2031     char *url;
2032     char *req_content = NULL;
2033     int status;
2034     char *rsp_content = NULL;
2035     gint64 rsp_size;
2036 
2037     pool = find_connection_pool (priv, data->host);
2038     if (!pool) {
2039         seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
2040         return vdata;
2041     }
2042 
2043     conn = connection_pool_get_connection (pool);
2044     if (!conn) {
2045         seaf_warning ("Failed to get connection to host %s.\n", data->host);
2046         return vdata;
2047     }
2048 
2049     curl = conn->curl;
2050 
2051     if (!data->use_fileserver_port)
2052         url = g_strdup_printf ("%s/seafhttp/repo/locked-files", data->host);
2053     else
2054         url = g_strdup_printf ("%s/repo/locked-files", data->host);
2055 
2056     req_content = compose_get_locked_files_request (data->requests);
2057     if (!req_content)
2058         goto out;
2059 
2060     if (http_post (curl, url, NULL, req_content, strlen(req_content),
2061                    &status, &rsp_content, &rsp_size, TRUE, NULL) < 0) {
2062         conn->release = TRUE;
2063         goto out;
2064     }
2065 
2066     if (status == HTTP_OK) {
2067         if (parse_locked_files (rsp_content, rsp_size, data) < 0)
2068             goto out;
2069         data->success = TRUE;
2070     } else {
2071         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
2072     }
2073 
2074 out:
2075     g_list_free_full (data->requests, (GDestroyNotify)http_locked_files_req_free);
2076 
2077     g_free (url);
2078     g_free (req_content);
2079     g_free (rsp_content);
2080     connection_pool_return_connection (pool, conn);
2081     return vdata;
2082 }
2083 
2084 static void
get_locked_files_done(void * vdata)2085 get_locked_files_done (void *vdata)
2086 {
2087     GetLockedFilesData *data = vdata;
2088     HttpLockedFiles cb_data;
2089 
2090     memset (&cb_data, 0, sizeof(cb_data));
2091     cb_data.success = data->success;
2092     cb_data.results = data->results;
2093 
2094     data->callback (&cb_data, data->user_data);
2095 
2096     g_list_free_full (data->results, (GDestroyNotify)http_locked_files_res_free);
2097 
2098     g_free (data->host);
2099     g_free (data);
2100 }
2101 
2102 int
http_tx_manager_get_locked_files(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,GList * locked_files_requests,HttpGetLockedFilesCallback callback,void * user_data)2103 http_tx_manager_get_locked_files (HttpTxManager *manager,
2104                                   const char *host,
2105                                   gboolean use_fileserver_port,
2106                                   GList *locked_files_requests,
2107                                   HttpGetLockedFilesCallback callback,
2108                                   void *user_data)
2109 {
2110     GetLockedFilesData *data = g_new0 (GetLockedFilesData, 1);
2111 
2112     data->host = g_strdup(host);
2113     data->requests = locked_files_requests;
2114     data->callback = callback;
2115     data->user_data = user_data;
2116     data->use_fileserver_port = use_fileserver_port;
2117 
2118     if (seaf_job_manager_schedule_job (seaf->job_mgr,
2119                                        get_locked_files_thread,
2120                                        get_locked_files_done,
2121                                        data) < 0) {
2122         g_free (data->host);
2123         g_free (data);
2124         return -1;
2125     }
2126 
2127     return 0;
2128 }
2129 
2130 /* Synchronous interfaces for locking/unlocking a file on the server. */
2131 
2132 int
http_tx_manager_lock_file(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,const char * token,const char * repo_id,const char * path)2133 http_tx_manager_lock_file (HttpTxManager *manager,
2134                            const char *host,
2135                            gboolean use_fileserver_port,
2136                            const char *token,
2137                            const char *repo_id,
2138                            const char *path)
2139 {
2140     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2141     ConnectionPool *pool;
2142     Connection *conn;
2143     CURL *curl;
2144     char *url;
2145     int status;
2146     int ret = 0;
2147 
2148     pool = find_connection_pool (priv, host);
2149     if (!pool) {
2150         seaf_warning ("Failed to create connection pool for host %s.\n", host);
2151         return -1;
2152     }
2153 
2154     conn = connection_pool_get_connection (pool);
2155     if (!conn) {
2156         seaf_warning ("Failed to get connection to host %s.\n", host);
2157         return -1;
2158     }
2159 
2160     curl = conn->curl;
2161 
2162     char *esc_path = g_uri_escape_string(path, NULL, FALSE);
2163     if (!use_fileserver_port)
2164         url = g_strdup_printf ("%s/seafhttp/repo/%s/lock-file?p=%s", host, repo_id, esc_path);
2165     else
2166         url = g_strdup_printf ("%s/repo/%s/lock-file?p=%s", host, repo_id, esc_path);
2167     g_free (esc_path);
2168 
2169     if (http_put (curl, url, token, NULL, 0, NULL, NULL,
2170                   &status, NULL, NULL, TRUE, NULL) < 0) {
2171         conn->release = TRUE;
2172         ret = -1;
2173         goto out;
2174     }
2175 
2176     if (status != HTTP_OK) {
2177         seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
2178         ret = -1;
2179     }
2180 
2181 out:
2182     g_free (url);
2183     connection_pool_return_connection (pool, conn);
2184     return ret;
2185 }
2186 
2187 int
http_tx_manager_unlock_file(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,const char * token,const char * repo_id,const char * path)2188 http_tx_manager_unlock_file (HttpTxManager *manager,
2189                              const char *host,
2190                              gboolean use_fileserver_port,
2191                              const char *token,
2192                              const char *repo_id,
2193                              const char *path)
2194 {
2195     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2196     ConnectionPool *pool;
2197     Connection *conn;
2198     CURL *curl;
2199     char *url;
2200     int status;
2201     int ret = 0;
2202 
2203     pool = find_connection_pool (priv, host);
2204     if (!pool) {
2205         seaf_warning ("Failed to create connection pool for host %s.\n", host);
2206         return -1;
2207     }
2208 
2209     conn = connection_pool_get_connection (pool);
2210     if (!conn) {
2211         seaf_warning ("Failed to get connection to host %s.\n", host);
2212         return -1;
2213     }
2214 
2215     curl = conn->curl;
2216 
2217     char *esc_path = g_uri_escape_string(path, NULL, FALSE);
2218     if (!use_fileserver_port)
2219         url = g_strdup_printf ("%s/seafhttp/repo/%s/unlock-file?p=%s", host, repo_id, esc_path);
2220     else
2221         url = g_strdup_printf ("%s/repo/%s/unlock-file?p=%s", host, repo_id, esc_path);
2222     g_free (esc_path);
2223 
2224     if (http_put (curl, url, token, NULL, 0, NULL, NULL,
2225                   &status, NULL, NULL, TRUE, NULL) < 0) {
2226         conn->release = TRUE;
2227         ret = -1;
2228         goto out;
2229     }
2230 
2231     if (status != HTTP_OK) {
2232         seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
2233         ret = -1;
2234     }
2235 
2236 out:
2237     g_free (url);
2238     connection_pool_return_connection (pool, conn);
2239     return ret;
2240 }
2241 
2242 static char *
repo_id_list_to_json(GList * repo_id_list)2243 repo_id_list_to_json (GList *repo_id_list)
2244 {
2245     json_t *array = json_array();
2246     GList *ptr;
2247     char *repo_id;
2248 
2249     for (ptr = repo_id_list; ptr; ptr = ptr->next) {
2250         repo_id = ptr->data;
2251         json_array_append_new (array, json_string(repo_id));
2252     }
2253 
2254     char *data = json_dumps (array, JSON_COMPACT);
2255     if (!data) {
2256         seaf_warning ("Failed to dump json.\n");
2257         json_decref (array);
2258         return NULL;
2259     }
2260 
2261     json_decref (array);
2262     return data;
2263 }
2264 
2265 static GHashTable *
repo_head_commit_map_from_json(const char * json_str,gint64 len)2266 repo_head_commit_map_from_json (const char *json_str, gint64 len)
2267 {
2268     json_t *object;
2269     json_error_t jerror;
2270     GHashTable *ret;
2271 
2272     object = json_loadb (json_str, (size_t)len, 0, &jerror);
2273     if (!object) {
2274         seaf_warning ("Failed to load json: %s\n", jerror.text);
2275         return NULL;
2276     }
2277 
2278     ret = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
2279 
2280     void *iter = json_object_iter (object);
2281     const char *key;
2282     json_t *value;
2283     while (iter) {
2284         key = json_object_iter_key (iter);
2285         value = json_object_iter_value (iter);
2286         if (json_typeof(value) != JSON_STRING) {
2287             seaf_warning ("Bad json object format when parsing head commit id map.\n");
2288             g_hash_table_destroy (ret);
2289             goto out;
2290         }
2291         g_hash_table_replace (ret, g_strdup (key), g_strdup(json_string_value(value)));
2292         iter = json_object_iter_next (object, iter);
2293     }
2294 
2295 out:
2296     json_decref (object);
2297     return ret;
2298 }
2299 
2300 GHashTable *
http_tx_manager_get_head_commit_ids(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,GList * repo_id_list)2301 http_tx_manager_get_head_commit_ids (HttpTxManager *manager,
2302                                      const char *host,
2303                                      gboolean use_fileserver_port,
2304                                      GList *repo_id_list)
2305 {
2306     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2307     ConnectionPool *pool;
2308     Connection *conn;
2309     CURL *curl;
2310     char *url;
2311     char *req_content = NULL;
2312     gint64 req_size;
2313     int status;
2314     char *rsp_content = NULL;
2315     gint64 rsp_size;
2316     GHashTable *map = NULL;
2317 
2318     pool = find_connection_pool (priv, host);
2319     if (!pool) {
2320         seaf_warning ("Failed to create connection pool for host %s.\n", host);
2321         return NULL;
2322     }
2323 
2324     conn = connection_pool_get_connection (pool);
2325     if (!conn) {
2326         seaf_warning ("Failed to get connection to host %s.\n", host);
2327         return NULL;
2328     }
2329 
2330     curl = conn->curl;
2331 
2332     if (!use_fileserver_port)
2333         url = g_strdup_printf ("%s/seafhttp/repo/head-commits-multi/", host);
2334     else
2335         url = g_strdup_printf ("%s/repo/head-commits-multi/", host);
2336 
2337     req_content = repo_id_list_to_json (repo_id_list);
2338     req_size = strlen(req_content);
2339 
2340     if (http_post (curl, url, NULL, req_content, req_size,
2341                    &status, &rsp_content, &rsp_size, TRUE, NULL) < 0) {
2342         conn->release = TRUE;
2343         goto out;
2344     }
2345 
2346     if (status != HTTP_OK) {
2347         seaf_warning ("Bad response code for POST %s: %d\n", url, status);
2348         goto out;
2349     }
2350 
2351     map = repo_head_commit_map_from_json (rsp_content, rsp_size);
2352 
2353 out:
2354     g_free (url);
2355     connection_pool_return_connection (pool, conn);
2356     /* returned by json_dumps(). */
2357     free (req_content);
2358     g_free (rsp_content);
2359     return map;
2360 }
2361 
2362 static gboolean
remove_task_help(gpointer key,gpointer value,gpointer user_data)2363 remove_task_help (gpointer key, gpointer value, gpointer user_data)
2364 {
2365     HttpTxTask *task = value;
2366     const char *repo_id = user_data;
2367 
2368     if (strcmp(task->repo_id, repo_id) != 0)
2369         return FALSE;
2370 
2371     return TRUE;
2372 }
2373 
2374 static void
clean_tasks_for_repo(HttpTxManager * manager,const char * repo_id)2375 clean_tasks_for_repo (HttpTxManager *manager, const char *repo_id)
2376 {
2377     g_hash_table_foreach_remove (manager->priv->download_tasks,
2378                                  remove_task_help, (gpointer)repo_id);
2379 
2380     g_hash_table_foreach_remove (manager->priv->upload_tasks,
2381                                  remove_task_help, (gpointer)repo_id);
2382 }
2383 
2384 static int
check_permission(HttpTxTask * task,Connection * conn)2385 check_permission (HttpTxTask *task, Connection *conn)
2386 {
2387     CURL *curl;
2388     char *url;
2389     int status;
2390     char *rsp_content = NULL;
2391     gint64 rsp_size;
2392     int ret = 0;
2393     json_t *rsp_obj = NULL, *reason = NULL, *unsyncable_path = NULL;
2394     const char *reason_str = NULL, *unsyncable_path_str = NULL;
2395     json_error_t jerror;
2396 
2397     curl = conn->curl;
2398 
2399     const char *type = (task->type == HTTP_TASK_TYPE_DOWNLOAD) ? "download" : "upload";
2400     const char *url_prefix = (task->use_fileserver_port) ? "" : "seafhttp/";
2401     if (seaf->client_name) {
2402         char *client_name = g_uri_escape_string (seaf->client_name,
2403                                                  NULL, FALSE);
2404         url = g_strdup_printf ("%s/%srepo/%s/permission-check/?op=%s"
2405                                "&client_id=%s&client_name=%s",
2406                                task->host, url_prefix, task->repo_id, type,
2407                                seaf->client_id, client_name);
2408         g_free (client_name);
2409     } else {
2410         url = g_strdup_printf ("%s/%srepo/%s/permission-check/?op=%s",
2411                                task->host, url_prefix, task->repo_id, type);
2412     }
2413 
2414     int curl_error;
2415     if (http_get (curl, url, task->token, &status, &rsp_content, &rsp_size, NULL, NULL, TRUE, &curl_error) < 0) {
2416         conn->release = TRUE;
2417         handle_curl_errors (task, curl_error);
2418         ret = -1;
2419         goto out;
2420     }
2421 
2422     if (status != HTTP_OK) {
2423         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
2424 
2425         if (status != HTTP_FORBIDDEN || !rsp_content) {
2426             handle_http_errors (task, status);
2427             ret = -1;
2428             goto out;
2429         }
2430 
2431         rsp_obj = json_loadb (rsp_content, rsp_size, 0 ,&jerror);
2432         if (!rsp_obj) {
2433             seaf_warning ("Parse check permission response failed: %s.\n", jerror.text);
2434             handle_http_errors (task, status);
2435             json_decref (rsp_obj);
2436             ret = -1;
2437             goto out;
2438         }
2439 
2440         reason = json_object_get (rsp_obj, "reason");
2441         if (!reason) {
2442             handle_http_errors (task, status);
2443             json_decref (rsp_obj);
2444             ret = -1;
2445             goto out;
2446         }
2447 
2448         reason_str = json_string_value (reason);
2449         if (g_strcmp0 (reason_str, "no write permission") == 0) {
2450             task->error = SYNC_ERROR_ID_NO_WRITE_PERMISSION;
2451         } else if (g_strcmp0 (reason_str, "unsyncable share permission") == 0) {
2452             task->error = SYNC_ERROR_ID_PERM_NOT_SYNCABLE;
2453 
2454             unsyncable_path = json_object_get (rsp_obj, "unsyncable_path");
2455             if (!unsyncable_path) {
2456                 json_decref (rsp_obj);
2457                 ret = -1;
2458                 goto out;
2459             }
2460 
2461             unsyncable_path_str = json_string_value (unsyncable_path);
2462             if (unsyncable_path_str)
2463                 seaf_repo_manager_record_sync_error (task->repo_id, task->repo_name,
2464                                                      unsyncable_path_str,
2465                                                      SYNC_ERROR_ID_PERM_NOT_SYNCABLE);
2466         } else {
2467             task->error = SYNC_ERROR_ID_ACCESS_DENIED;
2468         }
2469 
2470         ret = -1;
2471     }
2472 
2473 out:
2474     g_free (url);
2475     g_free (rsp_content);
2476     curl_easy_reset (curl);
2477 
2478     return ret;
2479 }
2480 
2481 /* Upload. */
2482 
2483 static void *http_upload_thread (void *vdata);
2484 static void http_upload_done (void *vdata);
2485 
2486 int
http_tx_manager_add_upload(HttpTxManager * manager,const char * repo_id,int repo_version,const char * host,const char * token,int protocol_version,gboolean use_fileserver_port,GError ** error)2487 http_tx_manager_add_upload (HttpTxManager *manager,
2488                             const char *repo_id,
2489                             int repo_version,
2490                             const char *host,
2491                             const char *token,
2492                             int protocol_version,
2493                             gboolean use_fileserver_port,
2494                             GError **error)
2495 {
2496     HttpTxTask *task;
2497     SeafRepo *repo;
2498 
2499     if (!repo_id || !token || !host) {
2500         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Empty argument(s)");
2501         return -1;
2502     }
2503 
2504     repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
2505     if (!repo) {
2506         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Repo not found");
2507         return -1;
2508     }
2509 
2510     clean_tasks_for_repo (manager, repo_id);
2511 
2512     task = http_tx_task_new (manager, repo_id, repo_version,
2513                              HTTP_TASK_TYPE_UPLOAD, FALSE,
2514                              host, token, NULL, NULL);
2515 
2516     task->protocol_version = protocol_version;
2517 
2518     task->state = HTTP_TASK_STATE_NORMAL;
2519 
2520     task->use_fileserver_port = use_fileserver_port;
2521 
2522     task->repo_name = g_strdup(repo->name);
2523 
2524     g_hash_table_insert (manager->priv->upload_tasks,
2525                          g_strdup(repo_id),
2526                          task);
2527 
2528     if (seaf_job_manager_schedule_job (seaf->job_mgr,
2529                                        http_upload_thread,
2530                                        http_upload_done,
2531                                        task) < 0) {
2532         g_hash_table_remove (manager->priv->upload_tasks, repo_id);
2533         return -1;
2534     }
2535 
2536     return 0;
2537 }
2538 
2539 static gboolean
dirent_same(SeafDirent * dent1,SeafDirent * dent2)2540 dirent_same (SeafDirent *dent1, SeafDirent *dent2)
2541 {
2542     return (strcmp(dent1->id, dent2->id) == 0 &&
2543             dent1->mode == dent2->mode &&
2544             dent1->mtime == dent2->mtime);
2545 }
2546 
/*
 * Accumulator handed to the diff callbacks used by
 * calculate_upload_size_delta_and_active_paths().
 */
typedef struct {
    HttpTxTask *task;           /* Task the diff is computed for. */
    gint64 delta;               /* Running sum of file size differences (first tree minus second tree). */
    GHashTable *active_paths;   /* Path -> S_IFREG / S_IFDIR, stored directly in the value pointer. */
} CalcQuotaDeltaData;
2552 
2553 static int
check_quota_and_active_paths_diff_files(int n,const char * basedir,SeafDirent * files[],void * vdata)2554 check_quota_and_active_paths_diff_files (int n, const char *basedir,
2555                                          SeafDirent *files[], void *vdata)
2556 {
2557     CalcQuotaDeltaData *data = vdata;
2558     SeafDirent *file1 = files[0];
2559     SeafDirent *file2 = files[1];
2560     gint64 size1, size2;
2561     char *path;
2562 
2563     if (file1 && file2) {
2564         size1 = file1->size;
2565         size2 = file2->size;
2566         data->delta += (size1 - size2);
2567 
2568         if (!dirent_same (file1, file2)) {
2569             path = g_strconcat(basedir, file1->name, NULL);
2570             g_hash_table_replace (data->active_paths, path, (void*)(long)S_IFREG);
2571         }
2572     } else if (file1 && !file2) {
2573         data->delta += file1->size;
2574 
2575         path = g_strconcat (basedir, file1->name, NULL);
2576         g_hash_table_replace (data->active_paths, path, (void*)(long)S_IFREG);
2577     } else if (!file1 && file2) {
2578         data->delta -= file2->size;
2579     }
2580 
2581     return 0;
2582 }
2583 
2584 static int
check_quota_and_active_paths_diff_dirs(int n,const char * basedir,SeafDirent * dirs[],void * vdata,gboolean * recurse)2585 check_quota_and_active_paths_diff_dirs (int n, const char *basedir,
2586                                         SeafDirent *dirs[], void *vdata,
2587                                         gboolean *recurse)
2588 {
2589     CalcQuotaDeltaData *data = vdata;
2590     SeafDirent *dir1 = dirs[0];
2591     SeafDirent *dir2 = dirs[1];
2592     char *path;
2593 
2594     /* When a new empty dir is created, or a dir became empty. */
2595     if ((!dir2 && dir1 && strcmp(dir1->id, EMPTY_SHA1) == 0) ||
2596 	(dir2 && dir1 && strcmp(dir1->id, EMPTY_SHA1) == 0 && strcmp(dir2->id, EMPTY_SHA1) != 0)) {
2597         path = g_strconcat (basedir, dir1->name, NULL);
2598         g_hash_table_replace (data->active_paths, path, (void*)(long)S_IFDIR);
2599     }
2600 
2601     return 0;
2602 }
2603 
2604 static int
calculate_upload_size_delta_and_active_paths(HttpTxTask * task,gint64 * delta,GHashTable * active_paths)2605 calculate_upload_size_delta_and_active_paths (HttpTxTask *task,
2606                                               gint64 *delta,
2607                                               GHashTable *active_paths)
2608 {
2609     int ret = 0;
2610     SeafBranch *local = NULL, *master = NULL;
2611     SeafCommit *local_head = NULL, *master_head = NULL;
2612 
2613     local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
2614     if (!local) {
2615         seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
2616         ret = -1;
2617         goto out;
2618     }
2619     master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
2620     if (!master) {
2621         seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
2622         ret = -1;
2623         goto out;
2624     }
2625 
2626     local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2627                                                  task->repo_id, task->repo_version,
2628                                                  local->commit_id);
2629     if (!local_head) {
2630         seaf_warning ("Local head commit not found for repo %.8s.\n",
2631                       task->repo_id);
2632         ret = -1;
2633         goto out;
2634     }
2635     master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2636                                                  task->repo_id, task->repo_version,
2637                                                  master->commit_id);
2638     if (!master_head) {
2639         seaf_warning ("Master head commit not found for repo %.8s.\n",
2640                       task->repo_id);
2641         ret = -1;
2642         goto out;
2643     }
2644 
2645     CalcQuotaDeltaData data;
2646     memset (&data, 0, sizeof(data));
2647     data.task = task;
2648     data.active_paths = active_paths;
2649 
2650     DiffOptions opts;
2651     memset (&opts, 0, sizeof(opts));
2652     memcpy (opts.store_id, task->repo_id, 36);
2653     opts.version = task->repo_version;
2654     opts.file_cb = check_quota_and_active_paths_diff_files;
2655     opts.dir_cb = check_quota_and_active_paths_diff_dirs;
2656     opts.data = &data;
2657 
2658     const char *trees[2];
2659     trees[0] = local_head->root_id;
2660     trees[1] = master_head->root_id;
2661     if (diff_trees (2, trees, &opts) < 0) {
2662         seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
2663                       task->repo_id);
2664         ret = -1;
2665         goto out;
2666     }
2667 
2668     *delta = data.delta;
2669 
2670 out:
2671     seaf_branch_unref (local);
2672     seaf_branch_unref (master);
2673     seaf_commit_unref (local_head);
2674     seaf_commit_unref (master_head);
2675 
2676     return ret;
2677 }
2678 
2679 static int
check_quota(HttpTxTask * task,Connection * conn,gint64 delta)2680 check_quota (HttpTxTask *task, Connection *conn, gint64 delta)
2681 {
2682     CURL *curl;
2683     char *url;
2684     int status;
2685     int ret = 0;
2686 
2687     curl = conn->curl;
2688 
2689     if (!task->use_fileserver_port)
2690         url = g_strdup_printf ("%s/seafhttp/repo/%s/quota-check/?delta=%"G_GINT64_FORMAT"",
2691                                task->host, task->repo_id, delta);
2692     else
2693         url = g_strdup_printf ("%s/repo/%s/quota-check/?delta=%"G_GINT64_FORMAT"",
2694                                task->host, task->repo_id, delta);
2695 
2696     int curl_error;
2697     if (http_get (curl, url, task->token, &status, NULL, NULL, NULL, NULL, TRUE, &curl_error) < 0) {
2698         conn->release = TRUE;
2699         handle_curl_errors (task, curl_error);
2700         ret = -1;
2701         goto out;
2702     }
2703 
2704     if (status != HTTP_OK) {
2705         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
2706         handle_http_errors (task, status);
2707         ret = -1;
2708     }
2709 
2710 out:
2711     g_free (url);
2712     curl_easy_reset (curl);
2713 
2714     return ret;
2715 }
2716 
2717 static int
send_commit_object(HttpTxTask * task,Connection * conn)2718 send_commit_object (HttpTxTask *task, Connection *conn)
2719 {
2720     CURL *curl;
2721     char *url;
2722     int status;
2723     char *data;
2724     int len;
2725     int ret = 0;
2726 
2727     if (seaf_obj_store_read_obj (seaf->commit_mgr->obj_store,
2728                                  task->repo_id, task->repo_version,
2729                                  task->head, (void**)&data, &len) < 0) {
2730         seaf_warning ("Failed to read commit %s.\n", task->head);
2731         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
2732         return -1;
2733     }
2734 
2735     curl = conn->curl;
2736 
2737     if (!task->use_fileserver_port)
2738         url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/%s",
2739                                task->host, task->repo_id, task->head);
2740     else
2741         url = g_strdup_printf ("%s/repo/%s/commit/%s",
2742                                task->host, task->repo_id, task->head);
2743 
2744     int curl_error;
2745     if (http_put (curl, url, task->token,
2746                   data, len,
2747                   NULL, NULL,
2748                   &status, NULL, NULL, TRUE, &curl_error) < 0) {
2749         conn->release = TRUE;
2750         handle_curl_errors (task, curl_error);
2751         ret = -1;
2752         goto out;
2753     }
2754 
2755     if (status != HTTP_OK) {
2756         seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
2757         handle_http_errors (task, status);
2758         ret = -1;
2759     }
2760 
2761 out:
2762     g_free (url);
2763     curl_easy_reset (curl);
2764     g_free (data);
2765 
2766     return ret;
2767 }
2768 
/*
 * State handed to collect_file_ids() / collect_dir_ids() while diffing trees.
 */
typedef struct {
    GList **pret;               /* List head that collected object ids (g_strdup()'ed) are prepended to. */
    GHashTable *checked_objs;   /* Ids already collected; looked up to avoid adding an id twice. */
} CalcFsListData;
2773 
2774 static int
collect_file_ids(int n,const char * basedir,SeafDirent * files[],void * vdata)2775 collect_file_ids (int n, const char *basedir, SeafDirent *files[], void *vdata)
2776 {
2777     SeafDirent *file1 = files[0];
2778     SeafDirent *file2 = files[1];
2779     CalcFsListData *data = vdata;
2780     GList **pret = data->pret;
2781     int dummy;
2782 
2783     if (!file1 || strcmp (file1->id, EMPTY_SHA1) == 0)
2784         return 0;
2785 
2786     if (g_hash_table_lookup (data->checked_objs, file1->id))
2787         return 0;
2788 
2789     if (!file2 || strcmp (file1->id, file2->id) != 0) {
2790         *pret = g_list_prepend (*pret, g_strdup(file1->id));
2791         g_hash_table_insert (data->checked_objs, g_strdup(file1->id), &dummy);
2792     }
2793 
2794     return 0;
2795 }
2796 
2797 static int
collect_dir_ids(int n,const char * basedir,SeafDirent * dirs[],void * vdata,gboolean * recurse)2798 collect_dir_ids (int n, const char *basedir, SeafDirent *dirs[], void *vdata,
2799                  gboolean *recurse)
2800 {
2801     SeafDirent *dir1 = dirs[0];
2802     SeafDirent *dir2 = dirs[1];
2803     CalcFsListData *data = vdata;
2804     GList **pret = data->pret;
2805     int dummy;
2806 
2807     if (!dir1 || strcmp (dir1->id, EMPTY_SHA1) == 0)
2808         return 0;
2809 
2810     if (g_hash_table_lookup (data->checked_objs, dir1->id))
2811         return 0;
2812 
2813     if (!dir2 || strcmp (dir1->id, dir2->id) != 0) {
2814         *pret = g_list_prepend (*pret, g_strdup(dir1->id));
2815         g_hash_table_insert (data->checked_objs, g_strdup(dir1->id), &dummy);
2816     }
2817 
2818     return 0;
2819 }
2820 
2821 static GList *
calculate_send_fs_object_list(HttpTxTask * task)2822 calculate_send_fs_object_list (HttpTxTask *task)
2823 {
2824     GList *ret = NULL;
2825     SeafBranch *local = NULL, *master = NULL;
2826     SeafCommit *local_head = NULL, *master_head = NULL;
2827     GList *ptr;
2828 
2829     local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
2830     if (!local) {
2831         seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
2832         goto out;
2833     }
2834     master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
2835     if (!master) {
2836         seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
2837         goto out;
2838     }
2839 
2840     local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2841                                                  task->repo_id, task->repo_version,
2842                                                  local->commit_id);
2843     if (!local_head) {
2844         seaf_warning ("Local head commit not found for repo %.8s.\n",
2845                       task->repo_id);
2846         goto out;
2847     }
2848     master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2849                                                   task->repo_id, task->repo_version,
2850                                                   master->commit_id);
2851     if (!master_head) {
2852         seaf_warning ("Master head commit not found for repo %.8s.\n",
2853                       task->repo_id);
2854         goto out;
2855     }
2856 
2857     /* Diff won't traverse the root object itself. */
2858     if (strcmp (local_head->root_id, master_head->root_id) != 0)
2859         ret = g_list_prepend (ret, g_strdup(local_head->root_id));
2860 
2861     CalcFsListData *data = g_new0(CalcFsListData, 1);
2862     data->pret = &ret;
2863     data->checked_objs = g_hash_table_new_full (g_str_hash, g_str_equal,
2864                                                 g_free, NULL);
2865 
2866     DiffOptions opts;
2867     memset (&opts, 0, sizeof(opts));
2868     memcpy (opts.store_id, task->repo_id, 36);
2869     opts.version = task->repo_version;
2870     opts.file_cb = collect_file_ids;
2871     opts.dir_cb = collect_dir_ids;
2872     opts.data = data;
2873 
2874     const char *trees[2];
2875     trees[0] = local_head->root_id;
2876     trees[1] = master_head->root_id;
2877     if (diff_trees (2, trees, &opts) < 0) {
2878         seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
2879                       task->repo_id);
2880         for (ptr = ret; ptr; ptr = ptr->next)
2881             g_free (ptr->data);
2882         ret = NULL;
2883     }
2884 
2885     g_hash_table_destroy (data->checked_objs);
2886     g_free (data);
2887 
2888 out:
2889     seaf_branch_unref (local);
2890     seaf_branch_unref (master);
2891     seaf_commit_unref (local_head);
2892     seaf_commit_unref (master_head);
2893     return ret;
2894 }
2895 
2896 #define ID_LIST_SEGMENT_N 1000
2897 
2898 static int
upload_check_id_list_segment(HttpTxTask * task,Connection * conn,const char * url,GList ** send_id_list,GList ** recv_id_list)2899 upload_check_id_list_segment (HttpTxTask *task, Connection *conn, const char *url,
2900                               GList **send_id_list, GList **recv_id_list)
2901 {
2902     json_t *array;
2903     json_error_t jerror;
2904     char *obj_id;
2905     int n_sent = 0;
2906     char *data = NULL;
2907     int len;
2908     CURL *curl;
2909     int status;
2910     char *rsp_content = NULL;
2911     gint64 rsp_size;
2912     int ret = 0;
2913 
2914     /* Convert object id list to JSON format. */
2915 
2916     array = json_array ();
2917 
2918     while (*send_id_list != NULL) {
2919         obj_id = (*send_id_list)->data;
2920         json_array_append_new (array, json_string(obj_id));
2921 
2922         *send_id_list = g_list_delete_link (*send_id_list, *send_id_list);
2923         g_free (obj_id);
2924 
2925         if (++n_sent >= ID_LIST_SEGMENT_N)
2926             break;
2927     }
2928 
2929     seaf_debug ("Check %d ids for %s:%s.\n",
2930                 n_sent, task->host, task->repo_id);
2931 
2932     data = json_dumps (array, 0);
2933     len = strlen(data);
2934     json_decref (array);
2935 
2936     /* Send fs object id list. */
2937 
2938     curl = conn->curl;
2939 
2940     int curl_error;
2941     if (http_post (curl, url, task->token,
2942                    data, len,
2943                    &status, &rsp_content, &rsp_size, TRUE, &curl_error) < 0) {
2944         conn->release = TRUE;
2945         handle_curl_errors (task, curl_error);
2946         ret = -1;
2947         goto out;
2948     }
2949 
2950     if (status != HTTP_OK) {
2951         seaf_warning ("Bad response code for POST %s: %d.\n", url, status);
2952         handle_http_errors (task, status);
2953         ret = -1;
2954         goto out;
2955     }
2956 
2957     /* Process needed object id list. */
2958 
2959     array = json_loadb (rsp_content, rsp_size, 0, &jerror);
2960     if (!array) {
2961         seaf_warning ("Invalid JSON response from the server: %s.\n", jerror.text);
2962         task->error = SYNC_ERROR_ID_SERVER;
2963         ret = -1;
2964         goto out;
2965     }
2966 
2967     int i;
2968     size_t n = json_array_size (array);
2969     json_t *str;
2970 
2971     seaf_debug ("%lu objects or blocks are needed for %s:%s.\n",
2972                 n, task->host, task->repo_id);
2973 
2974     for (i = 0; i < n; ++i) {
2975         str = json_array_get (array, i);
2976         if (!str) {
2977             seaf_warning ("Invalid JSON response from the server.\n");
2978             json_decref (array);
2979             ret = -1;
2980             goto out;
2981         }
2982 
2983         *recv_id_list = g_list_prepend (*recv_id_list, g_strdup(json_string_value(str)));
2984     }
2985 
2986     json_decref (array);
2987 
2988 out:
2989     curl_easy_reset (curl);
2990     g_free (data);
2991     g_free (rsp_content);
2992 
2993     return ret;
2994 }
2995 
2996 #define MAX_OBJECT_PACK_SIZE (1 << 20) /* 1MB */
2997 
2998 typedef struct {
2999     char obj_id[40];
3000     guint32 obj_size;
3001     guint8 object[0];
3002 } __attribute__((__packed__)) ObjectHeader;
3003 
3004 static int
send_fs_objects(HttpTxTask * task,Connection * conn,GList ** send_fs_list)3005 send_fs_objects (HttpTxTask *task, Connection *conn, GList **send_fs_list)
3006 {
3007     struct evbuffer *buf;
3008     ObjectHeader hdr;
3009     char *obj_id;
3010     char *data;
3011     int len;
3012     int total_size;
3013     unsigned char *package;
3014     CURL *curl;
3015     char *url = NULL;
3016     int status;
3017     int ret = 0;
3018     int n_sent = 0;
3019 
3020     buf = evbuffer_new ();
3021     curl = conn->curl;
3022 
3023     while (*send_fs_list != NULL) {
3024         obj_id = (*send_fs_list)->data;
3025 
3026         if (seaf_obj_store_read_obj (seaf->fs_mgr->obj_store,
3027                                      task->repo_id, task->repo_version,
3028                                      obj_id, (void **)&data, &len) < 0) {
3029             seaf_warning ("Failed to read fs object %s in repo %s.\n",
3030                           obj_id, task->repo_id);
3031             task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3032             ret = -1;
3033             goto out;
3034         }
3035 
3036         ++n_sent;
3037 
3038         memcpy (hdr.obj_id, obj_id, 40);
3039         hdr.obj_size = htonl (len);
3040 
3041         evbuffer_add (buf, &hdr, sizeof(hdr));
3042         evbuffer_add (buf, data, len);
3043 
3044         g_free (data);
3045         *send_fs_list = g_list_delete_link (*send_fs_list, *send_fs_list);
3046         g_free (obj_id);
3047 
3048         total_size = evbuffer_get_length (buf);
3049         if (total_size >= MAX_OBJECT_PACK_SIZE)
3050             break;
3051     }
3052 
3053     seaf_debug ("Sending %d fs objects for %s:%s.\n",
3054                 n_sent, task->host, task->repo_id);
3055 
3056     package = evbuffer_pullup (buf, -1);
3057 
3058     if (!task->use_fileserver_port)
3059         url = g_strdup_printf ("%s/seafhttp/repo/%s/recv-fs/",
3060                                task->host, task->repo_id);
3061     else
3062         url = g_strdup_printf ("%s/repo/%s/recv-fs/",
3063                                task->host, task->repo_id);
3064 
3065     int curl_error;
3066     if (http_post (curl, url, task->token,
3067                    (char *)package, evbuffer_get_length(buf),
3068                    &status, NULL, NULL, TRUE, &curl_error) < 0) {
3069         conn->release = TRUE;
3070         handle_curl_errors (task, curl_error);
3071         ret = -1;
3072         goto out;
3073     }
3074 
3075     if (status != HTTP_OK) {
3076         seaf_warning ("Bad response code for POST %s: %d.\n", url, status);
3077         handle_http_errors (task, status);
3078         ret = -1;
3079     }
3080 
3081 out:
3082     g_free (url);
3083     evbuffer_free (buf);
3084     curl_easy_reset (curl);
3085 
3086     return ret;
3087 }
3088 
3089 typedef struct {
3090     GList *block_list;
3091     GHashTable *added_blocks;
3092     HttpTxTask *task;
3093 } CalcBlockListData;
3094 
3095 static void
add_to_block_list(GList ** block_list,GHashTable * added_blocks,const char * block_id)3096 add_to_block_list (GList **block_list, GHashTable *added_blocks, const char *block_id)
3097 {
3098     int dummy;
3099 
3100     if (g_hash_table_lookup (added_blocks, block_id))
3101         return;
3102 
3103     *block_list = g_list_prepend (*block_list, g_strdup(block_id));
3104     g_hash_table_insert (added_blocks, g_strdup(block_id), &dummy);
3105 }
3106 
3107 static int
block_list_diff_files(int n,const char * basedir,SeafDirent * files[],void * vdata)3108 block_list_diff_files (int n, const char *basedir, SeafDirent *files[], void *vdata)
3109 {
3110     SeafDirent *file1 = files[0];
3111     SeafDirent *file2 = files[1];
3112     CalcBlockListData *data = vdata;
3113     HttpTxTask *task = data->task;
3114     Seafile *f1 = NULL, *f2 = NULL;
3115     int i;
3116 
3117     if (file1 && strcmp (file1->id, EMPTY_SHA1) != 0) {
3118         if (!file2) {
3119             f1 = seaf_fs_manager_get_seafile (seaf->fs_mgr,
3120                                               task->repo_id, task->repo_version,
3121                                               file1->id);
3122             if (!f1) {
3123                 seaf_warning ("Failed to get seafile object %s:%s.\n",
3124                               task->repo_id, file1->id);
3125                 return -1;
3126             }
3127             for (i = 0; i < f1->n_blocks; ++i)
3128                 add_to_block_list (&data->block_list, data->added_blocks,
3129                                    f1->blk_sha1s[i]);
3130             seafile_unref (f1);
3131         } else if (strcmp (file1->id, file2->id) != 0) {
3132             f1 = seaf_fs_manager_get_seafile (seaf->fs_mgr,
3133                                               task->repo_id, task->repo_version,
3134                                               file1->id);
3135             if (!f1) {
3136                 seaf_warning ("Failed to get seafile object %s:%s.\n",
3137                               task->repo_id, file1->id);
3138                 return -1;
3139             }
3140             f2 = seaf_fs_manager_get_seafile (seaf->fs_mgr,
3141                                               task->repo_id, task->repo_version,
3142                                               file2->id);
3143             if (!f2) {
3144                 seafile_unref (f1);
3145                 seaf_warning ("Failed to get seafile object %s:%s.\n",
3146                               task->repo_id, file2->id);
3147                 return -1;
3148             }
3149 
3150             GHashTable *h = g_hash_table_new (g_str_hash, g_str_equal);
3151             int dummy;
3152             for (i = 0; i < f2->n_blocks; ++i)
3153                 g_hash_table_insert (h, f2->blk_sha1s[i], &dummy);
3154 
3155             for (i = 0; i < f1->n_blocks; ++i)
3156                 if (!g_hash_table_lookup (h, f1->blk_sha1s[i]))
3157                     add_to_block_list (&data->block_list, data->added_blocks,
3158                                        f1->blk_sha1s[i]);
3159 
3160             seafile_unref (f1);
3161             seafile_unref (f2);
3162             g_hash_table_destroy (h);
3163         }
3164     }
3165 
3166     return 0;
3167 }
3168 
3169 static int
block_list_diff_dirs(int n,const char * basedir,SeafDirent * dirs[],void * data,gboolean * recurse)3170 block_list_diff_dirs (int n, const char *basedir, SeafDirent *dirs[], void *data,
3171                       gboolean *recurse)
3172 {
3173     /* Do nothing */
3174     return 0;
3175 }
3176 
3177 static int
calculate_block_list(HttpTxTask * task,GList ** plist)3178 calculate_block_list (HttpTxTask *task, GList **plist)
3179 {
3180     int ret = 0;
3181     SeafBranch *local = NULL, *master = NULL;
3182     SeafCommit *local_head = NULL, *master_head = NULL;
3183 
3184     local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
3185     if (!local) {
3186         seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
3187         ret = -1;
3188         goto out;
3189     }
3190     master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
3191     if (!master) {
3192         seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
3193         ret = -1;
3194         goto out;
3195     }
3196 
3197     local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
3198                                                  task->repo_id, task->repo_version,
3199                                                  local->commit_id);
3200     if (!local_head) {
3201         seaf_warning ("Local head commit not found for repo %.8s.\n",
3202                       task->repo_id);
3203         ret = -1;
3204         goto out;
3205     }
3206     master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
3207                                                  task->repo_id, task->repo_version,
3208                                                  master->commit_id);
3209     if (!master_head) {
3210         seaf_warning ("Master head commit not found for repo %.8s.\n",
3211                       task->repo_id);
3212         ret = -1;
3213         goto out;
3214     }
3215 
3216     CalcBlockListData data;
3217     memset (&data, 0, sizeof(data));
3218     data.added_blocks = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
3219     data.task = task;
3220 
3221     DiffOptions opts;
3222     memset (&opts, 0, sizeof(opts));
3223     memcpy (opts.store_id, task->repo_id, 36);
3224     opts.version = task->repo_version;
3225     opts.file_cb = block_list_diff_files;
3226     opts.dir_cb = block_list_diff_dirs;
3227     opts.data = &data;
3228 
3229     const char *trees[2];
3230     trees[0] = local_head->root_id;
3231     trees[1] = master_head->root_id;
3232     if (diff_trees (2, trees, &opts) < 0) {
3233         seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
3234                       task->repo_id);
3235         g_hash_table_destroy (data.added_blocks);
3236 
3237         GList *ptr;
3238         for (ptr = data.block_list; ptr; ptr = ptr->next)
3239             g_free (ptr->data);
3240 
3241         ret = -1;
3242         goto out;
3243     }
3244 
3245     g_hash_table_destroy (data.added_blocks);
3246     *plist = data.block_list;
3247 
3248 out:
3249     seaf_branch_unref (local);
3250     seaf_branch_unref (master);
3251     seaf_commit_unref (local_head);
3252     seaf_commit_unref (master_head);
3253     return ret;
3254 }
3255 
3256 typedef struct {
3257     char block_id[41];
3258     BlockHandle *block;
3259     HttpTxTask *task;
3260 } SendBlockData;
3261 
3262 static size_t
send_block_callback(void * ptr,size_t size,size_t nmemb,void * userp)3263 send_block_callback (void *ptr, size_t size, size_t nmemb, void *userp)
3264 {
3265     size_t realsize = size *nmemb;
3266     SendBlockData *data = userp;
3267     HttpTxTask *task = data->task;
3268     int n;
3269 
3270     if (task->state == HTTP_TASK_STATE_CANCELED || task->all_stop)
3271         return CURL_READFUNC_ABORT;
3272 
3273     n = seaf_block_manager_read_block (seaf->block_mgr,
3274                                        data->block,
3275                                        ptr, realsize);
3276     if (n < 0) {
3277         seaf_warning ("Failed to read block %s in repo %.8s.\n",
3278                       data->block_id, task->repo_id);
3279         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3280         return CURL_READFUNC_ABORT;
3281     }
3282 
3283     /* Update global transferred bytes. */
3284     g_atomic_int_add (&(seaf->sync_mgr->sent_bytes), n);
3285 
3286     /* Update transferred bytes for this task */
3287     g_atomic_int_add (&task->tx_bytes, n);
3288 
3289     /* If uploaded bytes exceeds the limit, wait until the counter
3290      * is reset. We check the counter every 100 milliseconds, so we
3291      * can waste up to 100 milliseconds without sending data after
3292      * the counter is reset.
3293      */
3294     while (1) {
3295         gint sent = g_atomic_int_get(&(seaf->sync_mgr->sent_bytes));
3296         if (seaf->sync_mgr->upload_limit > 0 &&
3297             sent > seaf->sync_mgr->upload_limit)
3298             /* 100 milliseconds */
3299             g_usleep (100000);
3300         else
3301             break;
3302     }
3303 
3304     return n;
3305 }
3306 
3307 static int
send_block(HttpTxTask * task,Connection * conn,const char * block_id,guint32 * psize)3308 send_block (HttpTxTask *task, Connection *conn, const char *block_id, guint32 *psize)
3309 {
3310     CURL *curl;
3311     char *url;
3312     int status;
3313     BlockMetadata *bmd;
3314     BlockHandle *block;
3315     int ret = 0;
3316 
3317     bmd = seaf_block_manager_stat_block (seaf->block_mgr,
3318                                          task->repo_id, task->repo_version,
3319                                          block_id);
3320     if (!bmd) {
3321         seaf_warning ("Failed to stat block %s in repo %s.\n",
3322                       block_id, task->repo_id);
3323         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3324         return -1;
3325     }
3326 
3327     block = seaf_block_manager_open_block (seaf->block_mgr,
3328                                            task->repo_id, task->repo_version,
3329                                            block_id, BLOCK_READ);
3330     if (!block) {
3331         seaf_warning ("Failed to open block %s in repo %s.\n",
3332                       block_id, task->repo_id);
3333         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3334         g_free (bmd);
3335         return -1;
3336     }
3337 
3338     SendBlockData data;
3339     memset (&data, 0, sizeof(data));
3340     memcpy (data.block_id, block_id, 40);
3341     data.block = block;
3342     data.task = task;
3343 
3344     curl = conn->curl;
3345 
3346     if (!task->use_fileserver_port)
3347         url = g_strdup_printf ("%s/seafhttp/repo/%s/block/%s",
3348                                task->host, task->repo_id, block_id);
3349     else
3350         url = g_strdup_printf ("%s/repo/%s/block/%s",
3351                                task->host, task->repo_id, block_id);
3352 
3353     int curl_error;
3354     if (http_put (curl, url, task->token,
3355                   NULL, bmd->size,
3356                   send_block_callback, &data,
3357                   &status, NULL, NULL, TRUE, &curl_error) < 0) {
3358         if (task->state == HTTP_TASK_STATE_CANCELED)
3359             goto out;
3360 
3361         if (task->error == SYNC_ERROR_ID_NO_ERROR) {
3362             /* Only release connection when it's a network error */
3363             conn->release = TRUE;
3364             handle_curl_errors (task, curl_error);
3365         }
3366         ret = -1;
3367         goto out;
3368     }
3369 
3370     if (status != HTTP_OK) {
3371         seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
3372         handle_http_errors (task, status);
3373         ret = -1;
3374         goto out;
3375     }
3376 
3377     if (psize)
3378         *psize = bmd->size;
3379 
3380 out:
3381     g_free (url);
3382     curl_easy_reset (curl);
3383     g_free (bmd);
3384     seaf_block_manager_close_block (seaf->block_mgr, block);
3385     seaf_block_manager_block_handle_free (seaf->block_mgr, block);
3386 
3387     return ret;
3388 }
3389 
3390 typedef struct BlockUploadData {
3391     HttpTxTask *http_task;
3392     ConnectionPool *cpool;
3393     GAsyncQueue *finished_tasks;
3394 } BlockUploadData;
3395 
3396 typedef struct BlockUploadTask {
3397     char block_id[41];
3398     int result;
3399     guint32 block_size;
3400 } BlockUploadTask;
3401 
3402 static void
block_upload_task_free(BlockUploadTask * task)3403 block_upload_task_free (BlockUploadTask *task)
3404 {
3405     g_free (task);
3406 }
3407 
3408 static void
upload_block_thread_func(gpointer data,gpointer user_data)3409 upload_block_thread_func (gpointer data, gpointer user_data)
3410 {
3411     BlockUploadTask *task = data;
3412     BlockUploadData *tx_data = user_data;
3413     HttpTxTask *http_task = tx_data->http_task;
3414     Connection *conn;
3415     int ret = 0;
3416 
3417     conn = connection_pool_get_connection (tx_data->cpool);
3418     if (!conn) {
3419         seaf_warning ("Failed to get connection to host %s.\n", http_task->host);
3420         http_task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3421         ret = -1;
3422         goto out;
3423     }
3424 
3425     ret = send_block (http_task, conn, task->block_id, &task->block_size);
3426 
3427     connection_pool_return_connection (tx_data->cpool, conn);
3428 
3429 out:
3430     task->result = ret;
3431     g_async_queue_push (tx_data->finished_tasks, task);
3432 }
3433 
3434 #define DEFAULT_UPLOAD_BLOCK_THREADS 3
3435 
3436 static int
multi_threaded_send_blocks(HttpTxTask * http_task,GList * block_list)3437 multi_threaded_send_blocks (HttpTxTask *http_task, GList *block_list)
3438 {
3439     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
3440     GThreadPool *tpool;
3441     GAsyncQueue *finished_tasks;
3442     GHashTable *pending_tasks;
3443     ConnectionPool *cpool;
3444     GList *ptr;
3445     char *block_id;
3446     BlockUploadTask *task;
3447     SyncInfo *info;
3448     int ret = 0;
3449 
3450     if (block_list == NULL)
3451         return 0;
3452 
3453     cpool = find_connection_pool (priv, http_task->host);
3454     if (!cpool) {
3455         seaf_warning ("Failed to create connection pool for host %s.\n", http_task->host);
3456         http_task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3457         return -1;
3458     }
3459 
3460     finished_tasks = g_async_queue_new ();
3461 
3462     BlockUploadData data;
3463     data.http_task = http_task;
3464     data.finished_tasks = finished_tasks;
3465     data.cpool = cpool;
3466 
3467     tpool = g_thread_pool_new (upload_block_thread_func, &data,
3468                                DEFAULT_UPLOAD_BLOCK_THREADS, FALSE, NULL);
3469 
3470     pending_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
3471                                            g_free,
3472                                            (GDestroyNotify)block_upload_task_free);
3473 
3474     info = seaf_sync_manager_get_sync_info (seaf->sync_mgr, http_task->repo_id);
3475 
3476     for (ptr = block_list; ptr; ptr = ptr->next) {
3477         block_id = ptr->data;
3478 
3479         task = g_new0 (BlockUploadTask, 1);
3480         memcpy (task->block_id, block_id, 40);
3481 
3482         if (!g_hash_table_lookup (pending_tasks, block_id)) {
3483             g_hash_table_insert (pending_tasks, g_strdup(block_id), task);
3484             g_thread_pool_push (tpool, task, NULL);
3485         } else {
3486             g_free (task);
3487         }
3488     }
3489 
3490     while ((task = g_async_queue_pop (finished_tasks)) != NULL) {
3491         if (task->result < 0 || http_task->state == HTTP_TASK_STATE_CANCELED) {
3492             ret = task->result;
3493             http_task->all_stop = TRUE;
3494             break;
3495         }
3496 
3497         ++(http_task->done_blocks);
3498 
3499         if (info && info->multipart_upload) {
3500             info->uploaded_bytes += (gint64)task->block_size;
3501         }
3502 
3503         g_hash_table_remove (pending_tasks, task->block_id);
3504         if (g_hash_table_size(pending_tasks) == 0)
3505             break;
3506     }
3507 
3508     g_thread_pool_free (tpool, TRUE, TRUE);
3509 
3510     g_hash_table_destroy (pending_tasks);
3511 
3512     g_async_queue_unref (finished_tasks);
3513 
3514     return ret;
3515 }
3516 
3517 static void
notify_permission_error(HttpTxTask * task,const char * error_str)3518 notify_permission_error (HttpTxTask *task, const char *error_str)
3519 {
3520     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
3521     GMatchInfo *match_info;
3522     char *path;
3523 
3524     if (g_regex_match (priv->locked_error_regex, error_str, 0, &match_info)) {
3525         path = g_match_info_fetch (match_info, 1);
3526         seaf_repo_manager_record_sync_error (task->repo_id, task->repo_name, path,
3527                                              SYNC_ERROR_ID_FILE_LOCKED);
3528         g_free (path);
3529 
3530         /* Set more accurate error. */
3531         task->error = SYNC_ERROR_ID_FILE_LOCKED;
3532     } else if (g_regex_match (priv->folder_perm_error_regex, error_str, 0, &match_info)) {
3533         path = g_match_info_fetch (match_info, 1);
3534         /* The path returned by server begins with '/'. */
3535         seaf_repo_manager_record_sync_error (task->repo_id, task->repo_name,
3536                                              (path[0] == '/') ? (path + 1) : path,
3537                                              SYNC_ERROR_ID_FOLDER_PERM_DENIED);
3538         g_free (path);
3539 
3540         task->error = SYNC_ERROR_ID_FOLDER_PERM_DENIED;
3541     }
3542 
3543     g_match_info_free (match_info);
3544 }
3545 
3546 static int
update_branch(HttpTxTask * task,Connection * conn)3547 update_branch (HttpTxTask *task, Connection *conn)
3548 {
3549     CURL *curl;
3550     char *url;
3551     int status;
3552     char *rsp_content;
3553     char *rsp_content_str = NULL;
3554     gint64 rsp_size;
3555     int ret = 0;
3556 
3557     curl = conn->curl;
3558 
3559     if (!task->use_fileserver_port)
3560         url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/HEAD/?head=%s",
3561                                task->host, task->repo_id, task->head);
3562     else
3563         url = g_strdup_printf ("%s/repo/%s/commit/HEAD/?head=%s",
3564                                task->host, task->repo_id, task->head);
3565 
3566     int curl_error;
3567     if (http_put (curl, url, task->token,
3568                   NULL, 0,
3569                   NULL, NULL,
3570                   &status, &rsp_content, &rsp_size, TRUE, &curl_error) < 0) {
3571         conn->release = TRUE;
3572         handle_curl_errors (task, curl_error);
3573         ret = -1;
3574         goto out;
3575     }
3576 
3577     if (status != HTTP_OK) {
3578         seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
3579         handle_http_errors (task, status);
3580 
3581         if (status == HTTP_FORBIDDEN) {
3582             rsp_content_str = g_new0 (gchar, rsp_size + 1);
3583             memcpy (rsp_content_str, rsp_content, rsp_size);
3584             seaf_warning ("%s\n", rsp_content_str);
3585             notify_permission_error (task, rsp_content_str);
3586             g_free (rsp_content_str);
3587         }
3588 
3589         ret = -1;
3590     }
3591 
3592 out:
3593     g_free (url);
3594     g_free (rsp_content);
3595     curl_easy_reset (curl);
3596 
3597     return ret;
3598 }
3599 
3600 static void
update_master_branch(HttpTxTask * task)3601 update_master_branch (HttpTxTask *task)
3602 {
3603     SeafBranch *branch;
3604     branch = seaf_branch_manager_get_branch (seaf->branch_mgr,
3605                                              task->repo_id,
3606                                              "master");
3607     if (!branch) {
3608         branch = seaf_branch_new ("master", task->repo_id, task->head);
3609         seaf_branch_manager_add_branch (seaf->branch_mgr, branch);
3610         seaf_branch_unref (branch);
3611     } else {
3612         seaf_branch_set_commit (branch, task->head);
3613         seaf_branch_manager_update_branch (seaf->branch_mgr, branch);
3614         seaf_branch_unref (branch);
3615     }
3616 }
3617 
3618 static void
set_path_status_syncing(gpointer key,gpointer value,gpointer user_data)3619 set_path_status_syncing (gpointer key, gpointer value, gpointer user_data)
3620 {
3621     HttpTxTask *task = user_data;
3622     char *path = key;
3623     int mode = (int)(long)value;
3624     seaf_sync_manager_update_active_path (seaf->sync_mgr,
3625                                           task->repo_id,
3626                                           path,
3627                                           mode,
3628                                           SYNC_STATUS_SYNCING,
3629                                           TRUE);
3630 }
3631 
3632 static void
set_path_status_synced(gpointer key,gpointer value,gpointer user_data)3633 set_path_status_synced (gpointer key, gpointer value, gpointer user_data)
3634 {
3635     HttpTxTask *task = user_data;
3636     char *path = key;
3637     int mode = (int)(long)value;
3638     seaf_sync_manager_update_active_path (seaf->sync_mgr,
3639                                           task->repo_id,
3640                                           path,
3641                                           mode,
3642                                           SYNC_STATUS_SYNCED,
3643                                           TRUE);
3644 }
3645 
3646 static void *
http_upload_thread(void * vdata)3647 http_upload_thread (void *vdata)
3648 {
3649     HttpTxTask *task = vdata;
3650     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
3651     ConnectionPool *pool;
3652     Connection *conn = NULL;
3653     char *url = NULL;
3654     GList *send_fs_list = NULL, *needed_fs_list = NULL;
3655     GList *block_list = NULL, *needed_block_list = NULL;
3656     GHashTable *active_paths = NULL;
3657 
3658     SeafBranch *local = seaf_branch_manager_get_branch (seaf->branch_mgr,
3659                                                         task->repo_id, "local");
3660     if (!local) {
3661         seaf_warning ("Failed to get branch local of repo %.8s.\n", task->repo_id);
3662         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3663         return NULL;
3664     }
3665     memcpy (task->head, local->commit_id, 40);
3666     seaf_branch_unref (local);
3667 
3668     pool = find_connection_pool (priv, task->host);
3669     if (!pool) {
3670         seaf_warning ("Failed to create connection pool for host %s.\n", task->host);
3671         task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3672         goto out;
3673     }
3674 
3675     conn = connection_pool_get_connection (pool);
3676     if (!conn) {
3677         seaf_warning ("Failed to get connection to host %s.\n", task->host);
3678         task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3679         goto out;
3680     }
3681 
3682     /* seaf_message ("Upload with HTTP sync protocol version %d.\n", */
3683     /*               task->protocol_version); */
3684 
3685     transition_state (task, task->state, HTTP_TASK_RT_STATE_CHECK);
3686 
3687     gint64 delta = 0;
3688     active_paths = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
3689 
3690     if (calculate_upload_size_delta_and_active_paths (task, &delta, active_paths) < 0) {
3691         seaf_warning ("Failed to calculate upload size delta for repo %s.\n",
3692                       task->repo_id);
3693         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3694         goto out;
3695     }
3696 
3697     g_hash_table_foreach (active_paths, set_path_status_syncing, task);
3698 
3699     if (check_permission (task, conn) < 0) {
3700         seaf_warning ("Upload permission denied for repo %.8s on server %s.\n",
3701                       task->repo_id, task->host);
3702         goto out;
3703     }
3704 
3705     if (check_quota (task, conn, delta) < 0) {
3706         seaf_warning ("Not enough quota for repo %.8s on server %s.\n",
3707                       task->repo_id, task->host);
3708         goto out;
3709     }
3710 
3711     if (task->state == HTTP_TASK_STATE_CANCELED)
3712         goto out;
3713 
3714     transition_state (task, task->state, HTTP_TASK_RT_STATE_COMMIT);
3715 
3716     if (send_commit_object (task, conn) < 0) {
3717         seaf_warning ("Failed to send head commit for repo %.8s.\n", task->repo_id);
3718         goto out;
3719     }
3720 
3721     if (task->state == HTTP_TASK_STATE_CANCELED)
3722         goto out;
3723 
3724     transition_state (task, task->state, HTTP_TASK_RT_STATE_FS);
3725 
3726     send_fs_list = calculate_send_fs_object_list (task);
3727     if (!send_fs_list) {
3728         seaf_warning ("Failed to calculate fs object list for repo %.8s.\n",
3729                       task->repo_id);
3730         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3731         goto out;
3732     }
3733 
3734     if (!task->use_fileserver_port)
3735         url = g_strdup_printf ("%s/seafhttp/repo/%s/check-fs/",
3736                                task->host, task->repo_id);
3737     else
3738         url = g_strdup_printf ("%s/repo/%s/check-fs/",
3739                                task->host, task->repo_id);
3740 
3741     while (send_fs_list != NULL) {
3742         if (upload_check_id_list_segment (task, conn, url,
3743                                           &send_fs_list, &needed_fs_list) < 0) {
3744             seaf_warning ("Failed to check fs list for repo %.8s.\n", task->repo_id);
3745             goto out;
3746         }
3747 
3748         if (task->state == HTTP_TASK_STATE_CANCELED)
3749             goto out;
3750     }
3751     g_free (url);
3752     url = NULL;
3753 
3754     while (needed_fs_list != NULL) {
3755         if (send_fs_objects (task, conn, &needed_fs_list) < 0) {
3756             seaf_warning ("Failed to send fs objects for repo %.8s.\n", task->repo_id);
3757             goto out;
3758         }
3759 
3760         if (task->state == HTTP_TASK_STATE_CANCELED)
3761             goto out;
3762     }
3763 
3764     transition_state (task, task->state, HTTP_TASK_RT_STATE_BLOCK);
3765 
3766     if (calculate_block_list (task, &block_list) < 0) {
3767         seaf_warning ("Failed to calculate block list for repo %.8s.\n",
3768                       task->repo_id);
3769         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3770         goto out;
3771     }
3772 
3773     if (!task->use_fileserver_port)
3774         url = g_strdup_printf ("%s/seafhttp/repo/%s/check-blocks/",
3775                                task->host, task->repo_id);
3776     else
3777         url = g_strdup_printf ("%s/repo/%s/check-blocks/",
3778                                task->host, task->repo_id);
3779 
3780     while (block_list != NULL) {
3781         if (upload_check_id_list_segment (task, conn, url,
3782                                           &block_list, &needed_block_list) < 0) {
3783             seaf_warning ("Failed to check block list for repo %.8s.\n",
3784                           task->repo_id);
3785             goto out;
3786         }
3787 
3788         if (task->state == HTTP_TASK_STATE_CANCELED)
3789             goto out;
3790     }
3791     g_free (url);
3792     url = NULL;
3793 
3794     task->n_blocks = g_list_length (needed_block_list);
3795 
3796     seaf_debug ("%d blocks to send for %s:%s.\n",
3797                 task->n_blocks, task->host, task->repo_id);
3798 
3799     if (multi_threaded_send_blocks(task, needed_block_list) < 0 ||
3800         task->state == HTTP_TASK_STATE_CANCELED)
3801         goto out;
3802 
3803     transition_state (task, task->state, HTTP_TASK_RT_STATE_UPDATE_BRANCH);
3804 
3805     if (update_branch (task, conn) < 0) {
3806         seaf_warning ("Failed to update branch of repo %.8s.\n", task->repo_id);
3807         goto out;
3808     }
3809 
3810     /* After successful upload, the cached 'master' branch should be updated to
3811      * the head commit of 'local' branch.
3812      */
3813     update_master_branch (task);
3814 
3815     if (active_paths != NULL)
3816         g_hash_table_foreach (active_paths, set_path_status_synced, task);
3817 
3818 out:
3819     string_list_free (send_fs_list);
3820     string_list_free (needed_fs_list);
3821     string_list_free (block_list);
3822     string_list_free (needed_block_list);
3823 
3824     if (active_paths)
3825         g_hash_table_destroy (active_paths);
3826 
3827     g_free (url);
3828 
3829     connection_pool_return_connection (pool, conn);
3830 
3831     return vdata;
3832 }
3833 
3834 static void
http_upload_done(void * vdata)3835 http_upload_done (void *vdata)
3836 {
3837     HttpTxTask *task = vdata;
3838 
3839     if (task->error != SYNC_ERROR_ID_NO_ERROR)
3840         transition_state (task, HTTP_TASK_STATE_ERROR, HTTP_TASK_RT_STATE_FINISHED);
3841     else if (task->state == HTTP_TASK_STATE_CANCELED)
3842         transition_state (task, task->state, HTTP_TASK_RT_STATE_FINISHED);
3843     else
3844         transition_state (task, HTTP_TASK_STATE_FINISHED, HTTP_TASK_RT_STATE_FINISHED);
3845 }
3846 
3847 /* Download */
3848 
3849 static void *http_download_thread (void *vdata);
3850 static void http_download_done (void *vdata);
3851 
3852 int
http_tx_manager_add_download(HttpTxManager * manager,const char * repo_id,int repo_version,const char * host,const char * token,const char * server_head_id,gboolean is_clone,const char * passwd,const char * worktree,int protocol_version,const char * email,gboolean use_fileserver_port,const char * repo_name,GError ** error)3853 http_tx_manager_add_download (HttpTxManager *manager,
3854                               const char *repo_id,
3855                               int repo_version,
3856                               const char *host,
3857                               const char *token,
3858                               const char *server_head_id,
3859                               gboolean is_clone,
3860                               const char *passwd,
3861                               const char *worktree,
3862                               int protocol_version,
3863                               const char *email,
3864                               gboolean use_fileserver_port,
3865                               const char *repo_name,
3866                               GError **error)
3867 {
3868     HttpTxTask *task;
3869     SeafRepo *repo;
3870 
3871     if (!repo_id || !token || !host || !server_head_id || !email) {
3872         g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Empty argument(s)");
3873         return -1;
3874     }
3875 
3876     if (!is_clone) {
3877         repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
3878         if (!repo) {
3879             g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Repo not found");
3880             return -1;
3881         }
3882     }
3883 
3884     clean_tasks_for_repo (manager, repo_id);
3885 
3886     task = http_tx_task_new (manager, repo_id, repo_version,
3887                              HTTP_TASK_TYPE_DOWNLOAD, is_clone,
3888                              host, token, passwd, worktree);
3889 
3890     memcpy (task->head, server_head_id, 40);
3891     task->protocol_version = protocol_version;
3892     task->email = g_strdup(email);
3893 
3894     task->state = HTTP_TASK_STATE_NORMAL;
3895 
3896     task->use_fileserver_port = use_fileserver_port;
3897 
3898     task->blk_ref_cnts = g_hash_table_new_full (g_str_hash, g_str_equal,
3899                                                 g_free, g_free);
3900     pthread_mutex_init (&task->ref_cnt_lock, NULL);
3901 
3902     g_hash_table_insert (manager->priv->download_tasks,
3903                          g_strdup(repo_id),
3904                          task);
3905 
3906     task->repo_name = g_strdup(repo_name);
3907 
3908     if (seaf_job_manager_schedule_job (seaf->job_mgr,
3909                                        http_download_thread,
3910                                        http_download_done,
3911                                        task) < 0) {
3912         g_hash_table_remove (manager->priv->download_tasks, repo_id);
3913         return -1;
3914     }
3915 
3916     return 0;
3917 }
3918 
3919 static int
get_commit_object(HttpTxTask * task,Connection * conn)3920 get_commit_object (HttpTxTask *task, Connection *conn)
3921 {
3922     CURL *curl;
3923     char *url;
3924     int status;
3925     char *rsp_content = NULL;
3926     gint64 rsp_size;
3927     int ret = 0;
3928 
3929     curl = conn->curl;
3930 
3931     if (!task->use_fileserver_port)
3932         url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/%s",
3933                                task->host, task->repo_id, task->head);
3934     else
3935         url = g_strdup_printf ("%s/repo/%s/commit/%s",
3936                                task->host, task->repo_id, task->head);
3937 
3938     int curl_error;
3939     if (http_get (curl, url, task->token, &status,
3940                   &rsp_content, &rsp_size,
3941                   NULL, NULL, TRUE, &curl_error) < 0) {
3942         conn->release = TRUE;
3943         handle_curl_errors (task, curl_error);
3944         ret = -1;
3945         goto out;
3946     }
3947 
3948     if (status != HTTP_OK) {
3949         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
3950         handle_http_errors (task, status);
3951         ret = -1;
3952         goto out;
3953     }
3954 
3955     int rc = seaf_obj_store_write_obj (seaf->commit_mgr->obj_store,
3956                                        task->repo_id, task->repo_version,
3957                                        task->head,
3958                                        rsp_content,
3959                                        rsp_size,
3960                                        FALSE);
3961     if (rc < 0) {
3962         seaf_warning ("Failed to save commit %s in repo %.8s.\n",
3963                       task->head, task->repo_id);
3964         task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
3965         ret = -1;
3966     }
3967 
3968 out:
3969     g_free (url);
3970     g_free (rsp_content);
3971     curl_easy_reset (curl);
3972 
3973     return ret;
3974 }
3975 
3976 static int
get_needed_fs_id_list(HttpTxTask * task,Connection * conn,GList ** fs_id_list)3977 get_needed_fs_id_list (HttpTxTask *task, Connection *conn, GList **fs_id_list)
3978 {
3979     SeafBranch *master;
3980     CURL *curl;
3981     char *url = NULL;
3982     int status;
3983     char *rsp_content = NULL;
3984     gint64 rsp_size;
3985     int ret = 0;
3986     json_t *array;
3987     json_error_t jerror;
3988     const char *obj_id;
3989 
3990     const char *url_prefix = (task->use_fileserver_port) ? "" : "seafhttp/";
3991 
3992     if (!task->is_clone) {
3993         master = seaf_branch_manager_get_branch (seaf->branch_mgr,
3994                                                  task->repo_id,
3995                                                  "master");
3996         if (!master) {
3997             seaf_warning ("Failed to get branch master for repo %.8s.\n",
3998                           task->repo_id);
3999             return -1;
4000         }
4001 
4002         url = g_strdup_printf ("%s/%srepo/%s/fs-id-list/"
4003                                "?server-head=%s&client-head=%s",
4004                                task->host, url_prefix, task->repo_id,
4005                                task->head, master->commit_id);
4006 
4007         seaf_branch_unref (master);
4008     } else {
4009         url = g_strdup_printf ("%s/%srepo/%s/fs-id-list/?server-head=%s",
4010                                task->host, url_prefix, task->repo_id, task->head);
4011     }
4012 
4013     curl = conn->curl;
4014 
4015     int curl_error;
4016     if (http_get (curl, url, task->token, &status,
4017                   &rsp_content, &rsp_size,
4018                   NULL, NULL, (!task->is_clone), &curl_error) < 0) {
4019         conn->release = TRUE;
4020         handle_curl_errors (task, curl_error);
4021         ret = -1;
4022         goto out;
4023     }
4024 
4025     if (status != HTTP_OK) {
4026         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
4027         handle_http_errors (task, status);
4028         ret = -1;
4029         goto out;
4030     }
4031 
4032     array = json_loadb (rsp_content, rsp_size, 0, &jerror);
4033     if (!array) {
4034         seaf_warning ("Invalid JSON response from the server: %s.\n", jerror.text);
4035         task->error = SYNC_ERROR_ID_SERVER;
4036         ret = -1;
4037         goto out;
4038     }
4039 
4040     int i;
4041     size_t n = json_array_size (array);
4042     json_t *str;
4043 
4044     seaf_debug ("Received fs object list size %lu from %s:%s.\n",
4045                 n, task->host, task->repo_id);
4046 
4047     task->n_fs_objs = (int)n;
4048 
4049     GHashTable *checked_objs = g_hash_table_new_full (g_str_hash, g_str_equal,
4050                                                       g_free, NULL);
4051 
4052     for (i = 0; i < n; ++i) {
4053         str = json_array_get (array, i);
4054         if (!str) {
4055             seaf_warning ("Invalid JSON response from the server.\n");
4056             json_decref (array);
4057             string_list_free (*fs_id_list);
4058             ret = -1;
4059             goto out;
4060         }
4061 
4062         obj_id = json_string_value(str);
4063 
4064         if (g_hash_table_lookup (checked_objs, obj_id)) {
4065             ++(task->done_fs_objs);
4066             continue;
4067         }
4068         char *key = g_strdup(obj_id);
4069         g_hash_table_replace (checked_objs, key, key);
4070 
4071         if (!seaf_obj_store_obj_exists (seaf->fs_mgr->obj_store,
4072                                         task->repo_id, task->repo_version,
4073                                         obj_id)) {
4074             *fs_id_list = g_list_prepend (*fs_id_list, g_strdup(obj_id));
4075         } else if (task->is_clone) {
4076             gboolean io_error = FALSE;
4077             gboolean sound;
4078             sound = seaf_fs_manager_verify_object (seaf->fs_mgr,
4079                                                    task->repo_id, task->repo_version,
4080                                                    obj_id, FALSE, &io_error);
4081             if (!sound && !io_error) {
4082                 *fs_id_list = g_list_prepend (*fs_id_list, g_strdup(obj_id));
4083             } else {
4084                 ++(task->done_fs_objs);
4085             }
4086         } else {
4087             ++(task->done_fs_objs);
4088         }
4089     }
4090 
4091     json_decref (array);
4092     g_hash_table_destroy (checked_objs);
4093 
4094 out:
4095     g_free (url);
4096     g_free (rsp_content);
4097     curl_easy_reset (curl);
4098 
4099     return ret;
4100 }
4101 
4102 #define GET_FS_OBJECT_N 100
4103 
4104 static int
get_fs_objects(HttpTxTask * task,Connection * conn,GList ** fs_list)4105 get_fs_objects (HttpTxTask *task, Connection *conn, GList **fs_list)
4106 {
4107     json_t *array;
4108     char *obj_id;
4109     int n_sent = 0;
4110     char *data = NULL;
4111     int len;
4112     CURL *curl;
4113     char *url = NULL;
4114     int status;
4115     char *rsp_content = NULL;
4116     gint64 rsp_size;
4117     int ret = 0;
4118     GHashTable *requested;
4119 
4120     /* Convert object id list to JSON format. */
4121 
4122     requested = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
4123 
4124     array = json_array ();
4125 
4126     while (*fs_list != NULL) {
4127         obj_id = (*fs_list)->data;
4128         json_array_append_new (array, json_string(obj_id));
4129 
4130         *fs_list = g_list_delete_link (*fs_list, *fs_list);
4131 
4132         g_hash_table_replace (requested, obj_id, obj_id);
4133 
4134         if (++n_sent >= GET_FS_OBJECT_N)
4135             break;
4136     }
4137 
4138     seaf_debug ("Requesting %d fs objects from %s:%s.\n",
4139                 n_sent, task->host, task->repo_id);
4140 
4141     data = json_dumps (array, 0);
4142     len = strlen(data);
4143     json_decref (array);
4144 
4145     /* Send fs object id list. */
4146 
4147     curl = conn->curl;
4148 
4149     if (!task->use_fileserver_port)
4150         url = g_strdup_printf ("%s/seafhttp/repo/%s/pack-fs/", task->host, task->repo_id);
4151     else
4152         url = g_strdup_printf ("%s/repo/%s/pack-fs/", task->host, task->repo_id);
4153 
4154     int curl_error;
4155     if (http_post (curl, url, task->token,
4156                    data, len,
4157                    &status, &rsp_content, &rsp_size, TRUE, &curl_error) < 0) {
4158         conn->release = TRUE;
4159         handle_curl_errors (task, curl_error);
4160         ret = -1;
4161         goto out;
4162     }
4163 
4164     if (status != HTTP_OK) {
4165         seaf_warning ("Bad response code for POST %s: %d.\n", url, status);
4166         handle_http_errors (task, status);
4167         ret = -1;
4168         goto out;
4169     }
4170 
4171     /* Save received fs objects. */
4172 
4173     int n_recv = 0;
4174     char *p = rsp_content;
4175     ObjectHeader *hdr = (ObjectHeader *)p;
4176     char recv_obj_id[41];
4177     int n = 0;
4178     int size;
4179     int rc;
4180     while (n < rsp_size) {
4181         memcpy (recv_obj_id, hdr->obj_id, 40);
4182         recv_obj_id[40] = 0;
4183         size = ntohl (hdr->obj_size);
4184         if (n + sizeof(ObjectHeader) + size > rsp_size) {
4185             seaf_warning ("Incomplete object package received for repo %.8s.\n",
4186                           task->repo_id);
4187             task->error = SYNC_ERROR_ID_SERVER;
4188             ret = -1;
4189             goto out;
4190         }
4191 
4192         ++n_recv;
4193 
4194         rc = seaf_obj_store_write_obj (seaf->fs_mgr->obj_store,
4195                                        task->repo_id, task->repo_version,
4196                                        recv_obj_id,
4197                                        hdr->object,
4198                                        size, FALSE);
4199         if (rc < 0) {
4200             seaf_warning ("Failed to write fs object %s in repo %.8s.\n",
4201                           recv_obj_id, task->repo_id);
4202             task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4203             ret = -1;
4204             goto out;
4205         }
4206 
4207         g_hash_table_remove (requested, recv_obj_id);
4208 
4209         ++(task->done_fs_objs);
4210 
4211         p += (sizeof(ObjectHeader) + size);
4212         n += (sizeof(ObjectHeader) + size);
4213         hdr = (ObjectHeader *)p;
4214     }
4215 
4216     seaf_debug ("Received %d fs objects from %s:%s.\n",
4217                 n_recv, task->host, task->repo_id);
4218 
4219     /* The server may not return all the objects we requested.
4220      * So we need to add back the remaining object ids into fs_list.
4221      */
4222     GHashTableIter iter;
4223     gpointer key, value;
4224     g_hash_table_iter_init (&iter, requested);
4225     while (g_hash_table_iter_next (&iter, &key, &value)) {
4226         obj_id = key;
4227         *fs_list = g_list_prepend (*fs_list, g_strdup(obj_id));
4228     }
4229     g_hash_table_destroy (requested);
4230 
4231 out:
4232     g_free (url);
4233     g_free (data);
4234     g_free (rsp_content);
4235     curl_easy_reset (curl);
4236 
4237     return ret;
4238 }
4239 
/* Per-request context that get_block() passes to http_get() as the user
 * pointer accompanying get_block_callback().
 */
typedef struct {
    char block_id[41];          /* Id of the block being downloaded; get_block() copies 40 bytes in. */
    BlockHandle *block;         /* Handle of the block opened for writing. */
    HttpTxTask *task;           /* Task the download belongs to. */
} GetBlockData;
4245 
4246 static size_t
get_block_callback(void * ptr,size_t size,size_t nmemb,void * userp)4247 get_block_callback (void *ptr, size_t size, size_t nmemb, void *userp)
4248 {
4249     size_t realsize = size *nmemb;
4250     SendBlockData *data = userp;
4251     HttpTxTask *task = data->task;
4252     size_t n;
4253 
4254     if (task->state == HTTP_TASK_STATE_CANCELED || task->all_stop)
4255         return 0;
4256 
4257     n = seaf_block_manager_write_block (seaf->block_mgr,
4258                                         data->block,
4259                                         ptr, realsize);
4260     if (n < realsize) {
4261         seaf_warning ("Failed to write block %s in repo %.8s.\n",
4262                       data->block_id, task->repo_id);
4263         task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4264         return n;
4265     }
4266 
4267     /* Update global transferred bytes. */
4268     g_atomic_int_add (&(seaf->sync_mgr->recv_bytes), n);
4269 
4270     /* Update transferred bytes for this task */
4271     g_atomic_int_add (&task->tx_bytes, n);
4272 
4273     /* If uploaded bytes exceeds the limit, wait until the counter
4274      * is reset. We check the counter every 100 milliseconds, so we
4275      * can waste up to 100 milliseconds without sending data after
4276      * the counter is reset.
4277      */
4278     while (1) {
4279         gint sent = g_atomic_int_get(&(seaf->sync_mgr->recv_bytes));
4280         if (seaf->sync_mgr->download_limit > 0 &&
4281             sent > seaf->sync_mgr->download_limit)
4282             /* 100 milliseconds */
4283             g_usleep (100000);
4284         else
4285             break;
4286     }
4287 
4288     return n;
4289 }
4290 
4291 int
get_block(HttpTxTask * task,Connection * conn,const char * block_id)4292 get_block (HttpTxTask *task, Connection *conn, const char *block_id)
4293 {
4294     CURL *curl;
4295     char *url;
4296     int status;
4297     BlockHandle *block;
4298     int ret = 0;
4299     int *pcnt;
4300 
4301     block = seaf_block_manager_open_block (seaf->block_mgr,
4302                                            task->repo_id, task->repo_version,
4303                                            block_id, BLOCK_WRITE);
4304     if (!block) {
4305         seaf_warning ("Failed to open block %s in repo %.8s.\n",
4306                       block_id, task->repo_id);
4307         return -1;
4308     }
4309 
4310     GetBlockData data;
4311     memcpy (data.block_id, block_id, 40);
4312     data.block = block;
4313     data.task = task;
4314 
4315     curl = conn->curl;
4316 
4317     if (!task->use_fileserver_port)
4318         url = g_strdup_printf ("%s/seafhttp/repo/%s/block/%s",
4319                                task->host, task->repo_id, block_id);
4320     else
4321         url = g_strdup_printf ("%s/repo/%s/block/%s",
4322                                task->host, task->repo_id, block_id);
4323 
4324     int curl_error;
4325     if (http_get (curl, url, task->token, &status, NULL, NULL,
4326                   get_block_callback, &data, TRUE, &curl_error) < 0) {
4327         if (task->state == HTTP_TASK_STATE_CANCELED)
4328             goto error;
4329 
4330         if (task->error == SYNC_ERROR_ID_NO_ERROR) {
4331             /* Only release the connection when it's a network error. */
4332             conn->release = TRUE;
4333             handle_curl_errors (task, curl_error);
4334         }
4335         ret = -1;
4336         goto error;
4337     }
4338 
4339     if (status != HTTP_OK) {
4340         seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
4341         handle_http_errors (task, status);
4342         ret = -1;
4343         goto error;
4344     }
4345 
4346     BlockMetadata *bmd = seaf_block_manager_stat_block_by_handle(seaf->block_mgr, block);
4347     if (bmd == NULL) {
4348         seaf_warning ("Failed to get block %s meta data in repo %.8s.\n", block_id, task->repo_id);
4349         ret = -1;
4350         goto error;
4351     }
4352 
4353     seaf_block_manager_close_block (seaf->block_mgr, block);
4354 
4355     pthread_mutex_lock (&task->ref_cnt_lock);
4356 
4357     task->done_download += bmd->size;
4358     g_free (bmd);
4359 
4360     /* Don't overwrite the block if other thread already downloaded it.
4361      * Since we've locked ref_cnt_lock, we can be sure the block won't be removed.
4362      */
4363     if (!seaf_block_manager_block_exists (seaf->block_mgr,
4364                                           task->repo_id, task->repo_version,
4365                                           block_id) &&
4366         seaf_block_manager_commit_block (seaf->block_mgr, block) < 0)
4367     {
4368         seaf_warning ("Failed to commit block %s in repo %.8s.\n",
4369                       block_id, task->repo_id);
4370         task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4371         ret = -1;
4372     }
4373 
4374     if (ret == 0) {
4375         pcnt = g_hash_table_lookup (task->blk_ref_cnts, block_id);
4376         if (!pcnt) {
4377             pcnt = g_new0(int, 1);
4378             g_hash_table_insert (task->blk_ref_cnts, g_strdup(block_id), pcnt);
4379         }
4380         *pcnt += 1;
4381     }
4382 
4383     pthread_mutex_unlock (&task->ref_cnt_lock);
4384 
4385     seaf_block_manager_block_handle_free (seaf->block_mgr, block);
4386 
4387     g_free (url);
4388 
4389     return ret;
4390 
4391 error:
4392     g_free (url);
4393 
4394     seaf_block_manager_close_block (seaf->block_mgr, block);
4395     seaf_block_manager_block_handle_free (seaf->block_mgr, block);
4396 
4397     return ret;
4398 }
4399 
4400 int
http_tx_task_download_file_blocks(HttpTxTask * task,const char * file_id)4401 http_tx_task_download_file_blocks (HttpTxTask *task, const char *file_id)
4402 {
4403     Seafile *file;
4404     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
4405     ConnectionPool *pool;
4406     Connection *conn;
4407     int ret = 0;
4408 
4409     file = seaf_fs_manager_get_seafile (seaf->fs_mgr,
4410                                         task->repo_id,
4411                                         task->repo_version,
4412                                         file_id);
4413     if (!file) {
4414         seaf_warning ("Failed to find seafile object %s in repo %.8s.\n",
4415                       file_id, task->repo_id);
4416         return -1;
4417     }
4418 
4419     pool = find_connection_pool (priv, task->host);
4420     if (!pool) {
4421         seaf_warning ("Failed to create connection pool for host %s.\n", task->host);
4422         task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4423         seafile_unref (file);
4424         return -1;
4425     }
4426 
4427     conn = connection_pool_get_connection (pool);
4428     if (!conn) {
4429         seaf_warning ("Failed to get connection to host %s.\n", task->host);
4430         task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4431         seafile_unref (file);
4432         return -1;
4433     }
4434 
4435     int i;
4436     char *block_id;
4437     for (i = 0; i < file->n_blocks; ++i) {
4438         block_id = file->blk_sha1s[i];
4439         if (seaf_block_manager_block_exists (seaf->block_mgr,
4440                                              task->repo_id, task->repo_version,
4441                                              block_id)) {
4442             BlockMetadata *bmd;
4443             bmd = seaf_block_manager_stat_block (seaf->block_mgr,
4444                                                  task->repo_id, task->repo_version,
4445                                                  block_id);
4446             if (bmd) {
4447                 task->done_download += bmd->size;
4448                 g_free (bmd);
4449                 continue;
4450             }
4451         }
4452 
4453         ret = get_block (task, conn, block_id);
4454         if (ret < 0 || task->state == HTTP_TASK_STATE_CANCELED)
4455             break;
4456     }
4457 
4458     connection_pool_return_connection (pool, conn);
4459 
4460     seafile_unref (file);
4461 
4462     return ret;
4463 }
4464 
4465 static int
update_local_repo(HttpTxTask * task)4466 update_local_repo (HttpTxTask *task)
4467 {
4468     SeafRepo *repo;
4469     SeafCommit *new_head;
4470     SeafBranch *branch;
4471     int ret = 0;
4472 
4473     new_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
4474                                                task->repo_id,
4475                                                task->repo_version,
4476                                                task->head);
4477     if (!new_head) {
4478         seaf_warning ("Failed to get commit %s:%s.\n", task->repo_id, task->head);
4479         task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
4480         return -1;
4481     }
4482 
4483     /* If repo doesn't exist, create it.
4484      * Note that branch doesn't exist either in this case.
4485      */
4486     repo = seaf_repo_manager_get_repo (seaf->repo_mgr, new_head->repo_id);
4487     if (task->is_clone) {
4488         if (repo != NULL)
4489             goto out;
4490 
4491         repo = seaf_repo_new (new_head->repo_id, NULL, NULL);
4492         if (repo == NULL) {
4493             /* create repo failed */
4494             task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4495             ret = -1;
4496             goto out;
4497         }
4498 
4499         seaf_repo_from_commit (repo, new_head);
4500 
4501         seaf_repo_manager_add_repo (seaf->repo_mgr, repo);
4502 
4503         /* If it's a new repo, create 'local' and 'master' branch */
4504         branch = seaf_branch_new ("local", task->repo_id, task->head);
4505         seaf_branch_manager_add_branch (seaf->branch_mgr, branch);
4506         seaf_branch_unref (branch);
4507 
4508         branch = seaf_branch_new ("master", task->repo_id, task->head);
4509         seaf_branch_manager_add_branch (seaf->branch_mgr, branch);
4510         seaf_branch_unref (branch);
4511     } else {
4512         if (!repo) {
4513             task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
4514             ret = -1;
4515             goto out;
4516         }
4517 
4518         branch = seaf_branch_manager_get_branch (seaf->branch_mgr,
4519                                                  task->repo_id,
4520                                                  "master");
4521         if (!branch) {
4522             seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
4523             task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
4524             ret = -1;
4525             goto out;
4526         }
4527         seaf_branch_set_commit (branch, new_head->commit_id);
4528         seaf_branch_manager_update_branch (seaf->branch_mgr, branch);
4529         seaf_branch_unref (branch);
4530 
4531         /* Update repo head branch. */
4532         seaf_branch_set_commit (repo->head, new_head->commit_id);
4533         seaf_branch_manager_update_branch (seaf->branch_mgr, repo->head);
4534 
4535         if (g_strcmp0 (repo->name, new_head->repo_name) != 0)
4536             seaf_repo_set_name (repo, new_head->repo_name);
4537     }
4538 
4539 out:
4540     seaf_commit_unref (new_head);
4541     return ret;
4542 }
4543 
4544 static void *
http_download_thread(void * vdata)4545 http_download_thread (void *vdata)
4546 {
4547     HttpTxTask *task = vdata;
4548     HttpTxPriv *priv = seaf->http_tx_mgr->priv;
4549     ConnectionPool *pool;
4550     Connection *conn = NULL;
4551     GList *fs_id_list = NULL;
4552 
4553     pool = find_connection_pool (priv, task->host);
4554     if (!pool) {
4555         seaf_warning ("Failed to create connection pool for host %s.\n", task->host);
4556         task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4557         goto out;
4558     }
4559 
4560     conn = connection_pool_get_connection (pool);
4561     if (!conn) {
4562         seaf_warning ("Failed to get connection to host %s.\n", task->host);
4563         task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4564         goto out;
4565     }
4566 
4567     /* seaf_message ("Download with HTTP sync protocol version %d.\n", */
4568     /*               task->protocol_version); */
4569 
4570     transition_state (task, task->state, HTTP_TASK_RT_STATE_CHECK);
4571 
4572     if (check_permission (task, conn) < 0) {
4573         seaf_warning ("Download permission denied for repo %.8s on server %s.\n",
4574                       task->repo_id, task->host);
4575         goto out;
4576     }
4577 
4578     if (task->state == HTTP_TASK_STATE_CANCELED)
4579         goto out;
4580 
4581     transition_state (task, task->state, HTTP_TASK_RT_STATE_COMMIT);
4582 
4583     if (get_commit_object (task, conn) < 0) {
4584         seaf_warning ("Failed to get server head commit for repo %.8s on server %s.\n",
4585                       task->repo_id, task->host);
4586         goto out;
4587     }
4588 
4589     if (task->state == HTTP_TASK_STATE_CANCELED)
4590         goto out;
4591 
4592     transition_state (task, task->state, HTTP_TASK_RT_STATE_FS);
4593 
4594     if (get_needed_fs_id_list (task, conn, &fs_id_list) < 0) {
4595         seaf_warning ("Failed to get fs id list for repo %.8s on server %s.\n",
4596                       task->repo_id, task->host);
4597         goto out;
4598     }
4599 
4600     if (task->state == HTTP_TASK_STATE_CANCELED)
4601         goto out;
4602 
4603     while (fs_id_list != NULL) {
4604         if (get_fs_objects (task, conn, &fs_id_list) < 0) {
4605             seaf_warning ("Failed to get fs objects for repo %.8s on server %s.\n",
4606                           task->repo_id, task->host);
4607             goto out;
4608         }
4609 
4610         if (task->state == HTTP_TASK_STATE_CANCELED)
4611             goto out;
4612     }
4613 
4614     transition_state (task, task->state, HTTP_TASK_RT_STATE_BLOCK);
4615 
4616     /* Record download head commit id, so that we can resume download
4617      * if this download is interrupted.
4618      */
4619     seaf_repo_manager_set_repo_property (seaf->repo_mgr,
4620                                          task->repo_id,
4621                                          REPO_PROP_DOWNLOAD_HEAD,
4622                                          task->head);
4623 
4624     int rc = seaf_repo_fetch_and_checkout (task, task->head);
4625     switch (rc) {
4626     case FETCH_CHECKOUT_SUCCESS:
4627         break;
4628     case FETCH_CHECKOUT_CANCELED:
4629         goto out;
4630     case FETCH_CHECKOUT_FAILED:
4631         task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4632         goto out;
4633     case FETCH_CHECKOUT_TRANSFER_ERROR:
4634         goto out;
4635     case FETCH_CHECKOUT_LOCKED:
4636         task->error = SYNC_ERROR_ID_FILE_LOCKED_BY_APP;
4637         goto out;
4638     }
4639 
4640     update_local_repo (task);
4641 
4642 out:
4643     connection_pool_return_connection (pool, conn);
4644     string_list_free (fs_id_list);
4645     return vdata;
4646 }
4647 
4648 static void
http_download_done(void * vdata)4649 http_download_done (void *vdata)
4650 {
4651     HttpTxTask *task = vdata;
4652 
4653     if (task->error != SYNC_ERROR_ID_NO_ERROR)
4654         transition_state (task, HTTP_TASK_STATE_ERROR, HTTP_TASK_RT_STATE_FINISHED);
4655     else if (task->state == HTTP_TASK_STATE_CANCELED)
4656         transition_state (task, task->state, HTTP_TASK_RT_STATE_FINISHED);
4657     else
4658         transition_state (task, HTTP_TASK_STATE_FINISHED, HTTP_TASK_RT_STATE_FINISHED);
4659 }
4660 
4661 GList*
http_tx_manager_get_upload_tasks(HttpTxManager * manager)4662 http_tx_manager_get_upload_tasks (HttpTxManager *manager)
4663 {
4664     return g_hash_table_get_values (manager->priv->upload_tasks);
4665 }
4666 
4667 GList*
http_tx_manager_get_download_tasks(HttpTxManager * manager)4668 http_tx_manager_get_download_tasks (HttpTxManager *manager)
4669 {
4670     return g_hash_table_get_values (manager->priv->download_tasks);
4671 }
4672 
4673 HttpTxTask *
http_tx_manager_find_task(HttpTxManager * manager,const char * repo_id)4674 http_tx_manager_find_task (HttpTxManager *manager, const char *repo_id)
4675 {
4676     HttpTxTask *task = NULL;
4677 
4678     task = g_hash_table_lookup (manager->priv->upload_tasks, repo_id);
4679     if (task)
4680         return task;
4681 
4682     task = g_hash_table_lookup (manager->priv->download_tasks, repo_id);
4683     return task;
4684 }
4685 
4686 void
http_tx_manager_cancel_task(HttpTxManager * manager,const char * repo_id,int task_type)4687 http_tx_manager_cancel_task (HttpTxManager *manager,
4688                              const char *repo_id,
4689                              int task_type)
4690 {
4691     HttpTxTask *task = NULL;
4692 
4693     if (task_type == HTTP_TASK_TYPE_DOWNLOAD)
4694         task = g_hash_table_lookup (manager->priv->download_tasks, repo_id);
4695     else
4696         task = g_hash_table_lookup (manager->priv->upload_tasks, repo_id);
4697 
4698     if (!task)
4699         return;
4700 
4701     if (task->state != HTTP_TASK_STATE_NORMAL) {
4702         seaf_warning ("Cannot cancel task not in NORMAL state.\n");
4703         return;
4704     }
4705 
4706     if (task->runtime_state == HTTP_TASK_RT_STATE_INIT) {
4707         transition_state (task, HTTP_TASK_STATE_CANCELED, HTTP_TASK_RT_STATE_FINISHED);
4708         return;
4709     }
4710 
4711     /* Only change state. runtime_state will be changed in worker thread. */
4712     transition_state (task, HTTP_TASK_STATE_CANCELED, task->runtime_state);
4713 }
4714 
4715 int
http_tx_task_get_rate(HttpTxTask * task)4716 http_tx_task_get_rate (HttpTxTask *task)
4717 {
4718     return task->last_tx_bytes;
4719 }
4720 
4721 const char *
http_task_state_to_str(int state)4722 http_task_state_to_str (int state)
4723 {
4724     if (state < 0 || state >= N_HTTP_TASK_STATE)
4725         return "unknown";
4726 
4727     return http_task_state_str[state];
4728 }
4729 
4730 const char *
http_task_rt_state_to_str(int rt_state)4731 http_task_rt_state_to_str (int rt_state)
4732 {
4733     if (rt_state < 0 || rt_state >= N_HTTP_TASK_RT_STATE)
4734         return "unknown";
4735 
4736     return http_task_rt_state_str[rt_state];
4737 }
4738