1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #include <inttypes.h>
22 #include <stdlib.h>
23 #include <pthread.h>
24 
25 #include "MFSCommunication.h"
26 #include "massert.h"
27 #include "lwthread.h"
28 #ifdef MFSMOUNT
29 #include "mfs_fuse.h"
30 #include "fdcache.h"
31 #endif
32 #include "chunksdatacache.h"
33 #include "readdata.h"
34 
/* Upper bound on the number of idle packet structures kept on the ep_unused
 * free list; ep_free_packet() releases packets with free() once this many
 * are already cached. */
#define MAX_UNUSED_CNT 100

/* Values stored in extra_packets.cmd. The worker thread handles
 * CHUNK_CHANGED and FLENG_CHANGED explicitly; any other value (EXIT is the
 * one queued by ep_term()) makes it leave its loop. */
enum {CHUNK_CHANGED,FLENG_CHANGED,EXIT};
38 
/* One queued notification for the worker thread. The same structure is used
 * both as a node of the pending queue (ep_head/ep_tail) and as a node of the
 * free list (ep_unused). Producers fill in only the fields relevant to the
 * given command; the remaining fields are left as they were. */
typedef struct _extra_packets {
	uint32_t cmd;		// CHUNK_CHANGED, FLENG_CHANGED or EXIT
	uint32_t inode;		// inode the notification refers to
	uint32_t chindx;	// chunk index within the inode (CHUNK_CHANGED only)
	uint64_t chunkid;	// chunk id passed to chunksdatacache_change (CHUNK_CHANGED only)
	uint32_t version;	// chunk version passed to chunksdatacache_change (CHUNK_CHANGED only)
	uint64_t fleng;		// file length; read for FLENG_CHANGED and for CHUNK_CHANGED with truncflag set
	uint8_t truncflag;	// CHUNK_CHANGED only: non-zero selects the "truncate" handling path in the worker
	struct _extra_packets *next;	// singly-linked list link (queue or free list)
} extra_packets;
49 
/* Pending queue: ep_head is the first packet, ep_tail points at the 'next'
 * slot where the following packet has to be linked (at &ep_head when the
 * queue is empty). */
static extra_packets *ep_head,**ep_tail;
/* Free list of recycled packet structures and its current length. */
static extra_packets *ep_unused;
static uint32_t ep_unused_cnt;
/* ep_lock is held around queue and free-list manipulation in this file;
 * ep_cond is signalled when a packet is appended to an empty queue. */
static pthread_mutex_t ep_lock;
static pthread_cond_t ep_cond;
/* Handle of the worker thread started in ep_init() and joined in ep_term(). */
static pthread_t ep_worker;
56 
ep_get_packet(void)57 static inline extra_packets* ep_get_packet(void) {
58 	extra_packets *ep;
59 	if (ep_unused!=NULL) {
60 		ep = ep_unused;
61 		ep_unused = ep_unused->next;
62 		ep_unused_cnt--;
63 	} else {
64 		ep = malloc(sizeof(extra_packets));
65 		passert(ep);
66 	}
67 	return ep;
68 }
69 
/*
 * Link 'ep' at the end of the pending queue. The condition variable is
 * signalled only when the queue was empty before the append.
 * All callers in this file invoke it with ep_lock held.
 */
static inline void ep_append_packet(extra_packets *ep) {
	// must be sampled before linking - afterwards ep_head is never NULL
	const int queue_was_empty = (ep_head==NULL);

	ep->next = NULL;
	*ep_tail = ep;
	ep_tail = &(ep->next);
	if (queue_was_empty) {
		pthread_cond_signal(&ep_cond);
	}
}
80 
/*
 * Dispose of a processed packet: push it onto the ep_unused free list while
 * fewer than MAX_UNUSED_CNT packets are cached there, otherwise release the
 * memory. The worker thread calls it with ep_lock held.
 */
static inline void ep_free_packet(extra_packets *ep) {
	if (ep_unused_cnt<MAX_UNUSED_CNT) {
		// still room in the cache - keep the structure for reuse
		ep->next = ep_unused;
		ep_unused = ep;
		ep_unused_cnt++;
		return;
	}
	free(ep);
}
90 
ep_thread(void * arg)91 void* ep_thread(void *arg) {
92 	extra_packets *ep = NULL;
93 	zassert(pthread_mutex_lock(&ep_lock));
94 	while (1) {
95 		while (ep_head==NULL) {
96 			zassert(pthread_cond_wait(&ep_cond,&ep_lock));
97 		}
98 		ep = ep_head;
99 		ep_head = ep->next;
100 		if (ep_head==NULL) {
101 			ep_tail = &ep_head;
102 		}
103 		zassert(pthread_mutex_unlock(&ep_lock));
104 		switch (ep->cmd) {
105 			case CHUNK_CHANGED:
106 				chunksdatacache_change(ep->inode,ep->chindx,ep->chunkid,ep->version);
107 				if (ep->truncflag) {
108 					chunksdatacache_clear_inode(ep->inode,ep->chindx+1);
109 					read_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,0);
110 					read_inode_set_length_passive(ep->inode,ep->fleng);
111 #ifdef MFSMOUNT
112 					fdcache_invalidate(ep->inode);
113 					mfs_inode_change_fleng(ep->inode,ep->fleng);
114 					mfs_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,0);
115 #endif
116 				} else {
117 					read_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,MFSCHUNKSIZE);
118 #ifdef MFSMOUNT
119 					fdcache_invalidate(ep->inode);
120 					mfs_inode_clear_cache(ep->inode,(uint64_t)(ep->chindx)*MFSCHUNKSIZE,MFSCHUNKSIZE);
121 #endif
122 				}
123 				break;
124 			case FLENG_CHANGED:
125 				read_inode_set_length_passive(ep->inode,ep->fleng);
126 #ifdef MFSMOUNT
127 				fdcache_invalidate(ep->inode);
128 				mfs_inode_clear_cache(ep->inode,ep->fleng,0);
129 				mfs_inode_change_fleng(ep->inode,ep->fleng);
130 #endif
131 				break;
132 			default:
133 				free(ep);
134 				return arg;
135 		}
136 		zassert(pthread_mutex_lock(&ep_lock));
137 		ep_free_packet(ep);
138 	}
139 	zassert(pthread_mutex_unlock(&ep_lock)); // pro forma - unreachable
140 	return arg; // pro forma - unreachable
141 }
142 
ep_chunk_has_changed(uint32_t inode,uint32_t chindx,uint64_t chunkid,uint32_t version,uint64_t fleng,uint8_t truncflag)143 void ep_chunk_has_changed(uint32_t inode,uint32_t chindx,uint64_t chunkid,uint32_t version,uint64_t fleng,uint8_t truncflag) {
144 	extra_packets *ep;
145 	zassert(pthread_mutex_lock(&ep_lock));
146 	ep = ep_get_packet();
147 	ep->cmd = CHUNK_CHANGED;
148 	ep->inode = inode;
149 	ep->chindx = chindx;
150 	ep->chunkid = chunkid;
151 	ep->version = version;
152 	ep->fleng = fleng;
153 	ep->truncflag = truncflag;
154 	ep_append_packet(ep);
155 	zassert(pthread_mutex_unlock(&ep_lock));
156 }
157 
ep_fleng_has_changed(uint32_t inode,uint64_t fleng)158 void ep_fleng_has_changed(uint32_t inode,uint64_t fleng) {
159 	extra_packets *ep;
160 	zassert(pthread_mutex_lock(&ep_lock));
161 	ep = ep_get_packet();
162 	ep->cmd = FLENG_CHANGED;
163 	ep->inode = inode;
164 	ep->fleng = fleng;
165 	ep_append_packet(ep);
166 	zassert(pthread_mutex_unlock(&ep_lock));
167 }
168 
ep_term(void)169 void ep_term(void) {
170 	extra_packets *ep,*epn;
171 	zassert(pthread_mutex_lock(&ep_lock));
172 	ep = ep_get_packet();
173 	ep->cmd = EXIT;
174 	ep_append_packet(ep);
175 	zassert(pthread_mutex_unlock(&ep_lock));
176 	pthread_join(ep_worker,NULL);
177 	for (ep = ep_head ; ep ; ep = epn) {
178 		epn = ep->next;
179 		free(ep);
180 	}
181 	for (ep = ep_unused ; ep ; ep = epn) {
182 		epn = ep->next;
183 		free(ep);
184 	}
185 	zassert(pthread_cond_destroy(&ep_cond));
186 	zassert(pthread_mutex_destroy(&ep_lock));
187 }
188 
ep_init(void)189 void ep_init(void) {
190 	ep_head = NULL;
191 	ep_tail = &(ep_head);
192 	ep_unused = NULL;
193 	ep_unused_cnt = 0;
194 	zassert(pthread_mutex_init(&ep_lock,NULL));
195 	zassert(pthread_cond_init(&ep_cond,NULL));
196 	lwt_minthread_create(&ep_worker,0,ep_thread,NULL);
197 }
198