1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 #include "common.h"
4
5 #include "net.h"
6
7 #include <pthread.h>
8 #include <curl/curl.h>
9 #include <jansson.h>
10 #include <event2/buffer.h>
11
12 #ifdef WIN32
13 #include <windows.h>
14 #include <wincrypt.h>
15 #endif
16
17 #ifndef USE_GPL_CRYPTO
18 #include <openssl/pem.h>
19 #include <openssl/x509.h>
20 #include <openssl/bio.h>
21 #include <openssl/asn1.h>
22 #include <openssl/ssl.h>
23 #endif
24
25 #include "seafile-config.h"
26
27 #include "seafile-session.h"
28 #include "http-tx-mgr.h"
29
30 #include "seafile-error-impl.h"
31 #include "utils.h"
32 #include "diff-simple.h"
33
34 #define DEBUG_FLAG SEAFILE_DEBUG_TRANSFER
35 #include "log.h"
36
37 #include "timer.h"
38
39 #define HTTP_OK 200
40 #define HTTP_BAD_REQUEST 400
41 #define HTTP_FORBIDDEN 403
42 #define HTTP_NOT_FOUND 404
43 #define HTTP_NO_QUOTA 443
44 #define HTTP_REPO_DELETED 444
45 #define HTTP_REPO_CORRUPTED 445
46 #define HTTP_INTERNAL_SERVER_ERROR 500
47
48 #define RESET_BYTES_INTERVAL_MSEC 1000
49
50 #define CLEAR_POOL_ERR_CNT 3
51
52 #ifndef SEAFILE_CLIENT_VERSION
53 #define SEAFILE_CLIENT_VERSION PACKAGE_VERSION
54 #endif
55
56 #ifdef WIN32
57 #define USER_AGENT_OS "Windows NT"
58 #endif
59
60 #ifdef __APPLE__
61 #define USER_AGENT_OS "Apple OS X"
62 #endif
63
64 #ifdef __linux__
65 #define USER_AGENT_OS "Linux"
66 #endif
67
68 #if defined __FreeBSD__ || defined __NetBSD__ || defined __OpenBSD__ || defined __DragonFly__
69 #define USER_AGENT_OS "BSD"
70 #endif
71
72 #ifdef __FreeBSD__
73 #define USER_AGENT_OS "FreeBSD"
74 #endif
75
76 #ifdef __DragonFly__
77 #define USER_AGENT_OS "DragonFly"
78 #endif
79
80 #ifdef __NetBSD__
81 #define USER_AGENT_OS "NetBSD"
82 #endif
83
84 #ifdef __OpenBSD__
85 #define USER_AGENT_OS "OpenBSD"
86 #endif
87
88 struct _Connection {
89 CURL *curl;
90 gint64 ctime; /* Used to clean up unused connection. */
91 gboolean release; /* If TRUE, the connection will be released. */
92 };
93 typedef struct _Connection Connection;
94
95 struct _ConnectionPool {
96 char *host;
97 GQueue *queue;
98 pthread_mutex_t lock;
99 int err_cnt;
100 };
101 typedef struct _ConnectionPool ConnectionPool;
102
103 struct _HttpTxPriv {
104 GHashTable *download_tasks;
105 GHashTable *upload_tasks;
106
107 GHashTable *connection_pools; /* host -> connection pool */
108 pthread_mutex_t pools_lock;
109
110 SeafTimer *reset_bytes_timer;
111
112 char *ca_bundle_path;
113
114 /* Regex to parse error message returned by update-branch. */
115 GRegex *locked_error_regex;
116 GRegex *folder_perm_error_regex;
117 };
118 typedef struct _HttpTxPriv HttpTxPriv;
119
120 /* Http Tx Task */
121
122 static HttpTxTask *
http_tx_task_new(HttpTxManager * mgr,const char * repo_id,int repo_version,int type,gboolean is_clone,const char * host,const char * token,const char * passwd,const char * worktree)123 http_tx_task_new (HttpTxManager *mgr,
124 const char *repo_id,
125 int repo_version,
126 int type,
127 gboolean is_clone,
128 const char *host,
129 const char *token,
130 const char *passwd,
131 const char *worktree)
132 {
133 HttpTxTask *task = g_new0 (HttpTxTask, 1);
134
135 task->manager = mgr;
136 memcpy (task->repo_id, repo_id, 36);
137 task->repo_version = repo_version;
138 task->type = type;
139 task->is_clone = is_clone;
140
141 task->host = g_strdup(host);
142 task->token = g_strdup(token);
143
144 if (passwd)
145 task->passwd = g_strdup(passwd);
146 if (worktree)
147 task->worktree = g_strdup(worktree);
148
149 task->error = SYNC_ERROR_ID_NO_ERROR;
150
151 return task;
152 }
153
154 static void
http_tx_task_free(HttpTxTask * task)155 http_tx_task_free (HttpTxTask *task)
156 {
157 g_free (task->host);
158 g_free (task->token);
159 g_free (task->passwd);
160 g_free (task->worktree);
161 g_free (task->email);
162 g_free (task->repo_name);
163 if (task->type == HTTP_TASK_TYPE_DOWNLOAD) {
164 g_hash_table_destroy (task->blk_ref_cnts);
165 }
166 g_free (task);
167 }
168
169 static const char *http_task_state_str[] = {
170 "normal",
171 "canceled",
172 "finished",
173 "error",
174 };
175
176 static const char *http_task_rt_state_str[] = {
177 "init",
178 "check",
179 "commit",
180 "fs",
181 "data",
182 "update-branch",
183 "finished",
184 };
185
186 /* Http connection and connection pool. */
187
188 static Connection *
connection_new()189 connection_new ()
190 {
191 Connection *conn = g_new0 (Connection, 1);
192
193 conn->curl = curl_easy_init();
194 conn->ctime = (gint64)time(NULL);
195
196 return conn;
197 }
198
199 static void
connection_free(Connection * conn)200 connection_free (Connection *conn)
201 {
202 curl_easy_cleanup (conn->curl);
203 g_free (conn);
204 }
205
206 static ConnectionPool *
connection_pool_new(const char * host)207 connection_pool_new (const char *host)
208 {
209 ConnectionPool *pool = g_new0 (ConnectionPool, 1);
210 pool->host = g_strdup(host);
211 pool->queue = g_queue_new ();
212 pthread_mutex_init (&pool->lock, NULL);
213 return pool;
214 }
215
216 static ConnectionPool *
find_connection_pool(HttpTxPriv * priv,const char * host)217 find_connection_pool (HttpTxPriv *priv, const char *host)
218 {
219 ConnectionPool *pool;
220
221 pthread_mutex_lock (&priv->pools_lock);
222 pool = g_hash_table_lookup (priv->connection_pools, host);
223 if (!pool) {
224 pool = connection_pool_new (host);
225 g_hash_table_insert (priv->connection_pools, g_strdup(host), pool);
226 }
227 pthread_mutex_unlock (&priv->pools_lock);
228
229 return pool;
230 }
231
232 static Connection *
connection_pool_get_connection(ConnectionPool * pool)233 connection_pool_get_connection (ConnectionPool *pool)
234 {
235 Connection *conn = NULL;
236
237 pthread_mutex_lock (&pool->lock);
238 conn = g_queue_pop_head (pool->queue);
239 if (!conn) {
240 conn = connection_new ();
241 }
242 pthread_mutex_unlock (&pool->lock);
243
244 return conn;
245 }
246
247 static void
connection_pool_clear(ConnectionPool * pool)248 connection_pool_clear (ConnectionPool *pool)
249 {
250 Connection *conn = NULL;
251
252 while (1) {
253 conn = g_queue_pop_head (pool->queue);
254 if (!conn)
255 break;
256 connection_free (conn);
257 }
258 }
259
260 static void
connection_pool_return_connection(ConnectionPool * pool,Connection * conn)261 connection_pool_return_connection (ConnectionPool *pool, Connection *conn)
262 {
263 if (!conn)
264 return;
265
266 if (conn->release) {
267 connection_free (conn);
268
269 pthread_mutex_lock (&pool->lock);
270 if (++pool->err_cnt >= CLEAR_POOL_ERR_CNT) {
271 connection_pool_clear (pool);
272 }
273 pthread_mutex_unlock (&pool->lock);
274
275 return;
276 }
277
278 curl_easy_reset (conn->curl);
279
280 /* Reset error count when one connection succeeded. */
281 pthread_mutex_lock (&pool->lock);
282 pool->err_cnt = 0;
283 g_queue_push_tail (pool->queue, conn);
284 pthread_mutex_unlock (&pool->lock);
285 }
286
287 #define LOCKED_ERROR_PATTERN "File (.+) is locked"
288 #define FOLDER_PERM_ERROR_PATTERN "Update to path (.+) is not allowed by folder permission settings"
289
290 HttpTxManager *
http_tx_manager_new(struct _SeafileSession * seaf)291 http_tx_manager_new (struct _SeafileSession *seaf)
292 {
293 HttpTxManager *mgr = g_new0 (HttpTxManager, 1);
294 HttpTxPriv *priv = g_new0 (HttpTxPriv, 1);
295
296 mgr->seaf = seaf;
297
298 priv->download_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
299 g_free,
300 (GDestroyNotify)http_tx_task_free);
301 priv->upload_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
302 g_free,
303 (GDestroyNotify)http_tx_task_free);
304
305 priv->connection_pools = g_hash_table_new (g_str_hash, g_str_equal);
306 pthread_mutex_init (&priv->pools_lock, NULL);
307
308 priv->ca_bundle_path = g_build_filename (seaf->seaf_dir, "ca-bundle.pem", NULL);
309
310 GError *error = NULL;
311 priv->locked_error_regex = g_regex_new (LOCKED_ERROR_PATTERN, 0, 0, &error);
312 if (error) {
313 seaf_warning ("Failed to create regex '%s': %s\n", LOCKED_ERROR_PATTERN, error->message);
314 g_clear_error (&error);
315 }
316
317 priv->folder_perm_error_regex = g_regex_new (FOLDER_PERM_ERROR_PATTERN, 0, 0, &error);
318 if (error) {
319 seaf_warning ("Failed to create regex '%s': %s\n", FOLDER_PERM_ERROR_PATTERN, error->message);
320 g_clear_error (&error);
321 }
322
323 mgr->priv = priv;
324
325 return mgr;
326 }
327
328 static int
reset_bytes(void * vdata)329 reset_bytes (void *vdata)
330 {
331 HttpTxManager *mgr = vdata;
332 HttpTxPriv *priv = mgr->priv;
333 GHashTableIter iter;
334 gpointer key, value;
335 HttpTxTask *task;
336
337 g_hash_table_iter_init (&iter, priv->download_tasks);
338 while (g_hash_table_iter_next (&iter, &key, &value)) {
339 task = value;
340 task->last_tx_bytes = g_atomic_int_get (&task->tx_bytes);
341 g_atomic_int_set (&task->tx_bytes, 0);
342 }
343
344 g_hash_table_iter_init (&iter, priv->upload_tasks);
345 while (g_hash_table_iter_next (&iter, &key, &value)) {
346 task = value;
347 task->last_tx_bytes = g_atomic_int_get (&task->tx_bytes);
348 g_atomic_int_set (&task->tx_bytes, 0);
349 }
350
351 return 1;
352 }
353
354 int
http_tx_manager_start(HttpTxManager * mgr)355 http_tx_manager_start (HttpTxManager *mgr)
356 {
357 #ifdef WIN32
358 /* Remove existing ca-bundle file on start. */
359 g_unlink (mgr->priv->ca_bundle_path);
360 #endif
361
362 /* TODO: add a timer to clean up unused Http connections. */
363
364 mgr->priv->reset_bytes_timer = seaf_timer_new (reset_bytes,
365 mgr,
366 RESET_BYTES_INTERVAL_MSEC);
367
368 return 0;
369 }
370
371 /* Common Utility Functions. */
372
373 #ifndef USE_GPL_CRYPTO
374
375 #ifdef WIN32
376
377 /* static void */
378 /* write_cert_name_to_pem_file (FILE *f, PCCERT_CONTEXT pc) */
379 /* { */
380 /* char *name; */
381 /* DWORD size; */
382
383 /* fprintf (f, "\n"); */
384
385 /* if (!CertGetCertificateContextProperty(pc, */
386 /* CERT_FRIENDLY_NAME_PROP_ID, */
387 /* NULL, &size)) { */
388 /* return; */
389 /* } */
390
391 /* name = g_malloc ((gsize)size); */
392 /* if (!name) { */
393 /* seaf_warning ("Failed to alloc memory\n"); */
394 /* return; */
395 /* } */
396
397 /* if (!CertGetCertificateContextProperty(pc, */
398 /* CERT_FRIENDLY_NAME_PROP_ID, */
399 /* name, &size)) { */
400 /* g_free (name); */
401 /* return; */
402 /* } */
403
404 /* if (fwrite(name, (size_t)size, 1, f) != 1) { */
405 /* seaf_warning ("Failed to write pem file.\n"); */
406 /* g_free (name); */
407 /* return; */
408 /* } */
409 /* fprintf (f, "\n"); */
410
411 /* g_free (name); */
412 /* } */
413
414 static void
write_cert_to_pem_file(FILE * f,PCCERT_CONTEXT pc)415 write_cert_to_pem_file (FILE *f, PCCERT_CONTEXT pc)
416 {
417 const unsigned char *der = pc->pbCertEncoded;
418 X509 *cert;
419
420 /* write_cert_name_to_pem_file (f, pc); */
421
422 cert = d2i_X509 (NULL, &der, (int)pc->cbCertEncoded);
423 if (!cert) {
424 seaf_warning ("Failed to parse certificate from DER.\n");
425 return;
426 }
427
428 /* Don't add expired certs to pem file, otherwise openssl will
429 * complain certificate expired.
430 */
431 if (X509_cmp_current_time (X509_get_notAfter(cert)) < 0)
432 return;
433
434 if (!PEM_write_X509 (f, cert)) {
435 seaf_warning ("Failed to write certificate.\n");
436 X509_free (cert);
437 return;
438 }
439
440 X509_free (cert);
441 }
442
443 static int
load_ca_from_store(FILE * f,const wchar_t * store_name)444 load_ca_from_store (FILE *f, const wchar_t *store_name)
445 {
446 HCERTSTORE store;
447
448 store = CertOpenSystemStoreW (0, store_name);
449 if (!store) {
450 seaf_warning ("Failed to open system cert store: %lu\n", GetLastError());
451 return -1;
452 }
453
454 PCCERT_CONTEXT pc = NULL;
455 while (1) {
456 pc = CertFindCertificateInStore (store, X509_ASN_ENCODING, 0, CERT_FIND_ANY, NULL, pc);
457 if (!pc)
458 break;
459 write_cert_to_pem_file (f, pc);
460 }
461
462 CertCloseStore(store, 0);
463
464 return 0;
465 }
466
467 static int
create_ca_bundle(const char * ca_bundle_path)468 create_ca_bundle (const char *ca_bundle_path)
469 {
470 FILE *f;
471 int ret = 0;
472
473 f = g_fopen (ca_bundle_path, "w+b");
474 if (!f) {
475 seaf_warning ("Failed to open cabundle file %s: %s\n",
476 ca_bundle_path, strerror(errno));
477 return -1;
478 }
479
480 if (load_ca_from_store (f, L"ROOT") < 0) {
481 seaf_warning ("Failed to load ca from ROOT store.\n");
482 ret = -1;
483 goto out;
484 }
485
486 if (load_ca_from_store (f, L"CA") < 0) {
487 seaf_warning ("Failed to load ca from CA store.\n");
488 ret = -1;
489 goto out;
490 }
491
492 out:
493 fclose (f);
494 return ret;
495 }
496
497 #endif /* WIN32 */
498
499 #ifndef __linux__
500 static void
load_ca_bundle(CURL * curl)501 load_ca_bundle (CURL *curl)
502 {
503 char *ca_bundle_path = seaf->http_tx_mgr->priv->ca_bundle_path;
504
505 /* On MacOS the certs are loaded by seafile applet instead of seaf-daemon */
506 if (!seaf_util_exists (ca_bundle_path)) {
507 #ifdef WIN32
508 if (create_ca_bundle (ca_bundle_path) < 0)
509 return;
510 #else
511 return;
512 #endif
513 }
514
515 curl_easy_setopt (curl, CURLOPT_CAINFO, ca_bundle_path);
516 }
517 #endif /* __linux__ */
518
519 static gboolean
load_certs(sqlite3_stmt * stmt,void * vdata)520 load_certs (sqlite3_stmt *stmt, void *vdata)
521 {
522 X509_STORE *store = vdata;
523 X509 *saved = NULL;
524 const char *pem_b64;
525 char *pem = NULL;
526 BIO *b = NULL;
527 gboolean ret = TRUE;
528
529 pem_b64 = (const char *)sqlite3_column_text (stmt, 0);
530
531 gsize len;
532 pem = (char *)g_base64_decode (pem_b64, &len);
533 if (!pem) {
534 seaf_warning ("Failed to decode base64.\n");
535 goto out;
536 }
537
538 b = BIO_new (BIO_s_mem());
539 if (!b) {
540 seaf_warning ("Failed to alloc BIO\n");
541 goto out;
542 }
543
544 if (BIO_write (b, pem, len) != len) {
545 seaf_warning ("Failed to write pem to BIO\n");
546 goto out;
547 }
548
549 saved = PEM_read_bio_X509 (b, NULL, 0, NULL);
550 if (!saved) {
551 seaf_warning ("Failed to read PEM from BIO\n");
552 goto out;
553 }
554
555 X509_STORE_add_cert (store, saved);
556
557 out:
558 g_free (pem);
559 if (b)
560 BIO_free (b);
561 if (saved)
562 X509_free (saved);
563
564 return ret;
565 }
566
567 static int
load_certs_from_db(X509_STORE * store)568 load_certs_from_db (X509_STORE *store)
569 {
570 char *cert_db_path = NULL;
571 sqlite3 *db = NULL;
572 char *sql;
573 int ret = 0;
574
575 cert_db_path = g_build_filename (seaf->seaf_dir, "certs.db", NULL);
576 if (sqlite_open_db (cert_db_path, &db) < 0) {
577 seaf_warning ("Failed to open certs.db\n");
578 ret = -1;
579 goto out;
580 }
581
582 sql = "SELECT cert FROM Certs;";
583
584 if (sqlite_foreach_selected_row (db, sql, load_certs, store) < 0) {
585 ret = -1;
586 goto out;
587 }
588
589 out:
590 g_free (cert_db_path);
591 if (db)
592 sqlite_close_db (db);
593
594 return ret;
595 }
596
597 static CURLcode
ssl_callback(CURL * curl,void * ssl_ctx,void * userptr)598 ssl_callback (CURL *curl, void *ssl_ctx, void *userptr)
599 {
600 SSL_CTX *ctx = ssl_ctx;
601 X509_STORE *store;
602
603 store = SSL_CTX_get_cert_store(ctx);
604
605 /* Add all certs stored in db as trusted CA certs.
606 * This workaround has one limitation though. The self-signed certs cannot
607 * contain chain. It must be the CA itself.
608 */
609 load_certs_from_db (store);
610
611 return CURLE_OK;
612 }
613
614 #endif /* USE_GPL_CRYPTO */
615
616 static void
set_proxy(CURL * curl,gboolean is_https)617 set_proxy (CURL *curl, gboolean is_https)
618 {
619 /* Disable proxy if proxy options are not set properly. */
620 if (!seaf->use_http_proxy || !seaf->http_proxy_type || !seaf->http_proxy_addr) {
621 curl_easy_setopt (curl, CURLOPT_PROXY, NULL);
622 return;
623 }
624
625 if (g_strcmp0(seaf->http_proxy_type, PROXY_TYPE_HTTP) == 0) {
626 curl_easy_setopt(curl, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
627 /* Use CONNECT method create a SSL tunnel if https is used. */
628 if (is_https)
629 curl_easy_setopt(curl, CURLOPT_HTTPPROXYTUNNEL, 1L);
630 curl_easy_setopt(curl, CURLOPT_PROXY, seaf->http_proxy_addr);
631 curl_easy_setopt(curl, CURLOPT_PROXYPORT,
632 seaf->http_proxy_port > 0 ? seaf->http_proxy_port : 80);
633 if (seaf->http_proxy_username && seaf->http_proxy_password) {
634 curl_easy_setopt(curl, CURLOPT_PROXYAUTH,
635 CURLAUTH_BASIC |
636 CURLAUTH_DIGEST |
637 CURLAUTH_DIGEST_IE |
638 CURLAUTH_GSSNEGOTIATE |
639 CURLAUTH_NTLM);
640 curl_easy_setopt(curl, CURLOPT_PROXYUSERNAME, seaf->http_proxy_username);
641 curl_easy_setopt(curl, CURLOPT_PROXYPASSWORD, seaf->http_proxy_password);
642 }
643 } else if (g_strcmp0(seaf->http_proxy_type, PROXY_TYPE_SOCKS) == 0) {
644 if (seaf->http_proxy_port < 0)
645 return;
646 curl_easy_setopt(curl, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
647 curl_easy_setopt(curl, CURLOPT_PROXY, seaf->http_proxy_addr);
648 curl_easy_setopt(curl, CURLOPT_PROXYPORT, seaf->http_proxy_port);
649 }
650 }
651
652 #ifdef WIN32
653 static int
sockopt_callback(void * clientp,curl_socket_t curlfd,curlsocktype purpose)654 sockopt_callback (void *clientp, curl_socket_t curlfd, curlsocktype purpose)
655 {
656 /* Set large enough TCP buffer size.
657 * This greatly enhances sync speed for high latency network.
658 * Windows by default use 8KB buffers, which is too small for such case.
659 * Linux has auto-tuning for TCP buffers, so don't need to set manually.
660 * OSX is TBD.
661 */
662
663 #define DEFAULT_SNDBUF_SIZE (1 << 17) /* 128KB */
664
665 /* Set send buffer size. */
666 int sndbuf_size;
667 socklen_t optlen;
668
669 optlen = sizeof(int);
670 getsockopt (curlfd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, &optlen);
671
672 if (sndbuf_size < DEFAULT_SNDBUF_SIZE) {
673 sndbuf_size = DEFAULT_SNDBUF_SIZE;
674 optlen = sizeof(int);
675 setsockopt (curlfd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, optlen);
676 }
677
678 /* Disable Nagle's algorithm. */
679 int val = 1;
680 optlen = sizeof(int);
681 setsockopt (curlfd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, optlen);
682
683 return CURL_SOCKOPT_OK;
684 }
685 #endif /* WIN32 */
686
687 typedef struct _HttpResponse {
688 char *content;
689 size_t size;
690 } HttpResponse;
691
692 static size_t
recv_response(void * contents,size_t size,size_t nmemb,void * userp)693 recv_response (void *contents, size_t size, size_t nmemb, void *userp)
694 {
695 size_t realsize = size * nmemb;
696 HttpResponse *rsp = userp;
697
698 rsp->content = g_realloc (rsp->content, rsp->size + realsize);
699 if (!rsp->content) {
700 seaf_warning ("Not enough memory.\n");
701 /* return a value other than realsize to signify an error. */
702 return 0;
703 }
704
705 memcpy (rsp->content + rsp->size, contents, realsize);
706 rsp->size += realsize;
707
708 return realsize;
709 }
710
711 extern FILE *seafile_get_log_fp ();
712
713 #define HTTP_TIMEOUT_SEC 300
714
715 typedef size_t (*HttpRecvCallback) (void *, size_t, size_t, void *);
716
717 /*
718 * The @timeout parameter is for detecting network connection problems.
719 * The @timeout parameter should be set to TRUE for data-transfer-only operations,
720 * such as getting objects, blocks. For operations that requires calculations
721 * on the server side, the timeout should be set to FALSE. Otherwise when
722 * the server sometimes takes more than 45 seconds to calculate the result,
723 * the client will time out.
724 */
725 static int
http_get(CURL * curl,const char * url,const char * token,int * rsp_status,char ** rsp_content,gint64 * rsp_size,HttpRecvCallback callback,void * cb_data,gboolean timeout,int * pcurl_error)726 http_get (CURL *curl, const char *url, const char *token,
727 int *rsp_status, char **rsp_content, gint64 *rsp_size,
728 HttpRecvCallback callback, void *cb_data,
729 gboolean timeout, int *pcurl_error)
730 {
731 char *token_header;
732 struct curl_slist *headers = NULL;
733 int ret = 0;
734
735 if (seafile_debug_flag_is_set (SEAFILE_DEBUG_CURL)) {
736 curl_easy_setopt (curl, CURLOPT_VERBOSE, 1);
737 curl_easy_setopt (curl, CURLOPT_STDERR, seafile_get_log_fp());
738 }
739
740 headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
741
742 if (token) {
743 token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
744 headers = curl_slist_append (headers, token_header);
745 g_free (token_header);
746 }
747
748 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
749
750 curl_easy_setopt(curl, CURLOPT_URL, url);
751 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
752
753 if (timeout) {
754 /* Set low speed limit to 1 bytes. This effectively means no data. */
755 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
756 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
757 }
758
759 if (seaf->disable_verify_certificate) {
760 curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
761 curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
762 }
763
764 HttpResponse rsp;
765 memset (&rsp, 0, sizeof(rsp));
766 if (rsp_content) {
767 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
768 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
769 } else if (callback) {
770 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
771 curl_easy_setopt(curl, CURLOPT_WRITEDATA, cb_data);
772 }
773
774 gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
775 set_proxy (curl, is_https);
776
777 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
778
779 #ifndef USE_GPL_CRYPTO
780 #if defined WIN32 || defined __APPLE__
781 load_ca_bundle (curl);
782 #endif
783 #endif
784
785 #ifndef USE_GPL_CRYPTO
786 if (!seaf->disable_verify_certificate) {
787 curl_easy_setopt (curl, CURLOPT_SSL_CTX_FUNCTION, ssl_callback);
788 curl_easy_setopt (curl, CURLOPT_SSL_CTX_DATA, url);
789 }
790 #endif
791
792 #ifdef WIN32
793 curl_easy_setopt (curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
794 #endif
795
796 int rc = curl_easy_perform (curl);
797 if (rc != 0) {
798 seaf_warning ("libcurl failed to GET %s: %s.\n",
799 url, curl_easy_strerror(rc));
800 if (pcurl_error)
801 *pcurl_error = rc;
802 ret = -1;
803 goto out;
804 }
805
806 long status;
807 rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
808 if (rc != CURLE_OK) {
809 seaf_warning ("Failed to get status code for GET %s.\n", url);
810 ret = -1;
811 goto out;
812 }
813
814 *rsp_status = status;
815
816 if (rsp_content) {
817 *rsp_content = rsp.content;
818 *rsp_size = rsp.size;
819 }
820
821 out:
822 if (ret < 0)
823 g_free (rsp.content);
824 curl_slist_free_all (headers);
825 return ret;
826 }
827
828 typedef struct _HttpRequest {
829 const char *content;
830 size_t size;
831 } HttpRequest;
832
833 static size_t
send_request(void * ptr,size_t size,size_t nmemb,void * userp)834 send_request (void *ptr, size_t size, size_t nmemb, void *userp)
835 {
836 size_t realsize = size *nmemb;
837 size_t copy_size;
838 HttpRequest *req = userp;
839
840 if (req->size == 0)
841 return 0;
842
843 copy_size = MIN(req->size, realsize);
844 memcpy (ptr, req->content, copy_size);
845 req->size -= copy_size;
846 req->content = req->content + copy_size;
847
848 return copy_size;
849 }
850
851 typedef size_t (*HttpSendCallback) (void *, size_t, size_t, void *);
852
853 static int
http_put(CURL * curl,const char * url,const char * token,const char * req_content,gint64 req_size,HttpSendCallback callback,void * cb_data,int * rsp_status,char ** rsp_content,gint64 * rsp_size,gboolean timeout,int * pcurl_error)854 http_put (CURL *curl, const char *url, const char *token,
855 const char *req_content, gint64 req_size,
856 HttpSendCallback callback, void *cb_data,
857 int *rsp_status, char **rsp_content, gint64 *rsp_size,
858 gboolean timeout, int *pcurl_error)
859 {
860 char *token_header;
861 struct curl_slist *headers = NULL;
862 int ret = 0;
863
864 if (seafile_debug_flag_is_set (SEAFILE_DEBUG_CURL)) {
865 curl_easy_setopt (curl, CURLOPT_VERBOSE, 1);
866 curl_easy_setopt (curl, CURLOPT_STDERR, seafile_get_log_fp());
867 }
868
869 headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
870 /* Disable the default "Expect: 100-continue" header */
871 headers = curl_slist_append (headers, "Expect:");
872
873 if (token) {
874 token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
875 headers = curl_slist_append (headers, token_header);
876 g_free (token_header);
877 }
878
879 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
880
881 curl_easy_setopt(curl, CURLOPT_URL, url);
882 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
883
884 if (timeout) {
885 /* Set low speed limit to 1 bytes. This effectively means no data. */
886 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
887 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
888 }
889
890 if (seaf->disable_verify_certificate) {
891 curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
892 curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
893 }
894
895 HttpRequest req;
896 if (req_content) {
897 memset (&req, 0, sizeof(req));
898 req.content = req_content;
899 req.size = req_size;
900 curl_easy_setopt(curl, CURLOPT_READFUNCTION, send_request);
901 curl_easy_setopt(curl, CURLOPT_READDATA, &req);
902 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)req_size);
903 } else if (callback != NULL) {
904 curl_easy_setopt(curl, CURLOPT_READFUNCTION, callback);
905 curl_easy_setopt(curl, CURLOPT_READDATA, cb_data);
906 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)req_size);
907 } else {
908 curl_easy_setopt (curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)0);
909 }
910
911 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
912
913 HttpResponse rsp;
914 memset (&rsp, 0, sizeof(rsp));
915 if (rsp_content) {
916 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
917 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
918 }
919
920 gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
921 set_proxy (curl, is_https);
922
923 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
924
925 #ifndef USE_GPL_CRYPTO
926 #if defined WIN32 || defined __APPLE__
927 load_ca_bundle (curl);
928 #endif
929 #endif
930
931 #ifndef USE_GPL_CRYPTO
932 if (!seaf->disable_verify_certificate) {
933 curl_easy_setopt (curl, CURLOPT_SSL_CTX_FUNCTION, ssl_callback);
934 curl_easy_setopt (curl, CURLOPT_SSL_CTX_DATA, url);
935 }
936 #endif
937
938 #ifdef WIN32
939 curl_easy_setopt (curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
940 #endif
941
942 int rc = curl_easy_perform (curl);
943 if (rc != 0) {
944 seaf_warning ("libcurl failed to PUT %s: %s.\n",
945 url, curl_easy_strerror(rc));
946 if (pcurl_error)
947 *pcurl_error = rc;
948 ret = -1;
949 goto out;
950 }
951
952 long status;
953 rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
954 if (rc != CURLE_OK) {
955 seaf_warning ("Failed to get status code for PUT %s.\n", url);
956 ret = -1;
957 goto out;
958 }
959
960 *rsp_status = status;
961
962 if (rsp_content) {
963 *rsp_content = rsp.content;
964 *rsp_size = rsp.size;
965 }
966
967 out:
968 if (ret < 0)
969 g_free (rsp.content);
970 curl_slist_free_all (headers);
971 return ret;
972 }
973
974 static int
http_post(CURL * curl,const char * url,const char * token,const char * req_content,gint64 req_size,int * rsp_status,char ** rsp_content,gint64 * rsp_size,gboolean timeout,int * pcurl_error)975 http_post (CURL *curl, const char *url, const char *token,
976 const char *req_content, gint64 req_size,
977 int *rsp_status, char **rsp_content, gint64 *rsp_size,
978 gboolean timeout, int *pcurl_error)
979 {
980 char *token_header;
981 struct curl_slist *headers = NULL;
982 int ret = 0;
983
984 g_return_val_if_fail (req_content != NULL, -1);
985
986 if (seafile_debug_flag_is_set (SEAFILE_DEBUG_CURL)) {
987 curl_easy_setopt (curl, CURLOPT_VERBOSE, 1);
988 curl_easy_setopt (curl, CURLOPT_STDERR, seafile_get_log_fp());
989 }
990
991 headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
992 /* Disable the default "Expect: 100-continue" header */
993 headers = curl_slist_append (headers, "Expect:");
994
995 if (token) {
996 token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
997 headers = curl_slist_append (headers, token_header);
998 g_free (token_header);
999 }
1000
1001 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1002
1003 curl_easy_setopt(curl, CURLOPT_URL, url);
1004 curl_easy_setopt(curl, CURLOPT_POST, 1L);
1005
1006 if (timeout) {
1007 /* Set low speed limit to 1 bytes. This effectively means no data. */
1008 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
1009 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
1010 }
1011
1012 if (seaf->disable_verify_certificate) {
1013 curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
1014 curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
1015 }
1016
1017 HttpRequest req;
1018 memset (&req, 0, sizeof(req));
1019 req.content = req_content;
1020 req.size = req_size;
1021 curl_easy_setopt(curl, CURLOPT_READFUNCTION, send_request);
1022 curl_easy_setopt(curl, CURLOPT_READDATA, &req);
1023 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)req_size);
1024
1025 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
1026
1027 HttpResponse rsp;
1028 memset (&rsp, 0, sizeof(rsp));
1029 if (rsp_content) {
1030 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
1031 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
1032 }
1033
1034 #ifndef USE_GPL_CRYPTO
1035 #if defined WIN32 || defined __APPLE__
1036 load_ca_bundle (curl);
1037 #endif
1038 #endif
1039
1040 #ifndef USE_GPL_CRYPTO
1041 if (!seaf->disable_verify_certificate) {
1042 curl_easy_setopt (curl, CURLOPT_SSL_CTX_FUNCTION, ssl_callback);
1043 curl_easy_setopt (curl, CURLOPT_SSL_CTX_DATA, url);
1044 }
1045 #endif
1046
1047 gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
1048 set_proxy (curl, is_https);
1049
1050 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1051 /* All POST requests should remain POST after redirect. */
1052 curl_easy_setopt(curl, CURLOPT_POSTREDIR, CURL_REDIR_POST_ALL);
1053
1054 #ifdef WIN32
1055 curl_easy_setopt (curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
1056 #endif
1057
1058 int rc = curl_easy_perform (curl);
1059 if (rc != 0) {
1060 seaf_warning ("libcurl failed to POST %s: %s.\n",
1061 url, curl_easy_strerror(rc));
1062 if (pcurl_error)
1063 *pcurl_error = rc;
1064 ret = -1;
1065 goto out;
1066 }
1067
1068 long status;
1069 rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
1070 if (rc != CURLE_OK) {
1071 seaf_warning ("Failed to get status code for POST %s.\n", url);
1072 ret = -1;
1073 goto out;
1074 }
1075
1076 *rsp_status = status;
1077
1078 if (rsp_content) {
1079 *rsp_content = rsp.content;
1080 *rsp_size = rsp.size;
1081 }
1082
1083 out:
1084 if (ret < 0)
1085 g_free (rsp.content);
1086 curl_slist_free_all (headers);
1087 return ret;
1088 }
1089
1090 static int
http_error_to_http_task_error(int status)1091 http_error_to_http_task_error (int status)
1092 {
1093 if (status == HTTP_BAD_REQUEST)
1094 /* This is usually a bug in the client. Set to general error. */
1095 return SYNC_ERROR_ID_GENERAL_ERROR;
1096 else if (status == HTTP_FORBIDDEN)
1097 return SYNC_ERROR_ID_ACCESS_DENIED;
1098 else if (status >= HTTP_INTERNAL_SERVER_ERROR)
1099 return SYNC_ERROR_ID_SERVER;
1100 else if (status == HTTP_NOT_FOUND)
1101 return SYNC_ERROR_ID_SERVER;
1102 else if (status == HTTP_NO_QUOTA)
1103 return SYNC_ERROR_ID_QUOTA_FULL;
1104 else if (status == HTTP_REPO_DELETED)
1105 return SYNC_ERROR_ID_SERVER_REPO_DELETED;
1106 else if (status == HTTP_REPO_CORRUPTED)
1107 return SYNC_ERROR_ID_SERVER_REPO_CORRUPT;
1108 else
1109 return SYNC_ERROR_ID_GENERAL_ERROR;
1110 }
1111
1112 static void
handle_http_errors(HttpTxTask * task,int status)1113 handle_http_errors (HttpTxTask *task, int status)
1114 {
1115 task->error = http_error_to_http_task_error (status);
1116 }
1117
1118 static int
curl_error_to_http_task_error(int curl_error)1119 curl_error_to_http_task_error (int curl_error)
1120 {
1121 if (curl_error == CURLE_SSL_CACERT ||
1122 curl_error == CURLE_PEER_FAILED_VERIFICATION)
1123 return SYNC_ERROR_ID_SSL;
1124
1125 switch (curl_error) {
1126 case CURLE_COULDNT_RESOLVE_PROXY:
1127 return SYNC_ERROR_ID_RESOLVE_PROXY;
1128 case CURLE_COULDNT_RESOLVE_HOST:
1129 return SYNC_ERROR_ID_RESOLVE_HOST;
1130 case CURLE_COULDNT_CONNECT:
1131 return SYNC_ERROR_ID_CONNECT;
1132 case CURLE_OPERATION_TIMEDOUT:
1133 return SYNC_ERROR_ID_TX_TIMEOUT;
1134 case CURLE_SSL_CONNECT_ERROR:
1135 case CURLE_SSL_CERTPROBLEM:
1136 case CURLE_SSL_CACERT_BADFILE:
1137 case CURLE_SSL_ISSUER_ERROR:
1138 return SYNC_ERROR_ID_SSL;
1139 case CURLE_GOT_NOTHING:
1140 case CURLE_SEND_ERROR:
1141 case CURLE_RECV_ERROR:
1142 return SYNC_ERROR_ID_TX;
1143 case CURLE_SEND_FAIL_REWIND:
1144 return SYNC_ERROR_ID_UNHANDLED_REDIRECT;
1145 default:
1146 return SYNC_ERROR_ID_NETWORK;
1147 }
1148 }
1149
1150 static void
handle_curl_errors(HttpTxTask * task,int curl_error)1151 handle_curl_errors (HttpTxTask *task, int curl_error)
1152 {
1153 task->error = curl_error_to_http_task_error (curl_error);
1154 }
1155
1156 static void
emit_transfer_done_signal(HttpTxTask * task)1157 emit_transfer_done_signal (HttpTxTask *task)
1158 {
1159 if (task->type == HTTP_TASK_TYPE_DOWNLOAD)
1160 g_signal_emit_by_name (seaf, "repo-http-fetched", task);
1161 else
1162 g_signal_emit_by_name (seaf, "repo-http-uploaded", task);
1163 }
1164
1165 static void
transition_state(HttpTxTask * task,int state,int rt_state)1166 transition_state (HttpTxTask *task, int state, int rt_state)
1167 {
1168 seaf_message ("Transfer repo '%.8s': ('%s', '%s') --> ('%s', '%s')\n",
1169 task->repo_id,
1170 http_task_state_to_str(task->state),
1171 http_task_rt_state_to_str(task->runtime_state),
1172 http_task_state_to_str(state),
1173 http_task_rt_state_to_str(rt_state));
1174
1175 if (state != task->state)
1176 task->state = state;
1177 task->runtime_state = rt_state;
1178
1179 if (rt_state == HTTP_TASK_RT_STATE_FINISHED) {
1180 /* Clear download head info. */
1181 if (task->type == HTTP_TASK_TYPE_DOWNLOAD &&
1182 state == HTTP_TASK_STATE_FINISHED)
1183 seaf_repo_manager_set_repo_property (seaf->repo_mgr,
1184 task->repo_id,
1185 REPO_PROP_DOWNLOAD_HEAD,
1186 EMPTY_SHA1);
1187
1188 emit_transfer_done_signal (task);
1189 }
1190 }
1191
1192 typedef struct {
1193 char *host;
1194 gboolean use_fileserver_port;
1195 HttpProtocolVersionCallback callback;
1196 void *user_data;
1197
1198 gboolean success;
1199 gboolean not_supported;
1200 int version;
1201 int error_code;
1202 } CheckProtocolData;
1203
1204 static int
parse_protocol_version(const char * rsp_content,int rsp_size,CheckProtocolData * data)1205 parse_protocol_version (const char *rsp_content, int rsp_size, CheckProtocolData *data)
1206 {
1207 json_t *object = NULL;
1208 json_error_t jerror;
1209 int version;
1210
1211 object = json_loadb (rsp_content, rsp_size, 0, &jerror);
1212 if (!object) {
1213 seaf_warning ("Parse response failed: %s.\n", jerror.text);
1214 return -1;
1215 }
1216
1217 if (json_object_has_member (object, "version")) {
1218 version = json_object_get_int_member (object, "version");
1219 data->version = version;
1220 } else {
1221 seaf_warning ("Response doesn't contain protocol version.\n");
1222 json_decref (object);
1223 return -1;
1224 }
1225
1226 json_decref (object);
1227 return 0;
1228 }
1229
1230 static void *
check_protocol_version_thread(void * vdata)1231 check_protocol_version_thread (void *vdata)
1232 {
1233 CheckProtocolData *data = vdata;
1234 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
1235 ConnectionPool *pool;
1236 Connection *conn;
1237 CURL *curl;
1238 char *url;
1239 int status;
1240 char *rsp_content = NULL;
1241 gint64 rsp_size;
1242
1243 pool = find_connection_pool (priv, data->host);
1244 if (!pool) {
1245 seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
1246 return vdata;
1247 }
1248
1249 conn = connection_pool_get_connection (pool);
1250 if (!conn) {
1251 seaf_warning ("Failed to get connection to host %s.\n", data->host);
1252 return vdata;
1253 }
1254
1255 curl = conn->curl;
1256
1257 if (!data->use_fileserver_port)
1258 url = g_strdup_printf ("%s/seafhttp/protocol-version", data->host);
1259 else
1260 url = g_strdup_printf ("%s/protocol-version", data->host);
1261
1262 int curl_error;
1263 if (http_get (curl, url, NULL, &status, &rsp_content, &rsp_size, NULL, NULL, TRUE, &curl_error) < 0) {
1264 conn->release = TRUE;
1265 data->error_code = curl_error_to_http_task_error (curl_error);
1266 goto out;
1267 }
1268
1269 data->success = TRUE;
1270
1271 if (status == HTTP_OK) {
1272 if (rsp_size == 0)
1273 data->not_supported = TRUE;
1274 else if (parse_protocol_version (rsp_content, rsp_size, data) < 0)
1275 data->not_supported = TRUE;
1276 } else {
1277 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
1278 data->not_supported = TRUE;
1279 data->error_code = http_error_to_http_task_error (status);
1280 }
1281
1282 out:
1283 g_free (url);
1284 g_free (rsp_content);
1285 connection_pool_return_connection (pool, conn);
1286
1287 return vdata;
1288 }
1289
1290 static void
check_protocol_version_done(void * vdata)1291 check_protocol_version_done (void *vdata)
1292 {
1293 CheckProtocolData *data = vdata;
1294 HttpProtocolVersion result;
1295
1296 memset (&result, 0, sizeof(result));
1297 result.check_success = data->success;
1298 result.not_supported = data->not_supported;
1299 result.version = data->version;
1300 result.error_code = data->error_code;
1301
1302 data->callback (&result, data->user_data);
1303
1304 g_free (data->host);
1305 g_free (data);
1306 }
1307
1308 int
http_tx_manager_check_protocol_version(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,HttpProtocolVersionCallback callback,void * user_data)1309 http_tx_manager_check_protocol_version (HttpTxManager *manager,
1310 const char *host,
1311 gboolean use_fileserver_port,
1312 HttpProtocolVersionCallback callback,
1313 void *user_data)
1314 {
1315 CheckProtocolData *data = g_new0 (CheckProtocolData, 1);
1316
1317 data->host = g_strdup(host);
1318 data->use_fileserver_port = use_fileserver_port;
1319 data->callback = callback;
1320 data->user_data = user_data;
1321
1322 int ret = seaf_job_manager_schedule_job (seaf->job_mgr,
1323 check_protocol_version_thread,
1324 check_protocol_version_done,
1325 data);
1326 if (ret < 0) {
1327 g_free (data->host);
1328 g_free (data);
1329 }
1330
1331 return ret;
1332 }
1333
1334 /* Check Head Commit. */
1335
1336 typedef struct {
1337 char repo_id[41];
1338 int repo_version;
1339 char *host;
1340 char *token;
1341 gboolean use_fileserver_port;
1342 HttpHeadCommitCallback callback;
1343 void *user_data;
1344
1345 gboolean success;
1346 gboolean is_corrupt;
1347 gboolean is_deleted;
1348 char head_commit[41];
1349 int error_code;
1350 } CheckHeadData;
1351
1352 static int
parse_head_commit_info(const char * rsp_content,int rsp_size,CheckHeadData * data)1353 parse_head_commit_info (const char *rsp_content, int rsp_size, CheckHeadData *data)
1354 {
1355 json_t *object = NULL;
1356 json_error_t jerror;
1357 const char *head_commit;
1358
1359 object = json_loadb (rsp_content, rsp_size, 0, &jerror);
1360 if (!object) {
1361 seaf_warning ("Parse response failed: %s.\n", jerror.text);
1362 return -1;
1363 }
1364
1365 if (json_object_has_member (object, "is_corrupted") &&
1366 json_object_get_int_member (object, "is_corrupted"))
1367 data->is_corrupt = TRUE;
1368
1369 if (!data->is_corrupt) {
1370 head_commit = json_object_get_string_member (object, "head_commit_id");
1371 if (!head_commit) {
1372 seaf_warning ("Check head commit for repo %s failed. "
1373 "Response doesn't contain head commit id.\n",
1374 data->repo_id);
1375 json_decref (object);
1376 return -1;
1377 }
1378 memcpy (data->head_commit, head_commit, 40);
1379 }
1380
1381 json_decref (object);
1382 return 0;
1383 }
1384
1385 static void *
check_head_commit_thread(void * vdata)1386 check_head_commit_thread (void *vdata)
1387 {
1388 CheckHeadData *data = vdata;
1389 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
1390 ConnectionPool *pool;
1391 Connection *conn;
1392 CURL *curl;
1393 char *url;
1394 int status;
1395 char *rsp_content = NULL;
1396 gint64 rsp_size;
1397
1398 pool = find_connection_pool (priv, data->host);
1399 if (!pool) {
1400 seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
1401 return vdata;
1402 }
1403
1404 conn = connection_pool_get_connection (pool);
1405 if (!conn) {
1406 seaf_warning ("Failed to get connection to host %s.\n", data->host);
1407 return vdata;
1408 }
1409
1410 curl = conn->curl;
1411
1412 if (!data->use_fileserver_port)
1413 url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/HEAD",
1414 data->host, data->repo_id);
1415 else
1416 url = g_strdup_printf ("%s/repo/%s/commit/HEAD",
1417 data->host, data->repo_id);
1418
1419 int curl_error;
1420 if (http_get (curl, url, data->token, &status, &rsp_content, &rsp_size,
1421 NULL, NULL, TRUE, &curl_error) < 0) {
1422 conn->release = TRUE;
1423 data->error_code = curl_error_to_http_task_error (curl_error);
1424 goto out;
1425 }
1426
1427 if (status == HTTP_OK) {
1428 if (parse_head_commit_info (rsp_content, rsp_size, data) < 0) {
1429 data->error_code = SYNC_ERROR_ID_NETWORK;
1430 goto out;
1431 }
1432 data->success = TRUE;
1433 } else if (status == HTTP_REPO_DELETED) {
1434 data->is_deleted = TRUE;
1435 data->success = TRUE;
1436 } else {
1437 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
1438 data->error_code = http_error_to_http_task_error (status);
1439 }
1440
1441 out:
1442 g_free (url);
1443 g_free (rsp_content);
1444 connection_pool_return_connection (pool, conn);
1445 return vdata;
1446 }
1447
1448 static void
check_head_commit_done(void * vdata)1449 check_head_commit_done (void *vdata)
1450 {
1451 CheckHeadData *data = vdata;
1452 HttpHeadCommit result;
1453
1454 memset (&result, 0, sizeof(result));
1455 result.check_success = data->success;
1456 result.is_corrupt = data->is_corrupt;
1457 result.is_deleted = data->is_deleted;
1458 memcpy (result.head_commit, data->head_commit, 40);
1459 result.error_code = data->error_code;
1460
1461 data->callback (&result, data->user_data);
1462
1463 g_free (data->host);
1464 g_free (data->token);
1465 g_free (data);
1466 }
1467
1468 int
http_tx_manager_check_head_commit(HttpTxManager * manager,const char * repo_id,int repo_version,const char * host,const char * token,gboolean use_fileserver_port,HttpHeadCommitCallback callback,void * user_data)1469 http_tx_manager_check_head_commit (HttpTxManager *manager,
1470 const char *repo_id,
1471 int repo_version,
1472 const char *host,
1473 const char *token,
1474 gboolean use_fileserver_port,
1475 HttpHeadCommitCallback callback,
1476 void *user_data)
1477 {
1478 CheckHeadData *data = g_new0 (CheckHeadData, 1);
1479
1480 memcpy (data->repo_id, repo_id, 36);
1481 data->repo_version = repo_version;
1482 data->host = g_strdup(host);
1483 data->token = g_strdup(token);
1484 data->callback = callback;
1485 data->user_data = user_data;
1486 data->use_fileserver_port = use_fileserver_port;
1487
1488 if (seaf_job_manager_schedule_job (seaf->job_mgr,
1489 check_head_commit_thread,
1490 check_head_commit_done,
1491 data) < 0) {
1492 g_free (data->host);
1493 g_free (data->token);
1494 g_free (data);
1495 return -1;
1496 }
1497
1498 return 0;
1499 }
1500
1501 /* Get folder permissions. */
1502
1503 void
http_folder_perm_req_free(HttpFolderPermReq * req)1504 http_folder_perm_req_free (HttpFolderPermReq *req)
1505 {
1506 if (!req)
1507 return;
1508 g_free (req->token);
1509 g_free (req);
1510 }
1511
1512 void
http_folder_perm_res_free(HttpFolderPermRes * res)1513 http_folder_perm_res_free (HttpFolderPermRes *res)
1514 {
1515 GList *ptr;
1516
1517 if (!res)
1518 return;
1519 for (ptr = res->user_perms; ptr; ptr = ptr->next)
1520 folder_perm_free ((FolderPerm *)ptr->data);
1521 for (ptr = res->group_perms; ptr; ptr = ptr->next)
1522 folder_perm_free ((FolderPerm *)ptr->data);
1523 g_free (res);
1524 }
1525
1526 typedef struct {
1527 char *host;
1528 gboolean use_fileserver_port;
1529 GList *requests;
1530 HttpGetFolderPermsCallback callback;
1531 void *user_data;
1532
1533 gboolean success;
1534 GList *results;
1535 } GetFolderPermsData;
1536
1537 /* Make sure the path starts with '/' but doesn't end with '/'. */
1538 static char *
canonical_perm_path(const char * path)1539 canonical_perm_path (const char *path)
1540 {
1541 int len = strlen(path);
1542 char *copy, *ret;
1543
1544 if (strcmp (path, "/") == 0)
1545 return g_strdup(path);
1546
1547 if (path[0] == '/' && path[len-1] != '/')
1548 return g_strdup(path);
1549
1550 copy = g_strdup(path);
1551
1552 if (copy[len-1] == '/')
1553 copy[len-1] = 0;
1554
1555 if (copy[0] != '/')
1556 ret = g_strconcat ("/", copy, NULL);
1557 else
1558 ret = copy;
1559
1560 return ret;
1561 }
1562
1563 static GList *
parse_permission_list(json_t * array,gboolean * error)1564 parse_permission_list (json_t *array, gboolean *error)
1565 {
1566 GList *ret = NULL, *ptr;
1567 json_t *object, *member;
1568 size_t n;
1569 int i;
1570 FolderPerm *perm;
1571 const char *str;
1572
1573 *error = FALSE;
1574
1575 n = json_array_size (array);
1576 for (i = 0; i < n; ++i) {
1577 object = json_array_get (array, i);
1578
1579 perm = g_new0 (FolderPerm, 1);
1580
1581 member = json_object_get (object, "path");
1582 if (!member) {
1583 seaf_warning ("Invalid folder perm response format: no path.\n");
1584 *error = TRUE;
1585 goto out;
1586 }
1587 str = json_string_value(member);
1588 if (!str) {
1589 seaf_warning ("Invalid folder perm response format: invalid path.\n");
1590 *error = TRUE;
1591 goto out;
1592 }
1593 perm->path = canonical_perm_path (str);
1594
1595 member = json_object_get (object, "permission");
1596 if (!member) {
1597 seaf_warning ("Invalid folder perm response format: no permission.\n");
1598 *error = TRUE;
1599 goto out;
1600 }
1601 str = json_string_value(member);
1602 if (!str) {
1603 seaf_warning ("Invalid folder perm response format: invalid permission.\n");
1604 *error = TRUE;
1605 goto out;
1606 }
1607 perm->permission = g_strdup(str);
1608
1609 ret = g_list_append (ret, perm);
1610 }
1611
1612 out:
1613 if (*error) {
1614 for (ptr = ret; ptr; ptr = ptr->next)
1615 folder_perm_free ((FolderPerm *)ptr->data);
1616 g_list_free (ret);
1617 ret = NULL;
1618 }
1619
1620 return ret;
1621 }
1622
1623 static int
parse_folder_perms(const char * rsp_content,int rsp_size,GetFolderPermsData * data)1624 parse_folder_perms (const char *rsp_content, int rsp_size, GetFolderPermsData *data)
1625 {
1626 json_t *array = NULL, *object, *member;
1627 json_error_t jerror;
1628 size_t n;
1629 int i;
1630 GList *results = NULL, *ptr;
1631 HttpFolderPermRes *res;
1632 const char *repo_id;
1633 int ret = 0;
1634 gboolean error;
1635
1636 array = json_loadb (rsp_content, rsp_size, 0, &jerror);
1637 if (!array) {
1638 seaf_warning ("Parse response failed: %s.\n", jerror.text);
1639 return -1;
1640 }
1641
1642 n = json_array_size (array);
1643 for (i = 0; i < n; ++i) {
1644 object = json_array_get (array, i);
1645
1646 res = g_new0 (HttpFolderPermRes, 1);
1647
1648 member = json_object_get (object, "repo_id");
1649 if (!member) {
1650 seaf_warning ("Invalid folder perm response format: no repo_id.\n");
1651 ret = -1;
1652 goto out;
1653 }
1654 repo_id = json_string_value(member);
1655 if (strlen(repo_id) != 36) {
1656 seaf_warning ("Invalid folder perm response format: invalid repo_id.\n");
1657 ret = -1;
1658 goto out;
1659 }
1660 memcpy (res->repo_id, repo_id, 36);
1661
1662 member = json_object_get (object, "ts");
1663 if (!member) {
1664 seaf_warning ("Invalid folder perm response format: no timestamp.\n");
1665 ret = -1;
1666 goto out;
1667 }
1668 res->timestamp = json_integer_value (member);
1669
1670 member = json_object_get (object, "user_perms");
1671 if (!member) {
1672 seaf_warning ("Invalid folder perm response format: no user_perms.\n");
1673 ret = -1;
1674 goto out;
1675 }
1676 res->user_perms = parse_permission_list (member, &error);
1677 if (error) {
1678 ret = -1;
1679 goto out;
1680 }
1681
1682 member = json_object_get (object, "group_perms");
1683 if (!member) {
1684 seaf_warning ("Invalid folder perm response format: no group_perms.\n");
1685 ret = -1;
1686 goto out;
1687 }
1688 res->group_perms = parse_permission_list (member, &error);
1689 if (error) {
1690 ret = -1;
1691 goto out;
1692 }
1693
1694 results = g_list_append (results, res);
1695 }
1696
1697 out:
1698 json_decref (array);
1699
1700 if (ret < 0) {
1701 for (ptr = results; ptr; ptr = ptr->next)
1702 http_folder_perm_res_free ((HttpFolderPermRes *)ptr->data);
1703 g_list_free (results);
1704 } else {
1705 data->results = results;
1706 }
1707
1708 return ret;
1709 }
1710
1711 static char *
compose_get_folder_perms_request(GList * requests)1712 compose_get_folder_perms_request (GList *requests)
1713 {
1714 GList *ptr;
1715 HttpFolderPermReq *req;
1716 json_t *object, *array;
1717 char *req_str = NULL;
1718
1719 array = json_array ();
1720
1721 for (ptr = requests; ptr; ptr = ptr->next) {
1722 req = ptr->data;
1723
1724 object = json_object ();
1725 json_object_set_new (object, "repo_id", json_string(req->repo_id));
1726 json_object_set_new (object, "token", json_string(req->token));
1727 json_object_set_new (object, "ts", json_integer(req->timestamp));
1728
1729 json_array_append_new (array, object);
1730 }
1731
1732 req_str = json_dumps (array, 0);
1733 if (!req_str) {
1734 seaf_warning ("Faile to json_dumps.\n");
1735 }
1736
1737 json_decref (array);
1738 return req_str;
1739 }
1740
1741 static void *
get_folder_perms_thread(void * vdata)1742 get_folder_perms_thread (void *vdata)
1743 {
1744 GetFolderPermsData *data = vdata;
1745 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
1746 ConnectionPool *pool;
1747 Connection *conn;
1748 CURL *curl;
1749 char *url;
1750 char *req_content = NULL;
1751 int status;
1752 char *rsp_content = NULL;
1753 gint64 rsp_size;
1754 GList *ptr;
1755
1756 pool = find_connection_pool (priv, data->host);
1757 if (!pool) {
1758 seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
1759 return vdata;
1760 }
1761
1762 conn = connection_pool_get_connection (pool);
1763 if (!conn) {
1764 seaf_warning ("Failed to get connection to host %s.\n", data->host);
1765 return vdata;
1766 }
1767
1768 curl = conn->curl;
1769
1770 if (!data->use_fileserver_port)
1771 url = g_strdup_printf ("%s/seafhttp/repo/folder-perm", data->host);
1772 else
1773 url = g_strdup_printf ("%s/repo/folder-perm", data->host);
1774
1775 req_content = compose_get_folder_perms_request (data->requests);
1776 if (!req_content)
1777 goto out;
1778
1779 if (http_post (curl, url, NULL, req_content, strlen(req_content),
1780 &status, &rsp_content, &rsp_size, TRUE, NULL) < 0) {
1781 conn->release = TRUE;
1782 goto out;
1783 }
1784
1785 if (status == HTTP_OK) {
1786 if (parse_folder_perms (rsp_content, rsp_size, data) < 0)
1787 goto out;
1788 data->success = TRUE;
1789 } else {
1790 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
1791 }
1792
1793 out:
1794 for (ptr = data->requests; ptr; ptr = ptr->next)
1795 http_folder_perm_req_free ((HttpFolderPermReq *)ptr->data);
1796 g_list_free (data->requests);
1797
1798 g_free (url);
1799 g_free (req_content);
1800 g_free (rsp_content);
1801 connection_pool_return_connection (pool, conn);
1802 return vdata;
1803 }
1804
1805 static void
get_folder_perms_done(void * vdata)1806 get_folder_perms_done (void *vdata)
1807 {
1808 GetFolderPermsData *data = vdata;
1809 HttpFolderPerms cb_data;
1810
1811 memset (&cb_data, 0, sizeof(cb_data));
1812 cb_data.success = data->success;
1813 cb_data.results = data->results;
1814
1815 data->callback (&cb_data, data->user_data);
1816
1817 GList *ptr;
1818 for (ptr = data->results; ptr; ptr = ptr->next)
1819 http_folder_perm_res_free ((HttpFolderPermRes *)ptr->data);
1820 g_list_free (data->results);
1821
1822 g_free (data->host);
1823 g_free (data);
1824 }
1825
1826 int
http_tx_manager_get_folder_perms(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,GList * folder_perm_requests,HttpGetFolderPermsCallback callback,void * user_data)1827 http_tx_manager_get_folder_perms (HttpTxManager *manager,
1828 const char *host,
1829 gboolean use_fileserver_port,
1830 GList *folder_perm_requests,
1831 HttpGetFolderPermsCallback callback,
1832 void *user_data)
1833 {
1834 GetFolderPermsData *data = g_new0 (GetFolderPermsData, 1);
1835
1836 data->host = g_strdup(host);
1837 data->requests = folder_perm_requests;
1838 data->callback = callback;
1839 data->user_data = user_data;
1840 data->use_fileserver_port = use_fileserver_port;
1841
1842 if (seaf_job_manager_schedule_job (seaf->job_mgr,
1843 get_folder_perms_thread,
1844 get_folder_perms_done,
1845 data) < 0) {
1846 g_free (data->host);
1847 g_free (data);
1848 return -1;
1849 }
1850
1851 return 0;
1852 }
1853
1854 /* Get Locked Files. */
1855
1856 void
http_locked_files_req_free(HttpLockedFilesReq * req)1857 http_locked_files_req_free (HttpLockedFilesReq *req)
1858 {
1859 if (!req)
1860 return;
1861 g_free (req->token);
1862 g_free (req);
1863 }
1864
1865 void
http_locked_files_res_free(HttpLockedFilesRes * res)1866 http_locked_files_res_free (HttpLockedFilesRes *res)
1867 {
1868 if (!res)
1869 return;
1870
1871 g_hash_table_destroy (res->locked_files);
1872 g_free (res);
1873 }
1874
1875 typedef struct {
1876 char *host;
1877 gboolean use_fileserver_port;
1878 GList *requests;
1879 HttpGetLockedFilesCallback callback;
1880 void *user_data;
1881
1882 gboolean success;
1883 GList *results;
1884 } GetLockedFilesData;
1885
1886 static GHashTable *
parse_locked_file_list(json_t * array)1887 parse_locked_file_list (json_t *array)
1888 {
1889 GHashTable *ret = NULL;
1890 size_t n, i;
1891 json_t *obj, *string, *integer;
1892
1893 ret = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
1894 if (!ret) {
1895 return NULL;
1896 }
1897
1898 n = json_array_size (array);
1899 for (i = 0; i < n; ++i) {
1900 obj = json_array_get (array, i);
1901 string = json_object_get (obj, "path");
1902 if (!string) {
1903 g_hash_table_destroy (ret);
1904 return NULL;
1905 }
1906 integer = json_object_get (obj, "by_me");
1907 if (!integer) {
1908 g_hash_table_destroy (ret);
1909 return NULL;
1910 }
1911 g_hash_table_insert (ret,
1912 g_strdup(json_string_value(string)),
1913 (void*)(long)json_integer_value(integer));
1914 }
1915
1916 return ret;
1917 }
1918
1919 static int
parse_locked_files(const char * rsp_content,int rsp_size,GetLockedFilesData * data)1920 parse_locked_files (const char *rsp_content, int rsp_size, GetLockedFilesData *data)
1921 {
1922 json_t *array = NULL, *object, *member;
1923 json_error_t jerror;
1924 size_t n;
1925 int i;
1926 GList *results = NULL;
1927 HttpLockedFilesRes *res;
1928 const char *repo_id;
1929 int ret = 0;
1930
1931 array = json_loadb (rsp_content, rsp_size, 0, &jerror);
1932 if (!array) {
1933 seaf_warning ("Parse response failed: %s.\n", jerror.text);
1934 return -1;
1935 }
1936
1937 n = json_array_size (array);
1938 for (i = 0; i < n; ++i) {
1939 object = json_array_get (array, i);
1940
1941 res = g_new0 (HttpLockedFilesRes, 1);
1942
1943 member = json_object_get (object, "repo_id");
1944 if (!member) {
1945 seaf_warning ("Invalid locked files response format: no repo_id.\n");
1946 ret = -1;
1947 goto out;
1948 }
1949 repo_id = json_string_value(member);
1950 if (strlen(repo_id) != 36) {
1951 seaf_warning ("Invalid locked files response format: invalid repo_id.\n");
1952 ret = -1;
1953 goto out;
1954 }
1955 memcpy (res->repo_id, repo_id, 36);
1956
1957 member = json_object_get (object, "ts");
1958 if (!member) {
1959 seaf_warning ("Invalid locked files response format: no timestamp.\n");
1960 ret = -1;
1961 goto out;
1962 }
1963 res->timestamp = json_integer_value (member);
1964
1965 member = json_object_get (object, "locked_files");
1966 if (!member) {
1967 seaf_warning ("Invalid locked files response format: no locked_files.\n");
1968 ret = -1;
1969 goto out;
1970 }
1971
1972 res->locked_files = parse_locked_file_list (member);
1973 if (res->locked_files == NULL) {
1974 ret = -1;
1975 goto out;
1976 }
1977
1978 results = g_list_append (results, res);
1979 }
1980
1981 out:
1982 json_decref (array);
1983
1984 if (ret < 0) {
1985 g_list_free_full (results, (GDestroyNotify)http_locked_files_res_free);
1986 } else {
1987 data->results = results;
1988 }
1989
1990 return ret;
1991 }
1992
1993 static char *
compose_get_locked_files_request(GList * requests)1994 compose_get_locked_files_request (GList *requests)
1995 {
1996 GList *ptr;
1997 HttpLockedFilesReq *req;
1998 json_t *object, *array;
1999 char *req_str = NULL;
2000
2001 array = json_array ();
2002
2003 for (ptr = requests; ptr; ptr = ptr->next) {
2004 req = ptr->data;
2005
2006 object = json_object ();
2007 json_object_set_new (object, "repo_id", json_string(req->repo_id));
2008 json_object_set_new (object, "token", json_string(req->token));
2009 json_object_set_new (object, "ts", json_integer(req->timestamp));
2010
2011 json_array_append_new (array, object);
2012 }
2013
2014 req_str = json_dumps (array, 0);
2015 if (!req_str) {
2016 seaf_warning ("Faile to json_dumps.\n");
2017 }
2018
2019 json_decref (array);
2020 return req_str;
2021 }
2022
2023 static void *
get_locked_files_thread(void * vdata)2024 get_locked_files_thread (void *vdata)
2025 {
2026 GetLockedFilesData *data = vdata;
2027 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2028 ConnectionPool *pool;
2029 Connection *conn;
2030 CURL *curl;
2031 char *url;
2032 char *req_content = NULL;
2033 int status;
2034 char *rsp_content = NULL;
2035 gint64 rsp_size;
2036
2037 pool = find_connection_pool (priv, data->host);
2038 if (!pool) {
2039 seaf_warning ("Failed to create connection pool for host %s.\n", data->host);
2040 return vdata;
2041 }
2042
2043 conn = connection_pool_get_connection (pool);
2044 if (!conn) {
2045 seaf_warning ("Failed to get connection to host %s.\n", data->host);
2046 return vdata;
2047 }
2048
2049 curl = conn->curl;
2050
2051 if (!data->use_fileserver_port)
2052 url = g_strdup_printf ("%s/seafhttp/repo/locked-files", data->host);
2053 else
2054 url = g_strdup_printf ("%s/repo/locked-files", data->host);
2055
2056 req_content = compose_get_locked_files_request (data->requests);
2057 if (!req_content)
2058 goto out;
2059
2060 if (http_post (curl, url, NULL, req_content, strlen(req_content),
2061 &status, &rsp_content, &rsp_size, TRUE, NULL) < 0) {
2062 conn->release = TRUE;
2063 goto out;
2064 }
2065
2066 if (status == HTTP_OK) {
2067 if (parse_locked_files (rsp_content, rsp_size, data) < 0)
2068 goto out;
2069 data->success = TRUE;
2070 } else {
2071 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
2072 }
2073
2074 out:
2075 g_list_free_full (data->requests, (GDestroyNotify)http_locked_files_req_free);
2076
2077 g_free (url);
2078 g_free (req_content);
2079 g_free (rsp_content);
2080 connection_pool_return_connection (pool, conn);
2081 return vdata;
2082 }
2083
2084 static void
get_locked_files_done(void * vdata)2085 get_locked_files_done (void *vdata)
2086 {
2087 GetLockedFilesData *data = vdata;
2088 HttpLockedFiles cb_data;
2089
2090 memset (&cb_data, 0, sizeof(cb_data));
2091 cb_data.success = data->success;
2092 cb_data.results = data->results;
2093
2094 data->callback (&cb_data, data->user_data);
2095
2096 g_list_free_full (data->results, (GDestroyNotify)http_locked_files_res_free);
2097
2098 g_free (data->host);
2099 g_free (data);
2100 }
2101
2102 int
http_tx_manager_get_locked_files(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,GList * locked_files_requests,HttpGetLockedFilesCallback callback,void * user_data)2103 http_tx_manager_get_locked_files (HttpTxManager *manager,
2104 const char *host,
2105 gboolean use_fileserver_port,
2106 GList *locked_files_requests,
2107 HttpGetLockedFilesCallback callback,
2108 void *user_data)
2109 {
2110 GetLockedFilesData *data = g_new0 (GetLockedFilesData, 1);
2111
2112 data->host = g_strdup(host);
2113 data->requests = locked_files_requests;
2114 data->callback = callback;
2115 data->user_data = user_data;
2116 data->use_fileserver_port = use_fileserver_port;
2117
2118 if (seaf_job_manager_schedule_job (seaf->job_mgr,
2119 get_locked_files_thread,
2120 get_locked_files_done,
2121 data) < 0) {
2122 g_free (data->host);
2123 g_free (data);
2124 return -1;
2125 }
2126
2127 return 0;
2128 }
2129
2130 /* Synchronous interfaces for locking/unlocking a file on the server. */
2131
2132 int
http_tx_manager_lock_file(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,const char * token,const char * repo_id,const char * path)2133 http_tx_manager_lock_file (HttpTxManager *manager,
2134 const char *host,
2135 gboolean use_fileserver_port,
2136 const char *token,
2137 const char *repo_id,
2138 const char *path)
2139 {
2140 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2141 ConnectionPool *pool;
2142 Connection *conn;
2143 CURL *curl;
2144 char *url;
2145 int status;
2146 int ret = 0;
2147
2148 pool = find_connection_pool (priv, host);
2149 if (!pool) {
2150 seaf_warning ("Failed to create connection pool for host %s.\n", host);
2151 return -1;
2152 }
2153
2154 conn = connection_pool_get_connection (pool);
2155 if (!conn) {
2156 seaf_warning ("Failed to get connection to host %s.\n", host);
2157 return -1;
2158 }
2159
2160 curl = conn->curl;
2161
2162 char *esc_path = g_uri_escape_string(path, NULL, FALSE);
2163 if (!use_fileserver_port)
2164 url = g_strdup_printf ("%s/seafhttp/repo/%s/lock-file?p=%s", host, repo_id, esc_path);
2165 else
2166 url = g_strdup_printf ("%s/repo/%s/lock-file?p=%s", host, repo_id, esc_path);
2167 g_free (esc_path);
2168
2169 if (http_put (curl, url, token, NULL, 0, NULL, NULL,
2170 &status, NULL, NULL, TRUE, NULL) < 0) {
2171 conn->release = TRUE;
2172 ret = -1;
2173 goto out;
2174 }
2175
2176 if (status != HTTP_OK) {
2177 seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
2178 ret = -1;
2179 }
2180
2181 out:
2182 g_free (url);
2183 connection_pool_return_connection (pool, conn);
2184 return ret;
2185 }
2186
2187 int
http_tx_manager_unlock_file(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,const char * token,const char * repo_id,const char * path)2188 http_tx_manager_unlock_file (HttpTxManager *manager,
2189 const char *host,
2190 gboolean use_fileserver_port,
2191 const char *token,
2192 const char *repo_id,
2193 const char *path)
2194 {
2195 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2196 ConnectionPool *pool;
2197 Connection *conn;
2198 CURL *curl;
2199 char *url;
2200 int status;
2201 int ret = 0;
2202
2203 pool = find_connection_pool (priv, host);
2204 if (!pool) {
2205 seaf_warning ("Failed to create connection pool for host %s.\n", host);
2206 return -1;
2207 }
2208
2209 conn = connection_pool_get_connection (pool);
2210 if (!conn) {
2211 seaf_warning ("Failed to get connection to host %s.\n", host);
2212 return -1;
2213 }
2214
2215 curl = conn->curl;
2216
2217 char *esc_path = g_uri_escape_string(path, NULL, FALSE);
2218 if (!use_fileserver_port)
2219 url = g_strdup_printf ("%s/seafhttp/repo/%s/unlock-file?p=%s", host, repo_id, esc_path);
2220 else
2221 url = g_strdup_printf ("%s/repo/%s/unlock-file?p=%s", host, repo_id, esc_path);
2222 g_free (esc_path);
2223
2224 if (http_put (curl, url, token, NULL, 0, NULL, NULL,
2225 &status, NULL, NULL, TRUE, NULL) < 0) {
2226 conn->release = TRUE;
2227 ret = -1;
2228 goto out;
2229 }
2230
2231 if (status != HTTP_OK) {
2232 seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
2233 ret = -1;
2234 }
2235
2236 out:
2237 g_free (url);
2238 connection_pool_return_connection (pool, conn);
2239 return ret;
2240 }
2241
2242 static char *
repo_id_list_to_json(GList * repo_id_list)2243 repo_id_list_to_json (GList *repo_id_list)
2244 {
2245 json_t *array = json_array();
2246 GList *ptr;
2247 char *repo_id;
2248
2249 for (ptr = repo_id_list; ptr; ptr = ptr->next) {
2250 repo_id = ptr->data;
2251 json_array_append_new (array, json_string(repo_id));
2252 }
2253
2254 char *data = json_dumps (array, JSON_COMPACT);
2255 if (!data) {
2256 seaf_warning ("Failed to dump json.\n");
2257 json_decref (array);
2258 return NULL;
2259 }
2260
2261 json_decref (array);
2262 return data;
2263 }
2264
2265 static GHashTable *
repo_head_commit_map_from_json(const char * json_str,gint64 len)2266 repo_head_commit_map_from_json (const char *json_str, gint64 len)
2267 {
2268 json_t *object;
2269 json_error_t jerror;
2270 GHashTable *ret;
2271
2272 object = json_loadb (json_str, (size_t)len, 0, &jerror);
2273 if (!object) {
2274 seaf_warning ("Failed to load json: %s\n", jerror.text);
2275 return NULL;
2276 }
2277
2278 ret = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
2279
2280 void *iter = json_object_iter (object);
2281 const char *key;
2282 json_t *value;
2283 while (iter) {
2284 key = json_object_iter_key (iter);
2285 value = json_object_iter_value (iter);
2286 if (json_typeof(value) != JSON_STRING) {
2287 seaf_warning ("Bad json object format when parsing head commit id map.\n");
2288 g_hash_table_destroy (ret);
2289 goto out;
2290 }
2291 g_hash_table_replace (ret, g_strdup (key), g_strdup(json_string_value(value)));
2292 iter = json_object_iter_next (object, iter);
2293 }
2294
2295 out:
2296 json_decref (object);
2297 return ret;
2298 }
2299
2300 GHashTable *
http_tx_manager_get_head_commit_ids(HttpTxManager * manager,const char * host,gboolean use_fileserver_port,GList * repo_id_list)2301 http_tx_manager_get_head_commit_ids (HttpTxManager *manager,
2302 const char *host,
2303 gboolean use_fileserver_port,
2304 GList *repo_id_list)
2305 {
2306 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
2307 ConnectionPool *pool;
2308 Connection *conn;
2309 CURL *curl;
2310 char *url;
2311 char *req_content = NULL;
2312 gint64 req_size;
2313 int status;
2314 char *rsp_content = NULL;
2315 gint64 rsp_size;
2316 GHashTable *map = NULL;
2317
2318 pool = find_connection_pool (priv, host);
2319 if (!pool) {
2320 seaf_warning ("Failed to create connection pool for host %s.\n", host);
2321 return NULL;
2322 }
2323
2324 conn = connection_pool_get_connection (pool);
2325 if (!conn) {
2326 seaf_warning ("Failed to get connection to host %s.\n", host);
2327 return NULL;
2328 }
2329
2330 curl = conn->curl;
2331
2332 if (!use_fileserver_port)
2333 url = g_strdup_printf ("%s/seafhttp/repo/head-commits-multi/", host);
2334 else
2335 url = g_strdup_printf ("%s/repo/head-commits-multi/", host);
2336
2337 req_content = repo_id_list_to_json (repo_id_list);
2338 req_size = strlen(req_content);
2339
2340 if (http_post (curl, url, NULL, req_content, req_size,
2341 &status, &rsp_content, &rsp_size, TRUE, NULL) < 0) {
2342 conn->release = TRUE;
2343 goto out;
2344 }
2345
2346 if (status != HTTP_OK) {
2347 seaf_warning ("Bad response code for POST %s: %d\n", url, status);
2348 goto out;
2349 }
2350
2351 map = repo_head_commit_map_from_json (rsp_content, rsp_size);
2352
2353 out:
2354 g_free (url);
2355 connection_pool_return_connection (pool, conn);
2356 /* returned by json_dumps(). */
2357 free (req_content);
2358 g_free (rsp_content);
2359 return map;
2360 }
2361
2362 static gboolean
remove_task_help(gpointer key,gpointer value,gpointer user_data)2363 remove_task_help (gpointer key, gpointer value, gpointer user_data)
2364 {
2365 HttpTxTask *task = value;
2366 const char *repo_id = user_data;
2367
2368 if (strcmp(task->repo_id, repo_id) != 0)
2369 return FALSE;
2370
2371 return TRUE;
2372 }
2373
2374 static void
clean_tasks_for_repo(HttpTxManager * manager,const char * repo_id)2375 clean_tasks_for_repo (HttpTxManager *manager, const char *repo_id)
2376 {
2377 g_hash_table_foreach_remove (manager->priv->download_tasks,
2378 remove_task_help, (gpointer)repo_id);
2379
2380 g_hash_table_foreach_remove (manager->priv->upload_tasks,
2381 remove_task_help, (gpointer)repo_id);
2382 }
2383
2384 static int
check_permission(HttpTxTask * task,Connection * conn)2385 check_permission (HttpTxTask *task, Connection *conn)
2386 {
2387 CURL *curl;
2388 char *url;
2389 int status;
2390 char *rsp_content = NULL;
2391 gint64 rsp_size;
2392 int ret = 0;
2393 json_t *rsp_obj = NULL, *reason = NULL, *unsyncable_path = NULL;
2394 const char *reason_str = NULL, *unsyncable_path_str = NULL;
2395 json_error_t jerror;
2396
2397 curl = conn->curl;
2398
2399 const char *type = (task->type == HTTP_TASK_TYPE_DOWNLOAD) ? "download" : "upload";
2400 const char *url_prefix = (task->use_fileserver_port) ? "" : "seafhttp/";
2401 if (seaf->client_name) {
2402 char *client_name = g_uri_escape_string (seaf->client_name,
2403 NULL, FALSE);
2404 url = g_strdup_printf ("%s/%srepo/%s/permission-check/?op=%s"
2405 "&client_id=%s&client_name=%s",
2406 task->host, url_prefix, task->repo_id, type,
2407 seaf->client_id, client_name);
2408 g_free (client_name);
2409 } else {
2410 url = g_strdup_printf ("%s/%srepo/%s/permission-check/?op=%s",
2411 task->host, url_prefix, task->repo_id, type);
2412 }
2413
2414 int curl_error;
2415 if (http_get (curl, url, task->token, &status, &rsp_content, &rsp_size, NULL, NULL, TRUE, &curl_error) < 0) {
2416 conn->release = TRUE;
2417 handle_curl_errors (task, curl_error);
2418 ret = -1;
2419 goto out;
2420 }
2421
2422 if (status != HTTP_OK) {
2423 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
2424
2425 if (status != HTTP_FORBIDDEN || !rsp_content) {
2426 handle_http_errors (task, status);
2427 ret = -1;
2428 goto out;
2429 }
2430
2431 rsp_obj = json_loadb (rsp_content, rsp_size, 0 ,&jerror);
2432 if (!rsp_obj) {
2433 seaf_warning ("Parse check permission response failed: %s.\n", jerror.text);
2434 handle_http_errors (task, status);
2435 json_decref (rsp_obj);
2436 ret = -1;
2437 goto out;
2438 }
2439
2440 reason = json_object_get (rsp_obj, "reason");
2441 if (!reason) {
2442 handle_http_errors (task, status);
2443 json_decref (rsp_obj);
2444 ret = -1;
2445 goto out;
2446 }
2447
2448 reason_str = json_string_value (reason);
2449 if (g_strcmp0 (reason_str, "no write permission") == 0) {
2450 task->error = SYNC_ERROR_ID_NO_WRITE_PERMISSION;
2451 } else if (g_strcmp0 (reason_str, "unsyncable share permission") == 0) {
2452 task->error = SYNC_ERROR_ID_PERM_NOT_SYNCABLE;
2453
2454 unsyncable_path = json_object_get (rsp_obj, "unsyncable_path");
2455 if (!unsyncable_path) {
2456 json_decref (rsp_obj);
2457 ret = -1;
2458 goto out;
2459 }
2460
2461 unsyncable_path_str = json_string_value (unsyncable_path);
2462 if (unsyncable_path_str)
2463 seaf_repo_manager_record_sync_error (task->repo_id, task->repo_name,
2464 unsyncable_path_str,
2465 SYNC_ERROR_ID_PERM_NOT_SYNCABLE);
2466 } else {
2467 task->error = SYNC_ERROR_ID_ACCESS_DENIED;
2468 }
2469
2470 ret = -1;
2471 }
2472
2473 out:
2474 g_free (url);
2475 g_free (rsp_content);
2476 curl_easy_reset (curl);
2477
2478 return ret;
2479 }
2480
2481 /* Upload. */
2482
2483 static void *http_upload_thread (void *vdata);
2484 static void http_upload_done (void *vdata);
2485
2486 int
http_tx_manager_add_upload(HttpTxManager * manager,const char * repo_id,int repo_version,const char * host,const char * token,int protocol_version,gboolean use_fileserver_port,GError ** error)2487 http_tx_manager_add_upload (HttpTxManager *manager,
2488 const char *repo_id,
2489 int repo_version,
2490 const char *host,
2491 const char *token,
2492 int protocol_version,
2493 gboolean use_fileserver_port,
2494 GError **error)
2495 {
2496 HttpTxTask *task;
2497 SeafRepo *repo;
2498
2499 if (!repo_id || !token || !host) {
2500 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Empty argument(s)");
2501 return -1;
2502 }
2503
2504 repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
2505 if (!repo) {
2506 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Repo not found");
2507 return -1;
2508 }
2509
2510 clean_tasks_for_repo (manager, repo_id);
2511
2512 task = http_tx_task_new (manager, repo_id, repo_version,
2513 HTTP_TASK_TYPE_UPLOAD, FALSE,
2514 host, token, NULL, NULL);
2515
2516 task->protocol_version = protocol_version;
2517
2518 task->state = HTTP_TASK_STATE_NORMAL;
2519
2520 task->use_fileserver_port = use_fileserver_port;
2521
2522 task->repo_name = g_strdup(repo->name);
2523
2524 g_hash_table_insert (manager->priv->upload_tasks,
2525 g_strdup(repo_id),
2526 task);
2527
2528 if (seaf_job_manager_schedule_job (seaf->job_mgr,
2529 http_upload_thread,
2530 http_upload_done,
2531 task) < 0) {
2532 g_hash_table_remove (manager->priv->upload_tasks, repo_id);
2533 return -1;
2534 }
2535
2536 return 0;
2537 }
2538
2539 static gboolean
dirent_same(SeafDirent * dent1,SeafDirent * dent2)2540 dirent_same (SeafDirent *dent1, SeafDirent *dent2)
2541 {
2542 return (strcmp(dent1->id, dent2->id) == 0 &&
2543 dent1->mode == dent2->mode &&
2544 dent1->mtime == dent2->mtime);
2545 }
2546
2547 typedef struct {
2548 HttpTxTask *task;
2549 gint64 delta;
2550 GHashTable *active_paths;
2551 } CalcQuotaDeltaData;
2552
2553 static int
check_quota_and_active_paths_diff_files(int n,const char * basedir,SeafDirent * files[],void * vdata)2554 check_quota_and_active_paths_diff_files (int n, const char *basedir,
2555 SeafDirent *files[], void *vdata)
2556 {
2557 CalcQuotaDeltaData *data = vdata;
2558 SeafDirent *file1 = files[0];
2559 SeafDirent *file2 = files[1];
2560 gint64 size1, size2;
2561 char *path;
2562
2563 if (file1 && file2) {
2564 size1 = file1->size;
2565 size2 = file2->size;
2566 data->delta += (size1 - size2);
2567
2568 if (!dirent_same (file1, file2)) {
2569 path = g_strconcat(basedir, file1->name, NULL);
2570 g_hash_table_replace (data->active_paths, path, (void*)(long)S_IFREG);
2571 }
2572 } else if (file1 && !file2) {
2573 data->delta += file1->size;
2574
2575 path = g_strconcat (basedir, file1->name, NULL);
2576 g_hash_table_replace (data->active_paths, path, (void*)(long)S_IFREG);
2577 } else if (!file1 && file2) {
2578 data->delta -= file2->size;
2579 }
2580
2581 return 0;
2582 }
2583
2584 static int
check_quota_and_active_paths_diff_dirs(int n,const char * basedir,SeafDirent * dirs[],void * vdata,gboolean * recurse)2585 check_quota_and_active_paths_diff_dirs (int n, const char *basedir,
2586 SeafDirent *dirs[], void *vdata,
2587 gboolean *recurse)
2588 {
2589 CalcQuotaDeltaData *data = vdata;
2590 SeafDirent *dir1 = dirs[0];
2591 SeafDirent *dir2 = dirs[1];
2592 char *path;
2593
2594 /* When a new empty dir is created, or a dir became empty. */
2595 if ((!dir2 && dir1 && strcmp(dir1->id, EMPTY_SHA1) == 0) ||
2596 (dir2 && dir1 && strcmp(dir1->id, EMPTY_SHA1) == 0 && strcmp(dir2->id, EMPTY_SHA1) != 0)) {
2597 path = g_strconcat (basedir, dir1->name, NULL);
2598 g_hash_table_replace (data->active_paths, path, (void*)(long)S_IFDIR);
2599 }
2600
2601 return 0;
2602 }
2603
2604 static int
calculate_upload_size_delta_and_active_paths(HttpTxTask * task,gint64 * delta,GHashTable * active_paths)2605 calculate_upload_size_delta_and_active_paths (HttpTxTask *task,
2606 gint64 *delta,
2607 GHashTable *active_paths)
2608 {
2609 int ret = 0;
2610 SeafBranch *local = NULL, *master = NULL;
2611 SeafCommit *local_head = NULL, *master_head = NULL;
2612
2613 local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
2614 if (!local) {
2615 seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
2616 ret = -1;
2617 goto out;
2618 }
2619 master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
2620 if (!master) {
2621 seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
2622 ret = -1;
2623 goto out;
2624 }
2625
2626 local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2627 task->repo_id, task->repo_version,
2628 local->commit_id);
2629 if (!local_head) {
2630 seaf_warning ("Local head commit not found for repo %.8s.\n",
2631 task->repo_id);
2632 ret = -1;
2633 goto out;
2634 }
2635 master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2636 task->repo_id, task->repo_version,
2637 master->commit_id);
2638 if (!master_head) {
2639 seaf_warning ("Master head commit not found for repo %.8s.\n",
2640 task->repo_id);
2641 ret = -1;
2642 goto out;
2643 }
2644
2645 CalcQuotaDeltaData data;
2646 memset (&data, 0, sizeof(data));
2647 data.task = task;
2648 data.active_paths = active_paths;
2649
2650 DiffOptions opts;
2651 memset (&opts, 0, sizeof(opts));
2652 memcpy (opts.store_id, task->repo_id, 36);
2653 opts.version = task->repo_version;
2654 opts.file_cb = check_quota_and_active_paths_diff_files;
2655 opts.dir_cb = check_quota_and_active_paths_diff_dirs;
2656 opts.data = &data;
2657
2658 const char *trees[2];
2659 trees[0] = local_head->root_id;
2660 trees[1] = master_head->root_id;
2661 if (diff_trees (2, trees, &opts) < 0) {
2662 seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
2663 task->repo_id);
2664 ret = -1;
2665 goto out;
2666 }
2667
2668 *delta = data.delta;
2669
2670 out:
2671 seaf_branch_unref (local);
2672 seaf_branch_unref (master);
2673 seaf_commit_unref (local_head);
2674 seaf_commit_unref (master_head);
2675
2676 return ret;
2677 }
2678
2679 static int
check_quota(HttpTxTask * task,Connection * conn,gint64 delta)2680 check_quota (HttpTxTask *task, Connection *conn, gint64 delta)
2681 {
2682 CURL *curl;
2683 char *url;
2684 int status;
2685 int ret = 0;
2686
2687 curl = conn->curl;
2688
2689 if (!task->use_fileserver_port)
2690 url = g_strdup_printf ("%s/seafhttp/repo/%s/quota-check/?delta=%"G_GINT64_FORMAT"",
2691 task->host, task->repo_id, delta);
2692 else
2693 url = g_strdup_printf ("%s/repo/%s/quota-check/?delta=%"G_GINT64_FORMAT"",
2694 task->host, task->repo_id, delta);
2695
2696 int curl_error;
2697 if (http_get (curl, url, task->token, &status, NULL, NULL, NULL, NULL, TRUE, &curl_error) < 0) {
2698 conn->release = TRUE;
2699 handle_curl_errors (task, curl_error);
2700 ret = -1;
2701 goto out;
2702 }
2703
2704 if (status != HTTP_OK) {
2705 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
2706 handle_http_errors (task, status);
2707 ret = -1;
2708 }
2709
2710 out:
2711 g_free (url);
2712 curl_easy_reset (curl);
2713
2714 return ret;
2715 }
2716
2717 static int
send_commit_object(HttpTxTask * task,Connection * conn)2718 send_commit_object (HttpTxTask *task, Connection *conn)
2719 {
2720 CURL *curl;
2721 char *url;
2722 int status;
2723 char *data;
2724 int len;
2725 int ret = 0;
2726
2727 if (seaf_obj_store_read_obj (seaf->commit_mgr->obj_store,
2728 task->repo_id, task->repo_version,
2729 task->head, (void**)&data, &len) < 0) {
2730 seaf_warning ("Failed to read commit %s.\n", task->head);
2731 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
2732 return -1;
2733 }
2734
2735 curl = conn->curl;
2736
2737 if (!task->use_fileserver_port)
2738 url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/%s",
2739 task->host, task->repo_id, task->head);
2740 else
2741 url = g_strdup_printf ("%s/repo/%s/commit/%s",
2742 task->host, task->repo_id, task->head);
2743
2744 int curl_error;
2745 if (http_put (curl, url, task->token,
2746 data, len,
2747 NULL, NULL,
2748 &status, NULL, NULL, TRUE, &curl_error) < 0) {
2749 conn->release = TRUE;
2750 handle_curl_errors (task, curl_error);
2751 ret = -1;
2752 goto out;
2753 }
2754
2755 if (status != HTTP_OK) {
2756 seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
2757 handle_http_errors (task, status);
2758 ret = -1;
2759 }
2760
2761 out:
2762 g_free (url);
2763 curl_easy_reset (curl);
2764 g_free (data);
2765
2766 return ret;
2767 }
2768
2769 typedef struct {
2770 GList **pret;
2771 GHashTable *checked_objs;
2772 } CalcFsListData;
2773
2774 static int
collect_file_ids(int n,const char * basedir,SeafDirent * files[],void * vdata)2775 collect_file_ids (int n, const char *basedir, SeafDirent *files[], void *vdata)
2776 {
2777 SeafDirent *file1 = files[0];
2778 SeafDirent *file2 = files[1];
2779 CalcFsListData *data = vdata;
2780 GList **pret = data->pret;
2781 int dummy;
2782
2783 if (!file1 || strcmp (file1->id, EMPTY_SHA1) == 0)
2784 return 0;
2785
2786 if (g_hash_table_lookup (data->checked_objs, file1->id))
2787 return 0;
2788
2789 if (!file2 || strcmp (file1->id, file2->id) != 0) {
2790 *pret = g_list_prepend (*pret, g_strdup(file1->id));
2791 g_hash_table_insert (data->checked_objs, g_strdup(file1->id), &dummy);
2792 }
2793
2794 return 0;
2795 }
2796
2797 static int
collect_dir_ids(int n,const char * basedir,SeafDirent * dirs[],void * vdata,gboolean * recurse)2798 collect_dir_ids (int n, const char *basedir, SeafDirent *dirs[], void *vdata,
2799 gboolean *recurse)
2800 {
2801 SeafDirent *dir1 = dirs[0];
2802 SeafDirent *dir2 = dirs[1];
2803 CalcFsListData *data = vdata;
2804 GList **pret = data->pret;
2805 int dummy;
2806
2807 if (!dir1 || strcmp (dir1->id, EMPTY_SHA1) == 0)
2808 return 0;
2809
2810 if (g_hash_table_lookup (data->checked_objs, dir1->id))
2811 return 0;
2812
2813 if (!dir2 || strcmp (dir1->id, dir2->id) != 0) {
2814 *pret = g_list_prepend (*pret, g_strdup(dir1->id));
2815 g_hash_table_insert (data->checked_objs, g_strdup(dir1->id), &dummy);
2816 }
2817
2818 return 0;
2819 }
2820
2821 static GList *
calculate_send_fs_object_list(HttpTxTask * task)2822 calculate_send_fs_object_list (HttpTxTask *task)
2823 {
2824 GList *ret = NULL;
2825 SeafBranch *local = NULL, *master = NULL;
2826 SeafCommit *local_head = NULL, *master_head = NULL;
2827 GList *ptr;
2828
2829 local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
2830 if (!local) {
2831 seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
2832 goto out;
2833 }
2834 master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
2835 if (!master) {
2836 seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
2837 goto out;
2838 }
2839
2840 local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2841 task->repo_id, task->repo_version,
2842 local->commit_id);
2843 if (!local_head) {
2844 seaf_warning ("Local head commit not found for repo %.8s.\n",
2845 task->repo_id);
2846 goto out;
2847 }
2848 master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
2849 task->repo_id, task->repo_version,
2850 master->commit_id);
2851 if (!master_head) {
2852 seaf_warning ("Master head commit not found for repo %.8s.\n",
2853 task->repo_id);
2854 goto out;
2855 }
2856
2857 /* Diff won't traverse the root object itself. */
2858 if (strcmp (local_head->root_id, master_head->root_id) != 0)
2859 ret = g_list_prepend (ret, g_strdup(local_head->root_id));
2860
2861 CalcFsListData *data = g_new0(CalcFsListData, 1);
2862 data->pret = &ret;
2863 data->checked_objs = g_hash_table_new_full (g_str_hash, g_str_equal,
2864 g_free, NULL);
2865
2866 DiffOptions opts;
2867 memset (&opts, 0, sizeof(opts));
2868 memcpy (opts.store_id, task->repo_id, 36);
2869 opts.version = task->repo_version;
2870 opts.file_cb = collect_file_ids;
2871 opts.dir_cb = collect_dir_ids;
2872 opts.data = data;
2873
2874 const char *trees[2];
2875 trees[0] = local_head->root_id;
2876 trees[1] = master_head->root_id;
2877 if (diff_trees (2, trees, &opts) < 0) {
2878 seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
2879 task->repo_id);
2880 for (ptr = ret; ptr; ptr = ptr->next)
2881 g_free (ptr->data);
2882 ret = NULL;
2883 }
2884
2885 g_hash_table_destroy (data->checked_objs);
2886 g_free (data);
2887
2888 out:
2889 seaf_branch_unref (local);
2890 seaf_branch_unref (master);
2891 seaf_commit_unref (local_head);
2892 seaf_commit_unref (master_head);
2893 return ret;
2894 }
2895
2896 #define ID_LIST_SEGMENT_N 1000
2897
2898 static int
upload_check_id_list_segment(HttpTxTask * task,Connection * conn,const char * url,GList ** send_id_list,GList ** recv_id_list)2899 upload_check_id_list_segment (HttpTxTask *task, Connection *conn, const char *url,
2900 GList **send_id_list, GList **recv_id_list)
2901 {
2902 json_t *array;
2903 json_error_t jerror;
2904 char *obj_id;
2905 int n_sent = 0;
2906 char *data = NULL;
2907 int len;
2908 CURL *curl;
2909 int status;
2910 char *rsp_content = NULL;
2911 gint64 rsp_size;
2912 int ret = 0;
2913
2914 /* Convert object id list to JSON format. */
2915
2916 array = json_array ();
2917
2918 while (*send_id_list != NULL) {
2919 obj_id = (*send_id_list)->data;
2920 json_array_append_new (array, json_string(obj_id));
2921
2922 *send_id_list = g_list_delete_link (*send_id_list, *send_id_list);
2923 g_free (obj_id);
2924
2925 if (++n_sent >= ID_LIST_SEGMENT_N)
2926 break;
2927 }
2928
2929 seaf_debug ("Check %d ids for %s:%s.\n",
2930 n_sent, task->host, task->repo_id);
2931
2932 data = json_dumps (array, 0);
2933 len = strlen(data);
2934 json_decref (array);
2935
2936 /* Send fs object id list. */
2937
2938 curl = conn->curl;
2939
2940 int curl_error;
2941 if (http_post (curl, url, task->token,
2942 data, len,
2943 &status, &rsp_content, &rsp_size, TRUE, &curl_error) < 0) {
2944 conn->release = TRUE;
2945 handle_curl_errors (task, curl_error);
2946 ret = -1;
2947 goto out;
2948 }
2949
2950 if (status != HTTP_OK) {
2951 seaf_warning ("Bad response code for POST %s: %d.\n", url, status);
2952 handle_http_errors (task, status);
2953 ret = -1;
2954 goto out;
2955 }
2956
2957 /* Process needed object id list. */
2958
2959 array = json_loadb (rsp_content, rsp_size, 0, &jerror);
2960 if (!array) {
2961 seaf_warning ("Invalid JSON response from the server: %s.\n", jerror.text);
2962 task->error = SYNC_ERROR_ID_SERVER;
2963 ret = -1;
2964 goto out;
2965 }
2966
2967 int i;
2968 size_t n = json_array_size (array);
2969 json_t *str;
2970
2971 seaf_debug ("%lu objects or blocks are needed for %s:%s.\n",
2972 n, task->host, task->repo_id);
2973
2974 for (i = 0; i < n; ++i) {
2975 str = json_array_get (array, i);
2976 if (!str) {
2977 seaf_warning ("Invalid JSON response from the server.\n");
2978 json_decref (array);
2979 ret = -1;
2980 goto out;
2981 }
2982
2983 *recv_id_list = g_list_prepend (*recv_id_list, g_strdup(json_string_value(str)));
2984 }
2985
2986 json_decref (array);
2987
2988 out:
2989 curl_easy_reset (curl);
2990 g_free (data);
2991 g_free (rsp_content);
2992
2993 return ret;
2994 }
2995
2996 #define MAX_OBJECT_PACK_SIZE (1 << 20) /* 1MB */
2997
2998 typedef struct {
2999 char obj_id[40];
3000 guint32 obj_size;
3001 guint8 object[0];
3002 } __attribute__((__packed__)) ObjectHeader;
3003
3004 static int
send_fs_objects(HttpTxTask * task,Connection * conn,GList ** send_fs_list)3005 send_fs_objects (HttpTxTask *task, Connection *conn, GList **send_fs_list)
3006 {
3007 struct evbuffer *buf;
3008 ObjectHeader hdr;
3009 char *obj_id;
3010 char *data;
3011 int len;
3012 int total_size;
3013 unsigned char *package;
3014 CURL *curl;
3015 char *url = NULL;
3016 int status;
3017 int ret = 0;
3018 int n_sent = 0;
3019
3020 buf = evbuffer_new ();
3021 curl = conn->curl;
3022
3023 while (*send_fs_list != NULL) {
3024 obj_id = (*send_fs_list)->data;
3025
3026 if (seaf_obj_store_read_obj (seaf->fs_mgr->obj_store,
3027 task->repo_id, task->repo_version,
3028 obj_id, (void **)&data, &len) < 0) {
3029 seaf_warning ("Failed to read fs object %s in repo %s.\n",
3030 obj_id, task->repo_id);
3031 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3032 ret = -1;
3033 goto out;
3034 }
3035
3036 ++n_sent;
3037
3038 memcpy (hdr.obj_id, obj_id, 40);
3039 hdr.obj_size = htonl (len);
3040
3041 evbuffer_add (buf, &hdr, sizeof(hdr));
3042 evbuffer_add (buf, data, len);
3043
3044 g_free (data);
3045 *send_fs_list = g_list_delete_link (*send_fs_list, *send_fs_list);
3046 g_free (obj_id);
3047
3048 total_size = evbuffer_get_length (buf);
3049 if (total_size >= MAX_OBJECT_PACK_SIZE)
3050 break;
3051 }
3052
3053 seaf_debug ("Sending %d fs objects for %s:%s.\n",
3054 n_sent, task->host, task->repo_id);
3055
3056 package = evbuffer_pullup (buf, -1);
3057
3058 if (!task->use_fileserver_port)
3059 url = g_strdup_printf ("%s/seafhttp/repo/%s/recv-fs/",
3060 task->host, task->repo_id);
3061 else
3062 url = g_strdup_printf ("%s/repo/%s/recv-fs/",
3063 task->host, task->repo_id);
3064
3065 int curl_error;
3066 if (http_post (curl, url, task->token,
3067 (char *)package, evbuffer_get_length(buf),
3068 &status, NULL, NULL, TRUE, &curl_error) < 0) {
3069 conn->release = TRUE;
3070 handle_curl_errors (task, curl_error);
3071 ret = -1;
3072 goto out;
3073 }
3074
3075 if (status != HTTP_OK) {
3076 seaf_warning ("Bad response code for POST %s: %d.\n", url, status);
3077 handle_http_errors (task, status);
3078 ret = -1;
3079 }
3080
3081 out:
3082 g_free (url);
3083 evbuffer_free (buf);
3084 curl_easy_reset (curl);
3085
3086 return ret;
3087 }
3088
3089 typedef struct {
3090 GList *block_list;
3091 GHashTable *added_blocks;
3092 HttpTxTask *task;
3093 } CalcBlockListData;
3094
3095 static void
add_to_block_list(GList ** block_list,GHashTable * added_blocks,const char * block_id)3096 add_to_block_list (GList **block_list, GHashTable *added_blocks, const char *block_id)
3097 {
3098 int dummy;
3099
3100 if (g_hash_table_lookup (added_blocks, block_id))
3101 return;
3102
3103 *block_list = g_list_prepend (*block_list, g_strdup(block_id));
3104 g_hash_table_insert (added_blocks, g_strdup(block_id), &dummy);
3105 }
3106
3107 static int
block_list_diff_files(int n,const char * basedir,SeafDirent * files[],void * vdata)3108 block_list_diff_files (int n, const char *basedir, SeafDirent *files[], void *vdata)
3109 {
3110 SeafDirent *file1 = files[0];
3111 SeafDirent *file2 = files[1];
3112 CalcBlockListData *data = vdata;
3113 HttpTxTask *task = data->task;
3114 Seafile *f1 = NULL, *f2 = NULL;
3115 int i;
3116
3117 if (file1 && strcmp (file1->id, EMPTY_SHA1) != 0) {
3118 if (!file2) {
3119 f1 = seaf_fs_manager_get_seafile (seaf->fs_mgr,
3120 task->repo_id, task->repo_version,
3121 file1->id);
3122 if (!f1) {
3123 seaf_warning ("Failed to get seafile object %s:%s.\n",
3124 task->repo_id, file1->id);
3125 return -1;
3126 }
3127 for (i = 0; i < f1->n_blocks; ++i)
3128 add_to_block_list (&data->block_list, data->added_blocks,
3129 f1->blk_sha1s[i]);
3130 seafile_unref (f1);
3131 } else if (strcmp (file1->id, file2->id) != 0) {
3132 f1 = seaf_fs_manager_get_seafile (seaf->fs_mgr,
3133 task->repo_id, task->repo_version,
3134 file1->id);
3135 if (!f1) {
3136 seaf_warning ("Failed to get seafile object %s:%s.\n",
3137 task->repo_id, file1->id);
3138 return -1;
3139 }
3140 f2 = seaf_fs_manager_get_seafile (seaf->fs_mgr,
3141 task->repo_id, task->repo_version,
3142 file2->id);
3143 if (!f2) {
3144 seafile_unref (f1);
3145 seaf_warning ("Failed to get seafile object %s:%s.\n",
3146 task->repo_id, file2->id);
3147 return -1;
3148 }
3149
3150 GHashTable *h = g_hash_table_new (g_str_hash, g_str_equal);
3151 int dummy;
3152 for (i = 0; i < f2->n_blocks; ++i)
3153 g_hash_table_insert (h, f2->blk_sha1s[i], &dummy);
3154
3155 for (i = 0; i < f1->n_blocks; ++i)
3156 if (!g_hash_table_lookup (h, f1->blk_sha1s[i]))
3157 add_to_block_list (&data->block_list, data->added_blocks,
3158 f1->blk_sha1s[i]);
3159
3160 seafile_unref (f1);
3161 seafile_unref (f2);
3162 g_hash_table_destroy (h);
3163 }
3164 }
3165
3166 return 0;
3167 }
3168
3169 static int
block_list_diff_dirs(int n,const char * basedir,SeafDirent * dirs[],void * data,gboolean * recurse)3170 block_list_diff_dirs (int n, const char *basedir, SeafDirent *dirs[], void *data,
3171 gboolean *recurse)
3172 {
3173 /* Do nothing */
3174 return 0;
3175 }
3176
3177 static int
calculate_block_list(HttpTxTask * task,GList ** plist)3178 calculate_block_list (HttpTxTask *task, GList **plist)
3179 {
3180 int ret = 0;
3181 SeafBranch *local = NULL, *master = NULL;
3182 SeafCommit *local_head = NULL, *master_head = NULL;
3183
3184 local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
3185 if (!local) {
3186 seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
3187 ret = -1;
3188 goto out;
3189 }
3190 master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
3191 if (!master) {
3192 seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
3193 ret = -1;
3194 goto out;
3195 }
3196
3197 local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
3198 task->repo_id, task->repo_version,
3199 local->commit_id);
3200 if (!local_head) {
3201 seaf_warning ("Local head commit not found for repo %.8s.\n",
3202 task->repo_id);
3203 ret = -1;
3204 goto out;
3205 }
3206 master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
3207 task->repo_id, task->repo_version,
3208 master->commit_id);
3209 if (!master_head) {
3210 seaf_warning ("Master head commit not found for repo %.8s.\n",
3211 task->repo_id);
3212 ret = -1;
3213 goto out;
3214 }
3215
3216 CalcBlockListData data;
3217 memset (&data, 0, sizeof(data));
3218 data.added_blocks = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
3219 data.task = task;
3220
3221 DiffOptions opts;
3222 memset (&opts, 0, sizeof(opts));
3223 memcpy (opts.store_id, task->repo_id, 36);
3224 opts.version = task->repo_version;
3225 opts.file_cb = block_list_diff_files;
3226 opts.dir_cb = block_list_diff_dirs;
3227 opts.data = &data;
3228
3229 const char *trees[2];
3230 trees[0] = local_head->root_id;
3231 trees[1] = master_head->root_id;
3232 if (diff_trees (2, trees, &opts) < 0) {
3233 seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
3234 task->repo_id);
3235 g_hash_table_destroy (data.added_blocks);
3236
3237 GList *ptr;
3238 for (ptr = data.block_list; ptr; ptr = ptr->next)
3239 g_free (ptr->data);
3240
3241 ret = -1;
3242 goto out;
3243 }
3244
3245 g_hash_table_destroy (data.added_blocks);
3246 *plist = data.block_list;
3247
3248 out:
3249 seaf_branch_unref (local);
3250 seaf_branch_unref (master);
3251 seaf_commit_unref (local_head);
3252 seaf_commit_unref (master_head);
3253 return ret;
3254 }
3255
3256 typedef struct {
3257 char block_id[41];
3258 BlockHandle *block;
3259 HttpTxTask *task;
3260 } SendBlockData;
3261
3262 static size_t
send_block_callback(void * ptr,size_t size,size_t nmemb,void * userp)3263 send_block_callback (void *ptr, size_t size, size_t nmemb, void *userp)
3264 {
3265 size_t realsize = size *nmemb;
3266 SendBlockData *data = userp;
3267 HttpTxTask *task = data->task;
3268 int n;
3269
3270 if (task->state == HTTP_TASK_STATE_CANCELED || task->all_stop)
3271 return CURL_READFUNC_ABORT;
3272
3273 n = seaf_block_manager_read_block (seaf->block_mgr,
3274 data->block,
3275 ptr, realsize);
3276 if (n < 0) {
3277 seaf_warning ("Failed to read block %s in repo %.8s.\n",
3278 data->block_id, task->repo_id);
3279 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3280 return CURL_READFUNC_ABORT;
3281 }
3282
3283 /* Update global transferred bytes. */
3284 g_atomic_int_add (&(seaf->sync_mgr->sent_bytes), n);
3285
3286 /* Update transferred bytes for this task */
3287 g_atomic_int_add (&task->tx_bytes, n);
3288
3289 /* If uploaded bytes exceeds the limit, wait until the counter
3290 * is reset. We check the counter every 100 milliseconds, so we
3291 * can waste up to 100 milliseconds without sending data after
3292 * the counter is reset.
3293 */
3294 while (1) {
3295 gint sent = g_atomic_int_get(&(seaf->sync_mgr->sent_bytes));
3296 if (seaf->sync_mgr->upload_limit > 0 &&
3297 sent > seaf->sync_mgr->upload_limit)
3298 /* 100 milliseconds */
3299 g_usleep (100000);
3300 else
3301 break;
3302 }
3303
3304 return n;
3305 }
3306
3307 static int
send_block(HttpTxTask * task,Connection * conn,const char * block_id,guint32 * psize)3308 send_block (HttpTxTask *task, Connection *conn, const char *block_id, guint32 *psize)
3309 {
3310 CURL *curl;
3311 char *url;
3312 int status;
3313 BlockMetadata *bmd;
3314 BlockHandle *block;
3315 int ret = 0;
3316
3317 bmd = seaf_block_manager_stat_block (seaf->block_mgr,
3318 task->repo_id, task->repo_version,
3319 block_id);
3320 if (!bmd) {
3321 seaf_warning ("Failed to stat block %s in repo %s.\n",
3322 block_id, task->repo_id);
3323 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3324 return -1;
3325 }
3326
3327 block = seaf_block_manager_open_block (seaf->block_mgr,
3328 task->repo_id, task->repo_version,
3329 block_id, BLOCK_READ);
3330 if (!block) {
3331 seaf_warning ("Failed to open block %s in repo %s.\n",
3332 block_id, task->repo_id);
3333 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3334 g_free (bmd);
3335 return -1;
3336 }
3337
3338 SendBlockData data;
3339 memset (&data, 0, sizeof(data));
3340 memcpy (data.block_id, block_id, 40);
3341 data.block = block;
3342 data.task = task;
3343
3344 curl = conn->curl;
3345
3346 if (!task->use_fileserver_port)
3347 url = g_strdup_printf ("%s/seafhttp/repo/%s/block/%s",
3348 task->host, task->repo_id, block_id);
3349 else
3350 url = g_strdup_printf ("%s/repo/%s/block/%s",
3351 task->host, task->repo_id, block_id);
3352
3353 int curl_error;
3354 if (http_put (curl, url, task->token,
3355 NULL, bmd->size,
3356 send_block_callback, &data,
3357 &status, NULL, NULL, TRUE, &curl_error) < 0) {
3358 if (task->state == HTTP_TASK_STATE_CANCELED)
3359 goto out;
3360
3361 if (task->error == SYNC_ERROR_ID_NO_ERROR) {
3362 /* Only release connection when it's a network error */
3363 conn->release = TRUE;
3364 handle_curl_errors (task, curl_error);
3365 }
3366 ret = -1;
3367 goto out;
3368 }
3369
3370 if (status != HTTP_OK) {
3371 seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
3372 handle_http_errors (task, status);
3373 ret = -1;
3374 goto out;
3375 }
3376
3377 if (psize)
3378 *psize = bmd->size;
3379
3380 out:
3381 g_free (url);
3382 curl_easy_reset (curl);
3383 g_free (bmd);
3384 seaf_block_manager_close_block (seaf->block_mgr, block);
3385 seaf_block_manager_block_handle_free (seaf->block_mgr, block);
3386
3387 return ret;
3388 }
3389
3390 typedef struct BlockUploadData {
3391 HttpTxTask *http_task;
3392 ConnectionPool *cpool;
3393 GAsyncQueue *finished_tasks;
3394 } BlockUploadData;
3395
3396 typedef struct BlockUploadTask {
3397 char block_id[41];
3398 int result;
3399 guint32 block_size;
3400 } BlockUploadTask;
3401
3402 static void
block_upload_task_free(BlockUploadTask * task)3403 block_upload_task_free (BlockUploadTask *task)
3404 {
3405 g_free (task);
3406 }
3407
3408 static void
upload_block_thread_func(gpointer data,gpointer user_data)3409 upload_block_thread_func (gpointer data, gpointer user_data)
3410 {
3411 BlockUploadTask *task = data;
3412 BlockUploadData *tx_data = user_data;
3413 HttpTxTask *http_task = tx_data->http_task;
3414 Connection *conn;
3415 int ret = 0;
3416
3417 conn = connection_pool_get_connection (tx_data->cpool);
3418 if (!conn) {
3419 seaf_warning ("Failed to get connection to host %s.\n", http_task->host);
3420 http_task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3421 ret = -1;
3422 goto out;
3423 }
3424
3425 ret = send_block (http_task, conn, task->block_id, &task->block_size);
3426
3427 connection_pool_return_connection (tx_data->cpool, conn);
3428
3429 out:
3430 task->result = ret;
3431 g_async_queue_push (tx_data->finished_tasks, task);
3432 }
3433
3434 #define DEFAULT_UPLOAD_BLOCK_THREADS 3
3435
3436 static int
multi_threaded_send_blocks(HttpTxTask * http_task,GList * block_list)3437 multi_threaded_send_blocks (HttpTxTask *http_task, GList *block_list)
3438 {
3439 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
3440 GThreadPool *tpool;
3441 GAsyncQueue *finished_tasks;
3442 GHashTable *pending_tasks;
3443 ConnectionPool *cpool;
3444 GList *ptr;
3445 char *block_id;
3446 BlockUploadTask *task;
3447 SyncInfo *info;
3448 int ret = 0;
3449
3450 if (block_list == NULL)
3451 return 0;
3452
3453 cpool = find_connection_pool (priv, http_task->host);
3454 if (!cpool) {
3455 seaf_warning ("Failed to create connection pool for host %s.\n", http_task->host);
3456 http_task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3457 return -1;
3458 }
3459
3460 finished_tasks = g_async_queue_new ();
3461
3462 BlockUploadData data;
3463 data.http_task = http_task;
3464 data.finished_tasks = finished_tasks;
3465 data.cpool = cpool;
3466
3467 tpool = g_thread_pool_new (upload_block_thread_func, &data,
3468 DEFAULT_UPLOAD_BLOCK_THREADS, FALSE, NULL);
3469
3470 pending_tasks = g_hash_table_new_full (g_str_hash, g_str_equal,
3471 g_free,
3472 (GDestroyNotify)block_upload_task_free);
3473
3474 info = seaf_sync_manager_get_sync_info (seaf->sync_mgr, http_task->repo_id);
3475
3476 for (ptr = block_list; ptr; ptr = ptr->next) {
3477 block_id = ptr->data;
3478
3479 task = g_new0 (BlockUploadTask, 1);
3480 memcpy (task->block_id, block_id, 40);
3481
3482 if (!g_hash_table_lookup (pending_tasks, block_id)) {
3483 g_hash_table_insert (pending_tasks, g_strdup(block_id), task);
3484 g_thread_pool_push (tpool, task, NULL);
3485 } else {
3486 g_free (task);
3487 }
3488 }
3489
3490 while ((task = g_async_queue_pop (finished_tasks)) != NULL) {
3491 if (task->result < 0 || http_task->state == HTTP_TASK_STATE_CANCELED) {
3492 ret = task->result;
3493 http_task->all_stop = TRUE;
3494 break;
3495 }
3496
3497 ++(http_task->done_blocks);
3498
3499 if (info && info->multipart_upload) {
3500 info->uploaded_bytes += (gint64)task->block_size;
3501 }
3502
3503 g_hash_table_remove (pending_tasks, task->block_id);
3504 if (g_hash_table_size(pending_tasks) == 0)
3505 break;
3506 }
3507
3508 g_thread_pool_free (tpool, TRUE, TRUE);
3509
3510 g_hash_table_destroy (pending_tasks);
3511
3512 g_async_queue_unref (finished_tasks);
3513
3514 return ret;
3515 }
3516
3517 static void
notify_permission_error(HttpTxTask * task,const char * error_str)3518 notify_permission_error (HttpTxTask *task, const char *error_str)
3519 {
3520 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
3521 GMatchInfo *match_info;
3522 char *path;
3523
3524 if (g_regex_match (priv->locked_error_regex, error_str, 0, &match_info)) {
3525 path = g_match_info_fetch (match_info, 1);
3526 seaf_repo_manager_record_sync_error (task->repo_id, task->repo_name, path,
3527 SYNC_ERROR_ID_FILE_LOCKED);
3528 g_free (path);
3529
3530 /* Set more accurate error. */
3531 task->error = SYNC_ERROR_ID_FILE_LOCKED;
3532 } else if (g_regex_match (priv->folder_perm_error_regex, error_str, 0, &match_info)) {
3533 path = g_match_info_fetch (match_info, 1);
3534 /* The path returned by server begins with '/'. */
3535 seaf_repo_manager_record_sync_error (task->repo_id, task->repo_name,
3536 (path[0] == '/') ? (path + 1) : path,
3537 SYNC_ERROR_ID_FOLDER_PERM_DENIED);
3538 g_free (path);
3539
3540 task->error = SYNC_ERROR_ID_FOLDER_PERM_DENIED;
3541 }
3542
3543 g_match_info_free (match_info);
3544 }
3545
3546 static int
update_branch(HttpTxTask * task,Connection * conn)3547 update_branch (HttpTxTask *task, Connection *conn)
3548 {
3549 CURL *curl;
3550 char *url;
3551 int status;
3552 char *rsp_content;
3553 char *rsp_content_str = NULL;
3554 gint64 rsp_size;
3555 int ret = 0;
3556
3557 curl = conn->curl;
3558
3559 if (!task->use_fileserver_port)
3560 url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/HEAD/?head=%s",
3561 task->host, task->repo_id, task->head);
3562 else
3563 url = g_strdup_printf ("%s/repo/%s/commit/HEAD/?head=%s",
3564 task->host, task->repo_id, task->head);
3565
3566 int curl_error;
3567 if (http_put (curl, url, task->token,
3568 NULL, 0,
3569 NULL, NULL,
3570 &status, &rsp_content, &rsp_size, TRUE, &curl_error) < 0) {
3571 conn->release = TRUE;
3572 handle_curl_errors (task, curl_error);
3573 ret = -1;
3574 goto out;
3575 }
3576
3577 if (status != HTTP_OK) {
3578 seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
3579 handle_http_errors (task, status);
3580
3581 if (status == HTTP_FORBIDDEN) {
3582 rsp_content_str = g_new0 (gchar, rsp_size + 1);
3583 memcpy (rsp_content_str, rsp_content, rsp_size);
3584 seaf_warning ("%s\n", rsp_content_str);
3585 notify_permission_error (task, rsp_content_str);
3586 g_free (rsp_content_str);
3587 }
3588
3589 ret = -1;
3590 }
3591
3592 out:
3593 g_free (url);
3594 g_free (rsp_content);
3595 curl_easy_reset (curl);
3596
3597 return ret;
3598 }
3599
3600 static void
update_master_branch(HttpTxTask * task)3601 update_master_branch (HttpTxTask *task)
3602 {
3603 SeafBranch *branch;
3604 branch = seaf_branch_manager_get_branch (seaf->branch_mgr,
3605 task->repo_id,
3606 "master");
3607 if (!branch) {
3608 branch = seaf_branch_new ("master", task->repo_id, task->head);
3609 seaf_branch_manager_add_branch (seaf->branch_mgr, branch);
3610 seaf_branch_unref (branch);
3611 } else {
3612 seaf_branch_set_commit (branch, task->head);
3613 seaf_branch_manager_update_branch (seaf->branch_mgr, branch);
3614 seaf_branch_unref (branch);
3615 }
3616 }
3617
3618 static void
set_path_status_syncing(gpointer key,gpointer value,gpointer user_data)3619 set_path_status_syncing (gpointer key, gpointer value, gpointer user_data)
3620 {
3621 HttpTxTask *task = user_data;
3622 char *path = key;
3623 int mode = (int)(long)value;
3624 seaf_sync_manager_update_active_path (seaf->sync_mgr,
3625 task->repo_id,
3626 path,
3627 mode,
3628 SYNC_STATUS_SYNCING,
3629 TRUE);
3630 }
3631
3632 static void
set_path_status_synced(gpointer key,gpointer value,gpointer user_data)3633 set_path_status_synced (gpointer key, gpointer value, gpointer user_data)
3634 {
3635 HttpTxTask *task = user_data;
3636 char *path = key;
3637 int mode = (int)(long)value;
3638 seaf_sync_manager_update_active_path (seaf->sync_mgr,
3639 task->repo_id,
3640 path,
3641 mode,
3642 SYNC_STATUS_SYNCED,
3643 TRUE);
3644 }
3645
3646 static void *
http_upload_thread(void * vdata)3647 http_upload_thread (void *vdata)
3648 {
3649 HttpTxTask *task = vdata;
3650 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
3651 ConnectionPool *pool;
3652 Connection *conn = NULL;
3653 char *url = NULL;
3654 GList *send_fs_list = NULL, *needed_fs_list = NULL;
3655 GList *block_list = NULL, *needed_block_list = NULL;
3656 GHashTable *active_paths = NULL;
3657
3658 SeafBranch *local = seaf_branch_manager_get_branch (seaf->branch_mgr,
3659 task->repo_id, "local");
3660 if (!local) {
3661 seaf_warning ("Failed to get branch local of repo %.8s.\n", task->repo_id);
3662 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3663 return NULL;
3664 }
3665 memcpy (task->head, local->commit_id, 40);
3666 seaf_branch_unref (local);
3667
3668 pool = find_connection_pool (priv, task->host);
3669 if (!pool) {
3670 seaf_warning ("Failed to create connection pool for host %s.\n", task->host);
3671 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3672 goto out;
3673 }
3674
3675 conn = connection_pool_get_connection (pool);
3676 if (!conn) {
3677 seaf_warning ("Failed to get connection to host %s.\n", task->host);
3678 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
3679 goto out;
3680 }
3681
3682 /* seaf_message ("Upload with HTTP sync protocol version %d.\n", */
3683 /* task->protocol_version); */
3684
3685 transition_state (task, task->state, HTTP_TASK_RT_STATE_CHECK);
3686
3687 gint64 delta = 0;
3688 active_paths = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
3689
3690 if (calculate_upload_size_delta_and_active_paths (task, &delta, active_paths) < 0) {
3691 seaf_warning ("Failed to calculate upload size delta for repo %s.\n",
3692 task->repo_id);
3693 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3694 goto out;
3695 }
3696
3697 g_hash_table_foreach (active_paths, set_path_status_syncing, task);
3698
3699 if (check_permission (task, conn) < 0) {
3700 seaf_warning ("Upload permission denied for repo %.8s on server %s.\n",
3701 task->repo_id, task->host);
3702 goto out;
3703 }
3704
3705 if (check_quota (task, conn, delta) < 0) {
3706 seaf_warning ("Not enough quota for repo %.8s on server %s.\n",
3707 task->repo_id, task->host);
3708 goto out;
3709 }
3710
3711 if (task->state == HTTP_TASK_STATE_CANCELED)
3712 goto out;
3713
3714 transition_state (task, task->state, HTTP_TASK_RT_STATE_COMMIT);
3715
3716 if (send_commit_object (task, conn) < 0) {
3717 seaf_warning ("Failed to send head commit for repo %.8s.\n", task->repo_id);
3718 goto out;
3719 }
3720
3721 if (task->state == HTTP_TASK_STATE_CANCELED)
3722 goto out;
3723
3724 transition_state (task, task->state, HTTP_TASK_RT_STATE_FS);
3725
3726 send_fs_list = calculate_send_fs_object_list (task);
3727 if (!send_fs_list) {
3728 seaf_warning ("Failed to calculate fs object list for repo %.8s.\n",
3729 task->repo_id);
3730 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3731 goto out;
3732 }
3733
3734 if (!task->use_fileserver_port)
3735 url = g_strdup_printf ("%s/seafhttp/repo/%s/check-fs/",
3736 task->host, task->repo_id);
3737 else
3738 url = g_strdup_printf ("%s/repo/%s/check-fs/",
3739 task->host, task->repo_id);
3740
3741 while (send_fs_list != NULL) {
3742 if (upload_check_id_list_segment (task, conn, url,
3743 &send_fs_list, &needed_fs_list) < 0) {
3744 seaf_warning ("Failed to check fs list for repo %.8s.\n", task->repo_id);
3745 goto out;
3746 }
3747
3748 if (task->state == HTTP_TASK_STATE_CANCELED)
3749 goto out;
3750 }
3751 g_free (url);
3752 url = NULL;
3753
3754 while (needed_fs_list != NULL) {
3755 if (send_fs_objects (task, conn, &needed_fs_list) < 0) {
3756 seaf_warning ("Failed to send fs objects for repo %.8s.\n", task->repo_id);
3757 goto out;
3758 }
3759
3760 if (task->state == HTTP_TASK_STATE_CANCELED)
3761 goto out;
3762 }
3763
3764 transition_state (task, task->state, HTTP_TASK_RT_STATE_BLOCK);
3765
3766 if (calculate_block_list (task, &block_list) < 0) {
3767 seaf_warning ("Failed to calculate block list for repo %.8s.\n",
3768 task->repo_id);
3769 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
3770 goto out;
3771 }
3772
3773 if (!task->use_fileserver_port)
3774 url = g_strdup_printf ("%s/seafhttp/repo/%s/check-blocks/",
3775 task->host, task->repo_id);
3776 else
3777 url = g_strdup_printf ("%s/repo/%s/check-blocks/",
3778 task->host, task->repo_id);
3779
3780 while (block_list != NULL) {
3781 if (upload_check_id_list_segment (task, conn, url,
3782 &block_list, &needed_block_list) < 0) {
3783 seaf_warning ("Failed to check block list for repo %.8s.\n",
3784 task->repo_id);
3785 goto out;
3786 }
3787
3788 if (task->state == HTTP_TASK_STATE_CANCELED)
3789 goto out;
3790 }
3791 g_free (url);
3792 url = NULL;
3793
3794 task->n_blocks = g_list_length (needed_block_list);
3795
3796 seaf_debug ("%d blocks to send for %s:%s.\n",
3797 task->n_blocks, task->host, task->repo_id);
3798
3799 if (multi_threaded_send_blocks(task, needed_block_list) < 0 ||
3800 task->state == HTTP_TASK_STATE_CANCELED)
3801 goto out;
3802
3803 transition_state (task, task->state, HTTP_TASK_RT_STATE_UPDATE_BRANCH);
3804
3805 if (update_branch (task, conn) < 0) {
3806 seaf_warning ("Failed to update branch of repo %.8s.\n", task->repo_id);
3807 goto out;
3808 }
3809
3810 /* After successful upload, the cached 'master' branch should be updated to
3811 * the head commit of 'local' branch.
3812 */
3813 update_master_branch (task);
3814
3815 if (active_paths != NULL)
3816 g_hash_table_foreach (active_paths, set_path_status_synced, task);
3817
3818 out:
3819 string_list_free (send_fs_list);
3820 string_list_free (needed_fs_list);
3821 string_list_free (block_list);
3822 string_list_free (needed_block_list);
3823
3824 if (active_paths)
3825 g_hash_table_destroy (active_paths);
3826
3827 g_free (url);
3828
3829 connection_pool_return_connection (pool, conn);
3830
3831 return vdata;
3832 }
3833
3834 static void
http_upload_done(void * vdata)3835 http_upload_done (void *vdata)
3836 {
3837 HttpTxTask *task = vdata;
3838
3839 if (task->error != SYNC_ERROR_ID_NO_ERROR)
3840 transition_state (task, HTTP_TASK_STATE_ERROR, HTTP_TASK_RT_STATE_FINISHED);
3841 else if (task->state == HTTP_TASK_STATE_CANCELED)
3842 transition_state (task, task->state, HTTP_TASK_RT_STATE_FINISHED);
3843 else
3844 transition_state (task, HTTP_TASK_STATE_FINISHED, HTTP_TASK_RT_STATE_FINISHED);
3845 }
3846
3847 /* Download */
3848
3849 static void *http_download_thread (void *vdata);
3850 static void http_download_done (void *vdata);
3851
3852 int
http_tx_manager_add_download(HttpTxManager * manager,const char * repo_id,int repo_version,const char * host,const char * token,const char * server_head_id,gboolean is_clone,const char * passwd,const char * worktree,int protocol_version,const char * email,gboolean use_fileserver_port,const char * repo_name,GError ** error)3853 http_tx_manager_add_download (HttpTxManager *manager,
3854 const char *repo_id,
3855 int repo_version,
3856 const char *host,
3857 const char *token,
3858 const char *server_head_id,
3859 gboolean is_clone,
3860 const char *passwd,
3861 const char *worktree,
3862 int protocol_version,
3863 const char *email,
3864 gboolean use_fileserver_port,
3865 const char *repo_name,
3866 GError **error)
3867 {
3868 HttpTxTask *task;
3869 SeafRepo *repo;
3870
3871 if (!repo_id || !token || !host || !server_head_id || !email) {
3872 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Empty argument(s)");
3873 return -1;
3874 }
3875
3876 if (!is_clone) {
3877 repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
3878 if (!repo) {
3879 g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Repo not found");
3880 return -1;
3881 }
3882 }
3883
3884 clean_tasks_for_repo (manager, repo_id);
3885
3886 task = http_tx_task_new (manager, repo_id, repo_version,
3887 HTTP_TASK_TYPE_DOWNLOAD, is_clone,
3888 host, token, passwd, worktree);
3889
3890 memcpy (task->head, server_head_id, 40);
3891 task->protocol_version = protocol_version;
3892 task->email = g_strdup(email);
3893
3894 task->state = HTTP_TASK_STATE_NORMAL;
3895
3896 task->use_fileserver_port = use_fileserver_port;
3897
3898 task->blk_ref_cnts = g_hash_table_new_full (g_str_hash, g_str_equal,
3899 g_free, g_free);
3900 pthread_mutex_init (&task->ref_cnt_lock, NULL);
3901
3902 g_hash_table_insert (manager->priv->download_tasks,
3903 g_strdup(repo_id),
3904 task);
3905
3906 task->repo_name = g_strdup(repo_name);
3907
3908 if (seaf_job_manager_schedule_job (seaf->job_mgr,
3909 http_download_thread,
3910 http_download_done,
3911 task) < 0) {
3912 g_hash_table_remove (manager->priv->download_tasks, repo_id);
3913 return -1;
3914 }
3915
3916 return 0;
3917 }
3918
3919 static int
get_commit_object(HttpTxTask * task,Connection * conn)3920 get_commit_object (HttpTxTask *task, Connection *conn)
3921 {
3922 CURL *curl;
3923 char *url;
3924 int status;
3925 char *rsp_content = NULL;
3926 gint64 rsp_size;
3927 int ret = 0;
3928
3929 curl = conn->curl;
3930
3931 if (!task->use_fileserver_port)
3932 url = g_strdup_printf ("%s/seafhttp/repo/%s/commit/%s",
3933 task->host, task->repo_id, task->head);
3934 else
3935 url = g_strdup_printf ("%s/repo/%s/commit/%s",
3936 task->host, task->repo_id, task->head);
3937
3938 int curl_error;
3939 if (http_get (curl, url, task->token, &status,
3940 &rsp_content, &rsp_size,
3941 NULL, NULL, TRUE, &curl_error) < 0) {
3942 conn->release = TRUE;
3943 handle_curl_errors (task, curl_error);
3944 ret = -1;
3945 goto out;
3946 }
3947
3948 if (status != HTTP_OK) {
3949 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
3950 handle_http_errors (task, status);
3951 ret = -1;
3952 goto out;
3953 }
3954
3955 int rc = seaf_obj_store_write_obj (seaf->commit_mgr->obj_store,
3956 task->repo_id, task->repo_version,
3957 task->head,
3958 rsp_content,
3959 rsp_size,
3960 FALSE);
3961 if (rc < 0) {
3962 seaf_warning ("Failed to save commit %s in repo %.8s.\n",
3963 task->head, task->repo_id);
3964 task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
3965 ret = -1;
3966 }
3967
3968 out:
3969 g_free (url);
3970 g_free (rsp_content);
3971 curl_easy_reset (curl);
3972
3973 return ret;
3974 }
3975
3976 static int
get_needed_fs_id_list(HttpTxTask * task,Connection * conn,GList ** fs_id_list)3977 get_needed_fs_id_list (HttpTxTask *task, Connection *conn, GList **fs_id_list)
3978 {
3979 SeafBranch *master;
3980 CURL *curl;
3981 char *url = NULL;
3982 int status;
3983 char *rsp_content = NULL;
3984 gint64 rsp_size;
3985 int ret = 0;
3986 json_t *array;
3987 json_error_t jerror;
3988 const char *obj_id;
3989
3990 const char *url_prefix = (task->use_fileserver_port) ? "" : "seafhttp/";
3991
3992 if (!task->is_clone) {
3993 master = seaf_branch_manager_get_branch (seaf->branch_mgr,
3994 task->repo_id,
3995 "master");
3996 if (!master) {
3997 seaf_warning ("Failed to get branch master for repo %.8s.\n",
3998 task->repo_id);
3999 return -1;
4000 }
4001
4002 url = g_strdup_printf ("%s/%srepo/%s/fs-id-list/"
4003 "?server-head=%s&client-head=%s",
4004 task->host, url_prefix, task->repo_id,
4005 task->head, master->commit_id);
4006
4007 seaf_branch_unref (master);
4008 } else {
4009 url = g_strdup_printf ("%s/%srepo/%s/fs-id-list/?server-head=%s",
4010 task->host, url_prefix, task->repo_id, task->head);
4011 }
4012
4013 curl = conn->curl;
4014
4015 int curl_error;
4016 if (http_get (curl, url, task->token, &status,
4017 &rsp_content, &rsp_size,
4018 NULL, NULL, (!task->is_clone), &curl_error) < 0) {
4019 conn->release = TRUE;
4020 handle_curl_errors (task, curl_error);
4021 ret = -1;
4022 goto out;
4023 }
4024
4025 if (status != HTTP_OK) {
4026 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
4027 handle_http_errors (task, status);
4028 ret = -1;
4029 goto out;
4030 }
4031
4032 array = json_loadb (rsp_content, rsp_size, 0, &jerror);
4033 if (!array) {
4034 seaf_warning ("Invalid JSON response from the server: %s.\n", jerror.text);
4035 task->error = SYNC_ERROR_ID_SERVER;
4036 ret = -1;
4037 goto out;
4038 }
4039
4040 int i;
4041 size_t n = json_array_size (array);
4042 json_t *str;
4043
4044 seaf_debug ("Received fs object list size %lu from %s:%s.\n",
4045 n, task->host, task->repo_id);
4046
4047 task->n_fs_objs = (int)n;
4048
4049 GHashTable *checked_objs = g_hash_table_new_full (g_str_hash, g_str_equal,
4050 g_free, NULL);
4051
4052 for (i = 0; i < n; ++i) {
4053 str = json_array_get (array, i);
4054 if (!str) {
4055 seaf_warning ("Invalid JSON response from the server.\n");
4056 json_decref (array);
4057 string_list_free (*fs_id_list);
4058 ret = -1;
4059 goto out;
4060 }
4061
4062 obj_id = json_string_value(str);
4063
4064 if (g_hash_table_lookup (checked_objs, obj_id)) {
4065 ++(task->done_fs_objs);
4066 continue;
4067 }
4068 char *key = g_strdup(obj_id);
4069 g_hash_table_replace (checked_objs, key, key);
4070
4071 if (!seaf_obj_store_obj_exists (seaf->fs_mgr->obj_store,
4072 task->repo_id, task->repo_version,
4073 obj_id)) {
4074 *fs_id_list = g_list_prepend (*fs_id_list, g_strdup(obj_id));
4075 } else if (task->is_clone) {
4076 gboolean io_error = FALSE;
4077 gboolean sound;
4078 sound = seaf_fs_manager_verify_object (seaf->fs_mgr,
4079 task->repo_id, task->repo_version,
4080 obj_id, FALSE, &io_error);
4081 if (!sound && !io_error) {
4082 *fs_id_list = g_list_prepend (*fs_id_list, g_strdup(obj_id));
4083 } else {
4084 ++(task->done_fs_objs);
4085 }
4086 } else {
4087 ++(task->done_fs_objs);
4088 }
4089 }
4090
4091 json_decref (array);
4092 g_hash_table_destroy (checked_objs);
4093
4094 out:
4095 g_free (url);
4096 g_free (rsp_content);
4097 curl_easy_reset (curl);
4098
4099 return ret;
4100 }
4101
4102 #define GET_FS_OBJECT_N 100
4103
4104 static int
get_fs_objects(HttpTxTask * task,Connection * conn,GList ** fs_list)4105 get_fs_objects (HttpTxTask *task, Connection *conn, GList **fs_list)
4106 {
4107 json_t *array;
4108 char *obj_id;
4109 int n_sent = 0;
4110 char *data = NULL;
4111 int len;
4112 CURL *curl;
4113 char *url = NULL;
4114 int status;
4115 char *rsp_content = NULL;
4116 gint64 rsp_size;
4117 int ret = 0;
4118 GHashTable *requested;
4119
4120 /* Convert object id list to JSON format. */
4121
4122 requested = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
4123
4124 array = json_array ();
4125
4126 while (*fs_list != NULL) {
4127 obj_id = (*fs_list)->data;
4128 json_array_append_new (array, json_string(obj_id));
4129
4130 *fs_list = g_list_delete_link (*fs_list, *fs_list);
4131
4132 g_hash_table_replace (requested, obj_id, obj_id);
4133
4134 if (++n_sent >= GET_FS_OBJECT_N)
4135 break;
4136 }
4137
4138 seaf_debug ("Requesting %d fs objects from %s:%s.\n",
4139 n_sent, task->host, task->repo_id);
4140
4141 data = json_dumps (array, 0);
4142 len = strlen(data);
4143 json_decref (array);
4144
4145 /* Send fs object id list. */
4146
4147 curl = conn->curl;
4148
4149 if (!task->use_fileserver_port)
4150 url = g_strdup_printf ("%s/seafhttp/repo/%s/pack-fs/", task->host, task->repo_id);
4151 else
4152 url = g_strdup_printf ("%s/repo/%s/pack-fs/", task->host, task->repo_id);
4153
4154 int curl_error;
4155 if (http_post (curl, url, task->token,
4156 data, len,
4157 &status, &rsp_content, &rsp_size, TRUE, &curl_error) < 0) {
4158 conn->release = TRUE;
4159 handle_curl_errors (task, curl_error);
4160 ret = -1;
4161 goto out;
4162 }
4163
4164 if (status != HTTP_OK) {
4165 seaf_warning ("Bad response code for POST %s: %d.\n", url, status);
4166 handle_http_errors (task, status);
4167 ret = -1;
4168 goto out;
4169 }
4170
4171 /* Save received fs objects. */
4172
4173 int n_recv = 0;
4174 char *p = rsp_content;
4175 ObjectHeader *hdr = (ObjectHeader *)p;
4176 char recv_obj_id[41];
4177 int n = 0;
4178 int size;
4179 int rc;
4180 while (n < rsp_size) {
4181 memcpy (recv_obj_id, hdr->obj_id, 40);
4182 recv_obj_id[40] = 0;
4183 size = ntohl (hdr->obj_size);
4184 if (n + sizeof(ObjectHeader) + size > rsp_size) {
4185 seaf_warning ("Incomplete object package received for repo %.8s.\n",
4186 task->repo_id);
4187 task->error = SYNC_ERROR_ID_SERVER;
4188 ret = -1;
4189 goto out;
4190 }
4191
4192 ++n_recv;
4193
4194 rc = seaf_obj_store_write_obj (seaf->fs_mgr->obj_store,
4195 task->repo_id, task->repo_version,
4196 recv_obj_id,
4197 hdr->object,
4198 size, FALSE);
4199 if (rc < 0) {
4200 seaf_warning ("Failed to write fs object %s in repo %.8s.\n",
4201 recv_obj_id, task->repo_id);
4202 task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4203 ret = -1;
4204 goto out;
4205 }
4206
4207 g_hash_table_remove (requested, recv_obj_id);
4208
4209 ++(task->done_fs_objs);
4210
4211 p += (sizeof(ObjectHeader) + size);
4212 n += (sizeof(ObjectHeader) + size);
4213 hdr = (ObjectHeader *)p;
4214 }
4215
4216 seaf_debug ("Received %d fs objects from %s:%s.\n",
4217 n_recv, task->host, task->repo_id);
4218
4219 /* The server may not return all the objects we requested.
4220 * So we need to add back the remaining object ids into fs_list.
4221 */
4222 GHashTableIter iter;
4223 gpointer key, value;
4224 g_hash_table_iter_init (&iter, requested);
4225 while (g_hash_table_iter_next (&iter, &key, &value)) {
4226 obj_id = key;
4227 *fs_list = g_list_prepend (*fs_list, g_strdup(obj_id));
4228 }
4229 g_hash_table_destroy (requested);
4230
4231 out:
4232 g_free (url);
4233 g_free (data);
4234 g_free (rsp_content);
4235 curl_easy_reset (curl);
4236
4237 return ret;
4238 }
4239
4240 typedef struct {
4241 char block_id[41];
4242 BlockHandle *block;
4243 HttpTxTask *task;
4244 } GetBlockData;
4245
4246 static size_t
get_block_callback(void * ptr,size_t size,size_t nmemb,void * userp)4247 get_block_callback (void *ptr, size_t size, size_t nmemb, void *userp)
4248 {
4249 size_t realsize = size *nmemb;
4250 SendBlockData *data = userp;
4251 HttpTxTask *task = data->task;
4252 size_t n;
4253
4254 if (task->state == HTTP_TASK_STATE_CANCELED || task->all_stop)
4255 return 0;
4256
4257 n = seaf_block_manager_write_block (seaf->block_mgr,
4258 data->block,
4259 ptr, realsize);
4260 if (n < realsize) {
4261 seaf_warning ("Failed to write block %s in repo %.8s.\n",
4262 data->block_id, task->repo_id);
4263 task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4264 return n;
4265 }
4266
4267 /* Update global transferred bytes. */
4268 g_atomic_int_add (&(seaf->sync_mgr->recv_bytes), n);
4269
4270 /* Update transferred bytes for this task */
4271 g_atomic_int_add (&task->tx_bytes, n);
4272
4273 /* If uploaded bytes exceeds the limit, wait until the counter
4274 * is reset. We check the counter every 100 milliseconds, so we
4275 * can waste up to 100 milliseconds without sending data after
4276 * the counter is reset.
4277 */
4278 while (1) {
4279 gint sent = g_atomic_int_get(&(seaf->sync_mgr->recv_bytes));
4280 if (seaf->sync_mgr->download_limit > 0 &&
4281 sent > seaf->sync_mgr->download_limit)
4282 /* 100 milliseconds */
4283 g_usleep (100000);
4284 else
4285 break;
4286 }
4287
4288 return n;
4289 }
4290
4291 int
get_block(HttpTxTask * task,Connection * conn,const char * block_id)4292 get_block (HttpTxTask *task, Connection *conn, const char *block_id)
4293 {
4294 CURL *curl;
4295 char *url;
4296 int status;
4297 BlockHandle *block;
4298 int ret = 0;
4299 int *pcnt;
4300
4301 block = seaf_block_manager_open_block (seaf->block_mgr,
4302 task->repo_id, task->repo_version,
4303 block_id, BLOCK_WRITE);
4304 if (!block) {
4305 seaf_warning ("Failed to open block %s in repo %.8s.\n",
4306 block_id, task->repo_id);
4307 return -1;
4308 }
4309
4310 GetBlockData data;
4311 memcpy (data.block_id, block_id, 40);
4312 data.block = block;
4313 data.task = task;
4314
4315 curl = conn->curl;
4316
4317 if (!task->use_fileserver_port)
4318 url = g_strdup_printf ("%s/seafhttp/repo/%s/block/%s",
4319 task->host, task->repo_id, block_id);
4320 else
4321 url = g_strdup_printf ("%s/repo/%s/block/%s",
4322 task->host, task->repo_id, block_id);
4323
4324 int curl_error;
4325 if (http_get (curl, url, task->token, &status, NULL, NULL,
4326 get_block_callback, &data, TRUE, &curl_error) < 0) {
4327 if (task->state == HTTP_TASK_STATE_CANCELED)
4328 goto error;
4329
4330 if (task->error == SYNC_ERROR_ID_NO_ERROR) {
4331 /* Only release the connection when it's a network error. */
4332 conn->release = TRUE;
4333 handle_curl_errors (task, curl_error);
4334 }
4335 ret = -1;
4336 goto error;
4337 }
4338
4339 if (status != HTTP_OK) {
4340 seaf_warning ("Bad response code for GET %s: %d.\n", url, status);
4341 handle_http_errors (task, status);
4342 ret = -1;
4343 goto error;
4344 }
4345
4346 BlockMetadata *bmd = seaf_block_manager_stat_block_by_handle(seaf->block_mgr, block);
4347 if (bmd == NULL) {
4348 seaf_warning ("Failed to get block %s meta data in repo %.8s.\n", block_id, task->repo_id);
4349 ret = -1;
4350 goto error;
4351 }
4352
4353 seaf_block_manager_close_block (seaf->block_mgr, block);
4354
4355 pthread_mutex_lock (&task->ref_cnt_lock);
4356
4357 task->done_download += bmd->size;
4358 g_free (bmd);
4359
4360 /* Don't overwrite the block if other thread already downloaded it.
4361 * Since we've locked ref_cnt_lock, we can be sure the block won't be removed.
4362 */
4363 if (!seaf_block_manager_block_exists (seaf->block_mgr,
4364 task->repo_id, task->repo_version,
4365 block_id) &&
4366 seaf_block_manager_commit_block (seaf->block_mgr, block) < 0)
4367 {
4368 seaf_warning ("Failed to commit block %s in repo %.8s.\n",
4369 block_id, task->repo_id);
4370 task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4371 ret = -1;
4372 }
4373
4374 if (ret == 0) {
4375 pcnt = g_hash_table_lookup (task->blk_ref_cnts, block_id);
4376 if (!pcnt) {
4377 pcnt = g_new0(int, 1);
4378 g_hash_table_insert (task->blk_ref_cnts, g_strdup(block_id), pcnt);
4379 }
4380 *pcnt += 1;
4381 }
4382
4383 pthread_mutex_unlock (&task->ref_cnt_lock);
4384
4385 seaf_block_manager_block_handle_free (seaf->block_mgr, block);
4386
4387 g_free (url);
4388
4389 return ret;
4390
4391 error:
4392 g_free (url);
4393
4394 seaf_block_manager_close_block (seaf->block_mgr, block);
4395 seaf_block_manager_block_handle_free (seaf->block_mgr, block);
4396
4397 return ret;
4398 }
4399
4400 int
http_tx_task_download_file_blocks(HttpTxTask * task,const char * file_id)4401 http_tx_task_download_file_blocks (HttpTxTask *task, const char *file_id)
4402 {
4403 Seafile *file;
4404 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
4405 ConnectionPool *pool;
4406 Connection *conn;
4407 int ret = 0;
4408
4409 file = seaf_fs_manager_get_seafile (seaf->fs_mgr,
4410 task->repo_id,
4411 task->repo_version,
4412 file_id);
4413 if (!file) {
4414 seaf_warning ("Failed to find seafile object %s in repo %.8s.\n",
4415 file_id, task->repo_id);
4416 return -1;
4417 }
4418
4419 pool = find_connection_pool (priv, task->host);
4420 if (!pool) {
4421 seaf_warning ("Failed to create connection pool for host %s.\n", task->host);
4422 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4423 seafile_unref (file);
4424 return -1;
4425 }
4426
4427 conn = connection_pool_get_connection (pool);
4428 if (!conn) {
4429 seaf_warning ("Failed to get connection to host %s.\n", task->host);
4430 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4431 seafile_unref (file);
4432 return -1;
4433 }
4434
4435 int i;
4436 char *block_id;
4437 for (i = 0; i < file->n_blocks; ++i) {
4438 block_id = file->blk_sha1s[i];
4439 if (seaf_block_manager_block_exists (seaf->block_mgr,
4440 task->repo_id, task->repo_version,
4441 block_id)) {
4442 BlockMetadata *bmd;
4443 bmd = seaf_block_manager_stat_block (seaf->block_mgr,
4444 task->repo_id, task->repo_version,
4445 block_id);
4446 if (bmd) {
4447 task->done_download += bmd->size;
4448 g_free (bmd);
4449 continue;
4450 }
4451 }
4452
4453 ret = get_block (task, conn, block_id);
4454 if (ret < 0 || task->state == HTTP_TASK_STATE_CANCELED)
4455 break;
4456 }
4457
4458 connection_pool_return_connection (pool, conn);
4459
4460 seafile_unref (file);
4461
4462 return ret;
4463 }
4464
4465 static int
update_local_repo(HttpTxTask * task)4466 update_local_repo (HttpTxTask *task)
4467 {
4468 SeafRepo *repo;
4469 SeafCommit *new_head;
4470 SeafBranch *branch;
4471 int ret = 0;
4472
4473 new_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
4474 task->repo_id,
4475 task->repo_version,
4476 task->head);
4477 if (!new_head) {
4478 seaf_warning ("Failed to get commit %s:%s.\n", task->repo_id, task->head);
4479 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
4480 return -1;
4481 }
4482
4483 /* If repo doesn't exist, create it.
4484 * Note that branch doesn't exist either in this case.
4485 */
4486 repo = seaf_repo_manager_get_repo (seaf->repo_mgr, new_head->repo_id);
4487 if (task->is_clone) {
4488 if (repo != NULL)
4489 goto out;
4490
4491 repo = seaf_repo_new (new_head->repo_id, NULL, NULL);
4492 if (repo == NULL) {
4493 /* create repo failed */
4494 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4495 ret = -1;
4496 goto out;
4497 }
4498
4499 seaf_repo_from_commit (repo, new_head);
4500
4501 seaf_repo_manager_add_repo (seaf->repo_mgr, repo);
4502
4503 /* If it's a new repo, create 'local' and 'master' branch */
4504 branch = seaf_branch_new ("local", task->repo_id, task->head);
4505 seaf_branch_manager_add_branch (seaf->branch_mgr, branch);
4506 seaf_branch_unref (branch);
4507
4508 branch = seaf_branch_new ("master", task->repo_id, task->head);
4509 seaf_branch_manager_add_branch (seaf->branch_mgr, branch);
4510 seaf_branch_unref (branch);
4511 } else {
4512 if (!repo) {
4513 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
4514 ret = -1;
4515 goto out;
4516 }
4517
4518 branch = seaf_branch_manager_get_branch (seaf->branch_mgr,
4519 task->repo_id,
4520 "master");
4521 if (!branch) {
4522 seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
4523 task->error = SYNC_ERROR_ID_LOCAL_DATA_CORRUPT;
4524 ret = -1;
4525 goto out;
4526 }
4527 seaf_branch_set_commit (branch, new_head->commit_id);
4528 seaf_branch_manager_update_branch (seaf->branch_mgr, branch);
4529 seaf_branch_unref (branch);
4530
4531 /* Update repo head branch. */
4532 seaf_branch_set_commit (repo->head, new_head->commit_id);
4533 seaf_branch_manager_update_branch (seaf->branch_mgr, repo->head);
4534
4535 if (g_strcmp0 (repo->name, new_head->repo_name) != 0)
4536 seaf_repo_set_name (repo, new_head->repo_name);
4537 }
4538
4539 out:
4540 seaf_commit_unref (new_head);
4541 return ret;
4542 }
4543
4544 static void *
http_download_thread(void * vdata)4545 http_download_thread (void *vdata)
4546 {
4547 HttpTxTask *task = vdata;
4548 HttpTxPriv *priv = seaf->http_tx_mgr->priv;
4549 ConnectionPool *pool;
4550 Connection *conn = NULL;
4551 GList *fs_id_list = NULL;
4552
4553 pool = find_connection_pool (priv, task->host);
4554 if (!pool) {
4555 seaf_warning ("Failed to create connection pool for host %s.\n", task->host);
4556 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4557 goto out;
4558 }
4559
4560 conn = connection_pool_get_connection (pool);
4561 if (!conn) {
4562 seaf_warning ("Failed to get connection to host %s.\n", task->host);
4563 task->error = SYNC_ERROR_ID_NOT_ENOUGH_MEMORY;
4564 goto out;
4565 }
4566
4567 /* seaf_message ("Download with HTTP sync protocol version %d.\n", */
4568 /* task->protocol_version); */
4569
4570 transition_state (task, task->state, HTTP_TASK_RT_STATE_CHECK);
4571
4572 if (check_permission (task, conn) < 0) {
4573 seaf_warning ("Download permission denied for repo %.8s on server %s.\n",
4574 task->repo_id, task->host);
4575 goto out;
4576 }
4577
4578 if (task->state == HTTP_TASK_STATE_CANCELED)
4579 goto out;
4580
4581 transition_state (task, task->state, HTTP_TASK_RT_STATE_COMMIT);
4582
4583 if (get_commit_object (task, conn) < 0) {
4584 seaf_warning ("Failed to get server head commit for repo %.8s on server %s.\n",
4585 task->repo_id, task->host);
4586 goto out;
4587 }
4588
4589 if (task->state == HTTP_TASK_STATE_CANCELED)
4590 goto out;
4591
4592 transition_state (task, task->state, HTTP_TASK_RT_STATE_FS);
4593
4594 if (get_needed_fs_id_list (task, conn, &fs_id_list) < 0) {
4595 seaf_warning ("Failed to get fs id list for repo %.8s on server %s.\n",
4596 task->repo_id, task->host);
4597 goto out;
4598 }
4599
4600 if (task->state == HTTP_TASK_STATE_CANCELED)
4601 goto out;
4602
4603 while (fs_id_list != NULL) {
4604 if (get_fs_objects (task, conn, &fs_id_list) < 0) {
4605 seaf_warning ("Failed to get fs objects for repo %.8s on server %s.\n",
4606 task->repo_id, task->host);
4607 goto out;
4608 }
4609
4610 if (task->state == HTTP_TASK_STATE_CANCELED)
4611 goto out;
4612 }
4613
4614 transition_state (task, task->state, HTTP_TASK_RT_STATE_BLOCK);
4615
4616 /* Record download head commit id, so that we can resume download
4617 * if this download is interrupted.
4618 */
4619 seaf_repo_manager_set_repo_property (seaf->repo_mgr,
4620 task->repo_id,
4621 REPO_PROP_DOWNLOAD_HEAD,
4622 task->head);
4623
4624 int rc = seaf_repo_fetch_and_checkout (task, task->head);
4625 switch (rc) {
4626 case FETCH_CHECKOUT_SUCCESS:
4627 break;
4628 case FETCH_CHECKOUT_CANCELED:
4629 goto out;
4630 case FETCH_CHECKOUT_FAILED:
4631 task->error = SYNC_ERROR_ID_WRITE_LOCAL_DATA;
4632 goto out;
4633 case FETCH_CHECKOUT_TRANSFER_ERROR:
4634 goto out;
4635 case FETCH_CHECKOUT_LOCKED:
4636 task->error = SYNC_ERROR_ID_FILE_LOCKED_BY_APP;
4637 goto out;
4638 }
4639
4640 update_local_repo (task);
4641
4642 out:
4643 connection_pool_return_connection (pool, conn);
4644 string_list_free (fs_id_list);
4645 return vdata;
4646 }
4647
4648 static void
http_download_done(void * vdata)4649 http_download_done (void *vdata)
4650 {
4651 HttpTxTask *task = vdata;
4652
4653 if (task->error != SYNC_ERROR_ID_NO_ERROR)
4654 transition_state (task, HTTP_TASK_STATE_ERROR, HTTP_TASK_RT_STATE_FINISHED);
4655 else if (task->state == HTTP_TASK_STATE_CANCELED)
4656 transition_state (task, task->state, HTTP_TASK_RT_STATE_FINISHED);
4657 else
4658 transition_state (task, HTTP_TASK_STATE_FINISHED, HTTP_TASK_RT_STATE_FINISHED);
4659 }
4660
4661 GList*
http_tx_manager_get_upload_tasks(HttpTxManager * manager)4662 http_tx_manager_get_upload_tasks (HttpTxManager *manager)
4663 {
4664 return g_hash_table_get_values (manager->priv->upload_tasks);
4665 }
4666
4667 GList*
http_tx_manager_get_download_tasks(HttpTxManager * manager)4668 http_tx_manager_get_download_tasks (HttpTxManager *manager)
4669 {
4670 return g_hash_table_get_values (manager->priv->download_tasks);
4671 }
4672
4673 HttpTxTask *
http_tx_manager_find_task(HttpTxManager * manager,const char * repo_id)4674 http_tx_manager_find_task (HttpTxManager *manager, const char *repo_id)
4675 {
4676 HttpTxTask *task = NULL;
4677
4678 task = g_hash_table_lookup (manager->priv->upload_tasks, repo_id);
4679 if (task)
4680 return task;
4681
4682 task = g_hash_table_lookup (manager->priv->download_tasks, repo_id);
4683 return task;
4684 }
4685
4686 void
http_tx_manager_cancel_task(HttpTxManager * manager,const char * repo_id,int task_type)4687 http_tx_manager_cancel_task (HttpTxManager *manager,
4688 const char *repo_id,
4689 int task_type)
4690 {
4691 HttpTxTask *task = NULL;
4692
4693 if (task_type == HTTP_TASK_TYPE_DOWNLOAD)
4694 task = g_hash_table_lookup (manager->priv->download_tasks, repo_id);
4695 else
4696 task = g_hash_table_lookup (manager->priv->upload_tasks, repo_id);
4697
4698 if (!task)
4699 return;
4700
4701 if (task->state != HTTP_TASK_STATE_NORMAL) {
4702 seaf_warning ("Cannot cancel task not in NORMAL state.\n");
4703 return;
4704 }
4705
4706 if (task->runtime_state == HTTP_TASK_RT_STATE_INIT) {
4707 transition_state (task, HTTP_TASK_STATE_CANCELED, HTTP_TASK_RT_STATE_FINISHED);
4708 return;
4709 }
4710
4711 /* Only change state. runtime_state will be changed in worker thread. */
4712 transition_state (task, HTTP_TASK_STATE_CANCELED, task->runtime_state);
4713 }
4714
4715 int
http_tx_task_get_rate(HttpTxTask * task)4716 http_tx_task_get_rate (HttpTxTask *task)
4717 {
4718 return task->last_tx_bytes;
4719 }
4720
4721 const char *
http_task_state_to_str(int state)4722 http_task_state_to_str (int state)
4723 {
4724 if (state < 0 || state >= N_HTTP_TASK_STATE)
4725 return "unknown";
4726
4727 return http_task_state_str[state];
4728 }
4729
4730 const char *
http_task_rt_state_to_str(int rt_state)4731 http_task_rt_state_to_str (int rt_state)
4732 {
4733 if (rt_state < 0 || rt_state >= N_HTTP_TASK_RT_STATE)
4734 return "unknown";
4735
4736 return http_task_rt_state_str[rt_state];
4737 }
4738