1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #include <inttypes.h>
22 #include <stdlib.h>
23 #include <pthread.h>
24
25 #include "MFSCommunication.h"
26 #include "massert.h"
27 #include "lwthread.h"
28 #ifdef MFSMOUNT
29 #include "mfs_fuse.h"
30 #include "fdcache.h"
31 #endif
32 #include "chunksdatacache.h"
33 #include "readdata.h"
34
/* Upper bound on the number of released packets kept on the reuse list
 * (ep_unused); once reached, ep_free_packet() frees packets instead. */
#define MAX_UNUSED_CNT 100

/* Command codes stored in extra_packets.cmd. ep_thread() handles
 * CHUNK_CHANGED and FLENG_CHANGED explicitly; any other value (EXIT is
 * the one queued by ep_term()) makes the worker thread return. */
enum {CHUNK_CHANGED,FLENG_CHANGED,EXIT};
38
/* One queued notification for the worker thread. Which fields are
 * meaningful depends on cmd: FLENG_CHANGED uses only inode and fleng,
 * EXIT uses none of the payload fields. */
typedef struct _extra_packets {
	uint32_t cmd;       // CHUNK_CHANGED, FLENG_CHANGED or EXIT
	uint32_t inode;     // inode the notification refers to
	uint32_t chindx;    // chunk index; multiplied by MFSCHUNKSIZE to get a byte offset
	uint64_t chunkid;   // passed to chunksdatacache_change()
	uint32_t version;   // passed to chunksdatacache_change()
	uint64_t fleng;     // file length applied by FLENG_CHANGED and by CHUNK_CHANGED with truncflag set
	uint8_t truncflag;  // non-zero selects the truncation path of CHUNK_CHANGED in ep_thread()
	struct _extra_packets *next; // link used by both the pending queue and the reuse list
} extra_packets;
49
/* Pending queue: ep_head is the first packet, ep_tail points at the
 * pointer that the next appended packet must be stored into (&ep_head
 * when the queue is empty). */
static extra_packets *ep_head,**ep_tail;
/* Singly linked list of released packets kept for reuse. */
static extra_packets *ep_unused;
/* Number of packets currently on ep_unused (capped by MAX_UNUSED_CNT). */
static uint32_t ep_unused_cnt;
/* ep_lock is held by the code in this file around every access to the
 * lists above; ep_cond is signalled when a packet is appended to an
 * empty queue. */
static pthread_mutex_t ep_lock;
static pthread_cond_t ep_cond;
/* Handle of the worker thread created in ep_init() and joined in ep_term(). */
static pthread_t ep_worker;
56
ep_get_packet(void)57 static inline extra_packets* ep_get_packet(void) {
58 extra_packets *ep;
59 if (ep_unused!=NULL) {
60 ep = ep_unused;
61 ep_unused = ep_unused->next;
62 ep_unused_cnt--;
63 } else {
64 ep = malloc(sizeof(extra_packets));
65 passert(ep);
66 }
67 return ep;
68 }
69
/* Links ep at the end of the pending queue and, if the queue was empty
 * beforehand, signals ep_cond so that a waiting worker wakes up.
 * Callers in this file hold ep_lock. */
static inline void ep_append_packet(extra_packets *ep) {
	// must be sampled before the queue is modified
	int was_empty = (ep_head==NULL);

	ep->next = NULL;
	*ep_tail = ep;          // stores into ep_head when the queue is empty
	ep_tail = &(ep->next);
	if (was_empty) {
		pthread_cond_signal(&ep_cond);
	}
}
80
/* Releases a processed packet: it is pushed onto the reuse list while
 * that list holds fewer than MAX_UNUSED_CNT entries, and freed
 * otherwise. Callers in this file hold ep_lock. */
static inline void ep_free_packet(extra_packets *ep) {
	if (ep_unused_cnt<MAX_UNUSED_CNT) {
		// keep it for a later ep_get_packet()
		ep->next = ep_unused;
		ep_unused = ep;
		ep_unused_cnt++;
		return;
	}
	free(ep);
}
90
ep_thread(void * arg)91 void* ep_thread(void *arg) {
92 extra_packets *ep = NULL;
93 zassert(pthread_mutex_lock(&ep_lock));
94 while (1) {
95 while (ep_head==NULL) {
96 zassert(pthread_cond_wait(&ep_cond,&ep_lock));
97 }
98 ep = ep_head;
99 ep_head = ep->next;
100 if (ep_head==NULL) {
101 ep_tail = &ep_head;
102 }
103 zassert(pthread_mutex_unlock(&ep_lock));
104 switch (ep->cmd) {
105 case CHUNK_CHANGED:
106 chunksdatacache_change(ep->inode,ep->chindx,ep->chunkid,ep->version);
107 if (ep->truncflag) {
108 chunksdatacache_clear_inode(ep->inode,ep->chindx+1);
109 read_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,0);
110 read_inode_set_length_passive(ep->inode,ep->fleng);
111 #ifdef MFSMOUNT
112 fdcache_invalidate(ep->inode);
113 mfs_inode_change_fleng(ep->inode,ep->fleng);
114 mfs_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,0);
115 #endif
116 } else {
117 read_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,MFSCHUNKSIZE);
118 #ifdef MFSMOUNT
119 fdcache_invalidate(ep->inode);
120 mfs_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,MFSCHUNKSIZE);
121 #endif
122 }
123 break;
124 case FLENG_CHANGED:
125 read_inode_set_length_passive(ep->inode,ep->fleng);
126 #ifdef MFSMOUNT
127 fdcache_invalidate(ep->inode);
128 mfs_inode_clear_cache(ep->inode,ep->fleng,0);
129 mfs_inode_change_fleng(ep->inode,ep->fleng);
130 #endif
131 break;
132 default:
133 free(ep);
134 return arg;
135 }
136 zassert(pthread_mutex_lock(&ep_lock));
137 ep_free_packet(ep);
138 }
139 zassert(pthread_mutex_unlock(&ep_lock)); // pro forma - unreachable
140 return arg; // pro forma - unreachable
141 }
142
ep_chunk_has_changed(uint32_t inode,uint32_t chindx,uint64_t chunkid,uint32_t version,uint64_t fleng,uint8_t truncflag)143 void ep_chunk_has_changed(uint32_t inode,uint32_t chindx,uint64_t chunkid,uint32_t version,uint64_t fleng,uint8_t truncflag) {
144 extra_packets *ep;
145 zassert(pthread_mutex_lock(&ep_lock));
146 ep = ep_get_packet();
147 ep->cmd = CHUNK_CHANGED;
148 ep->inode = inode;
149 ep->chindx = chindx;
150 ep->chunkid = chunkid;
151 ep->version = version;
152 ep->fleng = fleng;
153 ep->truncflag = truncflag;
154 ep_append_packet(ep);
155 zassert(pthread_mutex_unlock(&ep_lock));
156 }
157
ep_fleng_has_changed(uint32_t inode,uint64_t fleng)158 void ep_fleng_has_changed(uint32_t inode,uint64_t fleng) {
159 extra_packets *ep;
160 zassert(pthread_mutex_lock(&ep_lock));
161 ep = ep_get_packet();
162 ep->cmd = FLENG_CHANGED;
163 ep->inode = inode;
164 ep->fleng = fleng;
165 ep_append_packet(ep);
166 zassert(pthread_mutex_unlock(&ep_lock));
167 }
168
ep_term(void)169 void ep_term(void) {
170 extra_packets *ep,*epn;
171 zassert(pthread_mutex_lock(&ep_lock));
172 ep = ep_get_packet();
173 ep->cmd = EXIT;
174 ep_append_packet(ep);
175 zassert(pthread_mutex_unlock(&ep_lock));
176 pthread_join(ep_worker,NULL);
177 for (ep = ep_head ; ep ; ep = epn) {
178 epn = ep->next;
179 free(ep);
180 }
181 for (ep = ep_unused ; ep ; ep = epn) {
182 epn = ep->next;
183 free(ep);
184 }
185 zassert(pthread_cond_destroy(&ep_cond));
186 zassert(pthread_mutex_destroy(&ep_lock));
187 }
188
ep_init(void)189 void ep_init(void) {
190 ep_head = NULL;
191 ep_tail = &(ep_head);
192 ep_unused = NULL;
193 ep_unused_cnt = 0;
194 zassert(pthread_mutex_init(&ep_lock,NULL));
195 zassert(pthread_cond_init(&ep_cond,NULL));
196 lwt_minthread_create(&ep_worker,0,ep_thread,NULL);
197 }
198