xref: /reactos/base/services/nfsd/pnfs_io.c (revision b707be90)
1 /* NFSv4.1 client for Windows
2  * Copyright (c) 2012 The Regents of the University of Michigan
3  *
4  * Olga Kornievskaia <aglo@umich.edu>
5  * Casey Bodley <cbodley@umich.edu>
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by
9  * the Free Software Foundation; either version 2.1 of the License, or (at
10  * your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful, but
13  * without any warranty; without even the implied warranty of merchantability
14  * or fitness for a particular purpose.  See the GNU Lesser General Public
15  * License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20  */
21 
22 #include <stdio.h>
23 #include <process.h>
24 
25 #include "nfs41_ops.h"
26 #include "util.h"
27 #include "daemon_debug.h"
28 
29 
30 #define IOLVL 2 /* dprintf level for pnfs io logging */
31 
/* convert a list entry taken from a layout list into the pnfs_file_layout
 * that embeds it as layout.entry (via list_container) */
32 #define file_layout_entry(pos) list_container(pos, pnfs_file_layout, layout.entry)
33 
/* state shared by all io threads of a single pnfs_read()/pnfs_write() call;
 * filled in by pattern_init() and released by pattern_free() */
34 typedef struct __pnfs_io_pattern {
35     struct __pnfs_io_thread *threads; /* array of 'count' threads, calloc'd in pattern_init() */
36     nfs41_root              *root; /* passed to pnfs_data_server_client() */
37     nfs41_path_fh           *meta_file; /* used by get_sparse_fh() when the layout has no file handles */
38     const stateid_arg       *stateid; /* copied by each io thread before it sends io */
39     pnfs_layout_state       *state; /* layout state; its lock is taken in thread_next_unit() */
40     unsigned char           *buffer; /* caller's buffer; byte 0 corresponds to offset_start */
41     uint64_t                offset_start; /* first file offset of the request */
42     uint64_t                offset_end; /* offset_start + length, exclusive */
43     uint32_t                count; /* number of entries in 'threads' */
44     uint32_t                default_lease; /* passed to pnfs_data_server_client() */
45 } pnfs_io_pattern;
46 
/* per-stripe io state; one entry per thread started by pattern_fork() */
47 typedef struct __pnfs_io_thread {
48     nfs41_write_verf        verf; /* verifier filled in by nfs41_write()/nfs41_commit(); compared in mds_commit() */
49     pnfs_io_pattern         *pattern; /* shared pattern this thread belongs to */
50     pnfs_file_layout        *layout; /* layout segment this thread does io against */
51     nfs41_path_fh           *file; /* file handle chosen by get_dense_fh()/get_sparse_fh() */
52     uint64_t                offset; /* current file offset; advanced by the bytes transferred */
53     uint32_t                id; /* stripe index within the layout's device */
54     enum stable_how4        stable; /* starts as FILE_SYNC4; set to DATA_SYNC4 after a successful data server COMMIT */
55 } pnfs_io_thread;
56 
/* one contiguous piece of io within a single stripe unit, as produced by
 * stripe_next_unit() and thread_next_unit() */
57 typedef struct __pnfs_io_unit {
58     unsigned char           *buffer; /* position in the pattern's buffer; set by thread_next_unit() */
59     uint64_t                offset; /* offset sent in the io request; translated for dense layouts */
60     uint64_t                length; /* number of bytes in this unit */
61     uint32_t                stripeid; /* NOTE(review): not written by any code visible in this file */
62     uint32_t                serverid; /* NOTE(review): not written by any code visible in this file */
63 } pnfs_io_unit;
64 
/* thread entry point handed to _beginthreadex() by pattern_fork(); the
 * argument is a pnfs_io_thread* and the return value is an enum pnfs_status */
65 typedef uint32_t (WINAPI *pnfs_io_thread_fn)(void*);
66 
67 
/* forward declaration: pattern_threads_init() calls this before the
 * definition appears later in the file */
68 static enum pnfs_status stripe_next_unit(
69     IN const pnfs_file_layout *layout,
70     IN uint32_t stripeid,
71     IN uint64_t *position, /* read and updated by the function */
72     IN uint64_t offset_end,
73     OUT pnfs_io_unit *io);
74 
75 /* 13.4.2. Interpreting the File Layout Using Sparse Packing
76  * http://tools.ietf.org/html/rfc5661#section-13.4.2 */
77 
78 static enum pnfs_status get_sparse_fh(
79     IN pnfs_file_layout *layout,
80     IN nfs41_path_fh *meta_file,
81     IN uint32_t stripeid,
82     OUT nfs41_path_fh **file_out)
83 {
84     const uint32_t filehandle_count = layout->filehandles.count;
85     const uint32_t server_count = layout->device->servers.count;
86     enum pnfs_status status = PNFS_SUCCESS;
87 
88     if (filehandle_count == server_count) {
89         const uint32_t serverid = data_server_index(layout->device, stripeid);
90         *file_out = &layout->filehandles.arr[serverid];
91     } else if (filehandle_count == 1) {
92         *file_out = &layout->filehandles.arr[0];
93     } else if (filehandle_count == 0) {
94         *file_out = meta_file;
95     } else {
96         eprintf("invalid sparse layout! has %u file handles "
97             "and %u servers\n", filehandle_count, server_count);
98         status = PNFSERR_INVALID_FH_LIST;
99     }
100     return status;
101 }
102 
103 /* 13.4.3. Interpreting the File Layout Using Dense Packing
104 * http://tools.ietf.org/html/rfc5661#section-13.4.3 */
105 
106 static enum pnfs_status get_dense_fh(
107     IN pnfs_file_layout *layout,
108     IN uint32_t stripeid,
109     OUT nfs41_path_fh **file_out)
110 {
111     const uint32_t filehandle_count = layout->filehandles.count;
112     const uint32_t stripe_count = layout->device->stripes.count;
113     enum pnfs_status status = PNFS_SUCCESS;
114 
115     if (filehandle_count == stripe_count) {
116         *file_out = &layout->filehandles.arr[stripeid];
117     } else {
118         eprintf("invalid dense layout! has %u file handles "
119             "and %u stripes\n", filehandle_count, stripe_count);
120         status = PNFSERR_INVALID_FH_LIST;
121     }
122     return status;
123 }
124 
125 static __inline bool_t layout_compatible(
126     IN const pnfs_layout *layout,
127     IN enum pnfs_iomode iomode,
128     IN uint64_t position)
129 {
130     return layout->iomode >= iomode
131         && layout->offset <= position
132         && position < layout->offset + layout->length;
133 }
134 
135 /* count stripes for all layout segments that intersect the range
136  * and have not been covered by previous segments */
137 static uint32_t thread_count(
138     IN pnfs_layout_state *state,
139     IN enum pnfs_iomode iomode,
140     IN uint64_t offset,
141     IN uint64_t length)
142 {
143     uint64_t position = offset;
144     struct list_entry *entry;
145     uint32_t count = 0;
146 
147     list_for_each(entry, &state->layouts) {
148         pnfs_file_layout *layout = file_layout_entry(entry);
149 
150         if (!layout_compatible(&layout->layout, iomode, position))
151             continue;
152 
153         position = layout->layout.offset + layout->layout.length;
154         count += layout->device->stripes.count;
155     }
156     return count;
157 }
158 
159 static enum pnfs_status thread_init(
160     IN pnfs_io_pattern *pattern,
161     IN pnfs_io_thread *thread,
162     IN pnfs_file_layout *layout,
163     IN uint32_t stripeid,
164     IN uint64_t offset)
165 {
166     thread->pattern = pattern;
167     thread->layout = layout;
168     thread->stable = FILE_SYNC4;
169     thread->offset = offset;
170     thread->id = stripeid;
171 
172     return is_dense(layout) ? get_dense_fh(layout, stripeid, &thread->file)
173         : get_sparse_fh(layout, pattern->meta_file, stripeid, &thread->file);
174 }
175 
176 static enum pnfs_status pattern_threads_init(
177     IN pnfs_io_pattern *pattern,
178     IN enum pnfs_iomode iomode,
179     IN uint64_t offset,
180     IN uint64_t length)
181 {
182     pnfs_io_unit io;
183     uint64_t position = offset;
184     struct list_entry *entry;
185     uint32_t s, t = 0;
186     enum pnfs_status status = PNFS_SUCCESS;
187 
188     list_for_each(entry, &pattern->state->layouts) {
189         pnfs_file_layout *layout = file_layout_entry(entry);
190 
191         if (!layout_compatible(&layout->layout, iomode, position))
192             continue;
193 
194         for (s = 0; s < layout->device->stripes.count; s++) {
195             uint64_t off = position;
196 
197             /* does the range contain this stripe? */
198             status = stripe_next_unit(layout, s, &off, offset + length, &io);
199             if (status != PNFS_PENDING)
200                 continue;
201 
202             if (t >= pattern->count) { /* miscounted threads needed? */
203                 status = PNFSERR_NO_LAYOUT;
204                 goto out;
205             }
206 
207             status = thread_init(pattern, &pattern->threads[t++], layout, s, off);
208             if (status)
209                 goto out;
210         }
211         position = layout->layout.offset + layout->layout.length;
212     }
213 
214     if (position < offset + length) {
215         /* unable to satisfy the entire range */
216         status = PNFSERR_NO_LAYOUT;
217         goto out;
218     }
219 
220     /* update the pattern with the actual number of threads used */
221     pattern->count = t;
222 out:
223     return status;
224 }
225 
226 static enum pnfs_status pattern_init(
227     IN pnfs_io_pattern *pattern,
228     IN nfs41_root *root,
229     IN nfs41_path_fh *meta_file,
230     IN const stateid_arg *stateid,
231     IN pnfs_layout_state *state,
232     IN unsigned char *buffer,
233     IN enum pnfs_iomode iomode,
234     IN uint64_t offset,
235     IN uint64_t length,
236     IN uint32_t default_lease)
237 {
238     enum pnfs_status status;
239 
240     /* calculate an upper bound on the number of threads to allocate */
241     pattern->count = thread_count(state, iomode, offset, length);
242     pattern->threads = calloc(pattern->count, sizeof(pnfs_io_thread));
243     if (pattern->threads == NULL) {
244         status = PNFSERR_RESOURCES;
245         goto out;
246     }
247 
248     /* information shared between threads */
249     pattern->root = root;
250     pattern->meta_file = meta_file;
251     pattern->stateid = stateid;
252     pattern->state = state;
253     pattern->buffer = buffer;
254     pattern->offset_start = offset;
255     pattern->offset_end = offset + length;
256     pattern->default_lease = default_lease;
257 
258     /* initialize a thread for every stripe necessary to cover the range */
259     status = pattern_threads_init(pattern, iomode, offset, length);
260     if (status)
261         goto out_err_free;
262 
263     /* take a reference on the layout so we don't return it during io */
264     pnfs_layout_io_start(state);
265 out:
266     return status;
267 
268 out_err_free:
269     free(pattern->threads);
270     pattern->threads = NULL;
271     goto out;
272 }
273 
274 static void pattern_free(
275     IN pnfs_io_pattern *pattern)
276 {
277     /* inform the layout that our io is finished */
278     pnfs_layout_io_finished(pattern->state);
279     free(pattern->threads);
280 }
281 
/* Remainder of dividend / divisor.  Both operands are unsigned, so the
 * result of % is already in [0, divisor) and needs no correction; the
 * former "remainder + divisor" adjustment could never be taken.
 * divisor must be nonzero. */
static __inline uint64_t positive_remainder(
    IN uint64_t dividend,
    IN uint32_t divisor)
{
    return dividend % divisor;
}
289 
290 /* return the next unit of the given stripeid */
291 static enum pnfs_status stripe_next_unit(
292     IN const pnfs_file_layout *layout,
293     IN uint32_t stripeid,
294     IN uint64_t *position,
295     IN uint64_t offset_end,
296     OUT pnfs_io_unit *io)
297 {
298     const uint32_t unit_size = layout_unit_size(layout);
299     const uint32_t stripe_count = layout->device->stripes.count;
300     uint64_t sui = stripe_unit_number(layout, *position, unit_size);
301 
302     /* advance to the desired stripeid */
303     sui += abs(stripeid - stripe_index(layout, sui, stripe_count));
304 
305     io->offset = stripe_unit_offset(layout, sui, unit_size);
306     if (io->offset < *position) /* don't start before position */
307         io->offset = *position;
308     else
309         *position = io->offset;
310 
311     io->length = stripe_unit_offset(layout, sui + 1, unit_size);
312     if (io->length > offset_end) /* don't end past offset_end */
313         io->length = offset_end;
314 
315     if (io->offset >= io->length) /* nothing to do, return success */
316         return PNFS_SUCCESS;
317 
318     io->length -= io->offset;
319 
320     if (is_dense(layout)) {
321         const uint64_t rel_offset = io->offset - layout->pattern_offset;
322         const uint64_t remainder = positive_remainder(rel_offset, unit_size);
323         const uint32_t stride = unit_size * stripe_count;
324 
325         io->offset = (rel_offset / stride) * unit_size + remainder;
326     }
327     return PNFS_PENDING;
328 }
329 
330 static enum pnfs_status thread_next_unit(
331     IN pnfs_io_thread *thread,
332     OUT pnfs_io_unit *io)
333 {
334     pnfs_io_pattern *pattern = thread->pattern;
335     pnfs_layout_state *state = pattern->state;
336     enum pnfs_status status;
337 
338     AcquireSRWLockShared(&state->lock);
339 
340     /* stop io if the layout is recalled */
341     status = pnfs_layout_recall_status(state, &thread->layout->layout);
342     if (status)
343         goto out_unlock;
344 
345     status = stripe_next_unit(thread->layout, thread->id,
346         &thread->offset, pattern->offset_end, io);
347     if (status == PNFS_PENDING)
348         io->buffer = pattern->buffer + thread->offset - pattern->offset_start;
349 
350 out_unlock:
351     ReleaseSRWLockShared(&state->lock);
352     return status;
353 }
354 
355 static enum pnfs_status thread_data_server(
356     IN pnfs_io_thread *thread,
357     OUT pnfs_data_server **server_out)
358 {
359     pnfs_file_device *device = thread->layout->device;
360     const uint32_t serverid = data_server_index(device, thread->id);
361 
362     if (serverid >= device->servers.count)
363         return PNFSERR_INVALID_DS_INDEX;
364 
365     *server_out = &device->servers.arr[serverid];
366     return PNFS_SUCCESS;
367 }
368 
369 static enum pnfs_status pattern_join(
370     IN HANDLE *threads,
371     IN DWORD count)
372 {
373     DWORD status;
374     /* WaitForMultipleObjects() supports a maximum of 64 objects */
375     while (count) {
376         const DWORD n = min(count, MAXIMUM_WAIT_OBJECTS);
377         status = WaitForMultipleObjects(n, threads, TRUE, INFINITE);
378         if (status != WAIT_OBJECT_0)
379             return PNFSERR_RESOURCES;
380 
381         count -= n;
382         threads += n;
383     }
384     return PNFS_SUCCESS;
385 }
386 
387 static enum pnfs_status pattern_fork(
388     IN pnfs_io_pattern *pattern,
389     IN pnfs_io_thread_fn thread_fn)
390 {
391     HANDLE *threads;
392     uint32_t i;
393     enum pnfs_status status = PNFS_SUCCESS;
394 
395     if (pattern->count == 0)
396         goto out;
397 
398     if (pattern->count == 1) {
399         /* no need to fork if there's only 1 thread */
400         status = (enum pnfs_status)thread_fn(pattern->threads);
401         goto out;
402     }
403 
404     /* create a thread for each unit that has actual io */
405     threads = calloc(pattern->count, sizeof(HANDLE));
406     if (threads == NULL) {
407         status = PNFSERR_RESOURCES;
408         goto out;
409     }
410 
411     for (i = 0; i < pattern->count; i++) {
412         threads[i] = (HANDLE)_beginthreadex(NULL, 0,
413             thread_fn, &pattern->threads[i], 0, NULL);
414         if (threads[i] == NULL) {
415             eprintf("_beginthreadex() failed with %d\n", GetLastError());
416             pattern->count = i; /* join any threads already started */
417             break;
418         }
419     }
420 
421     /* wait on all threads to finish */
422     status = pattern_join(threads, pattern->count);
423     if (status) {
424         eprintf("pattern_join() failed with %s\n", pnfs_error_string(status));
425         goto out;
426     }
427 
428     for (i = 0; i < pattern->count; i++) {
429         /* keep track of the most severe error returned by a thread */
430         DWORD exitcode;
431         if (GetExitCodeThread(threads[i], &exitcode))
432             status = max(status, (enum pnfs_status)exitcode);
433 
434         CloseHandle(threads[i]);
435     }
436 
437     free(threads);
438 out:
439     return status;
440 }
441 
442 static uint64_t pattern_bytes_transferred(
443     IN pnfs_io_pattern *pattern,
444     OUT OPTIONAL enum stable_how4 *stable)
445 {
446     uint64_t lowest_offset = pattern->offset_end;
447     uint32_t i;
448 
449     if (stable) *stable = FILE_SYNC4;
450 
451     for (i = 0; i < pattern->count; i++) {
452         lowest_offset = min(lowest_offset, pattern->threads[i].offset);
453         if (stable) *stable = min(*stable, pattern->threads[i].stable);
454     }
455     return lowest_offset - pattern->offset_start;
456 }
457 
458 
459 static enum pnfs_status map_ds_error(
460     IN enum nfsstat4 nfsstat,
461     IN pnfs_layout_state *state,
462     IN const pnfs_file_layout *layout)
463 {
464     switch (nfsstat) {
465     case NO_ERROR:
466         return PNFS_SUCCESS;
467 
468     /* 13.11 Layout Revocation and Fencing
469      * http://tools.ietf.org/html/rfc5661#section-13.11
470      * if we've been fenced, we'll either get ERR_STALE when we PUTFH
471      * something in layout.filehandles, or ERR_PNFS_NO_LAYOUT when
472      * attempting to READ or WRITE */
473     case NFS4ERR_STALE:
474     case NFS4ERR_PNFS_NO_LAYOUT:
475         dprintf(IOLVL, "data server fencing detected!\n");
476 
477         pnfs_layout_recall_fenced(state, &layout->layout);
478 
479         /* return CHANGED to prevent any further use of the layout */
480         return PNFSERR_LAYOUT_CHANGED;
481 
482     default:
483         return PNFSERR_IO;
484     }
485 }
486 
487 static uint32_t WINAPI file_layout_read_thread(void *args)
488 {
489     pnfs_io_unit io;
490     stateid_arg stateid;
491     pnfs_io_thread *thread = (pnfs_io_thread*)args;
492     pnfs_io_pattern *pattern = thread->pattern;
493     pnfs_data_server *server;
494     nfs41_client *client;
495     uint32_t maxreadsize, bytes_read, total_read;
496     enum pnfs_status status;
497     enum nfsstat4 nfsstat;
498     bool_t eof;
499 
500     dprintf(IOLVL, "--> file_layout_read_thread(%u)\n", thread->id);
501 
502     /* get the data server for this thread */
503     status = thread_data_server(thread, &server);
504     if (status) {
505         eprintf("thread_data_server() failed with %s\n",
506             pnfs_error_string(status));
507         goto out;
508     }
509     /* find or establish a client for this data server */
510     status = pnfs_data_server_client(pattern->root,
511         server, pattern->default_lease, &client);
512     if (status) {
513         eprintf("pnfs_data_server_client() failed with %s\n",
514             pnfs_error_string(status));
515         goto out;
516     }
517 
518     memcpy(&stateid, pattern->stateid, sizeof(stateid));
519     stateid.stateid.seqid = 0;
520 
521     total_read = 0;
522     while (thread_next_unit(thread, &io) == PNFS_PENDING) {
523         maxreadsize = max_read_size(client->session, &thread->file->fh);
524         if (io.length > maxreadsize)
525             io.length = maxreadsize;
526 
527         nfsstat = nfs41_read(client->session, thread->file, &stateid,
528             io.offset, (uint32_t)io.length, io.buffer, &bytes_read, &eof);
529         if (nfsstat) {
530             eprintf("nfs41_read() failed with %s\n",
531                 nfs_error_string(nfsstat));
532             status = map_ds_error(nfsstat, pattern->state, thread->layout);
533             break;
534         }
535 
536         total_read += bytes_read;
537         thread->offset += bytes_read;
538 
539         if (eof) {
540             dprintf(IOLVL, "read thread %u reached eof: offset %llu\n",
541                 thread->id, thread->offset);
542             status = total_read ? PNFS_SUCCESS : PNFS_READ_EOF;
543             break;
544         }
545     }
546 out:
547     dprintf(IOLVL, "<-- file_layout_read_thread(%u) returning %s\n",
548         thread->id, pnfs_error_string(status));
549     return status;
550 }
551 
/* Thread entry point that writes one stripe's share of the pattern to its
 * data server.  'args' is the pnfs_io_thread to service.
 *
 * Every WRITE is sent as UNSTABLE4.  After the last unit, the written range
 * is COMMITted to the data server unless nothing was written, the layout
 * changed, thread->stable is no longer UNSTABLE4, or the layout asks for
 * commits to go through the metadata server.  A failed verify_write() or
 * verify_commit() restarts the whole stripe from its first offset.
 * The return value is an enum pnfs_status used as the thread exit code. */
552 static uint32_t WINAPI file_layout_write_thread(void *args)
553 {
554     pnfs_io_unit io;
555     stateid_arg stateid;
556     pnfs_io_thread *thread = (pnfs_io_thread*)args;
557     pnfs_io_pattern *pattern = thread->pattern;
558     pnfs_data_server *server;
559     nfs41_client *client;
560     const uint64_t offset_start = thread->offset; /* remembered so a retry can rewind */
561     uint64_t commit_min, commit_max;
562     uint32_t maxwritesize, bytes_written, total_written;
563     enum pnfs_status status;
564     enum nfsstat4 nfsstat;
565 
566     dprintf(IOLVL, "--> file_layout_write_thread(%u)\n", thread->id);
567 
568     /* get the data server for this thread */
569     status = thread_data_server(thread, &server);
570     if (status) {
571         eprintf("thread_data_server() failed with %s\n",
572             pnfs_error_string(status));
573         goto out;
574     }
575     /* find or establish a client for this data server */
576     status = pnfs_data_server_client(pattern->root,
577         server, pattern->default_lease, &client);
578     if (status) {
579         eprintf("pnfs_data_server_client() failed with %s\n",
580             pnfs_error_string(status));
581         goto out;
582     }
583 
    /* work on a private copy of the stateid, sent with a zero seqid */
584     memcpy(&stateid, pattern->stateid, sizeof(stateid));
585     stateid.stateid.seqid = 0;
586 
587     maxwritesize = max_write_size(client->session, &thread->file->fh);
588 
    /* (re)start the stripe from its first offset with a fresh commit range;
     * NOTE(review): nothing visible here bounds the number of retries */
589 retry_write:
590     thread->offset = offset_start;
591     thread->stable = FILE_SYNC4;
592     commit_min = NFS4_UINT64_MAX;
593     commit_max = 0;
594     total_written = 0;
595 
    /* NOTE(review): the value returned by thread_next_unit() is only
     * compared, never stored in 'status', so a recall status reported by
     * it ends the loop without changing the thread's return value */
596     while (thread_next_unit(thread, &io) == PNFS_PENDING) {
597         if (io.length > maxwritesize)
598             io.length = maxwritesize;
599 
600         nfsstat = nfs41_write(client->session, thread->file, &stateid,
601             io.buffer, (uint32_t)io.length, io.offset, UNSTABLE4,
602             &bytes_written, &thread->verf, NULL);
603         if (nfsstat) {
604             eprintf("nfs41_write() failed with %s\n",
605                 nfs_error_string(nfsstat));
606             status = map_ds_error(nfsstat, pattern->state, thread->layout);
607             break;
608         }
609         if (!verify_write(&thread->verf, &thread->stable))
610             goto retry_write;
611 
612         total_written += bytes_written;
613         thread->offset += bytes_written;
614 
615         /* track the range for commit; io.offset is the offset that was
         * sent in the WRITE, and the upper bound uses the requested
         * io.length rather than bytes_written */
616         if (commit_min > io.offset)
617             commit_min = io.offset;
618         if (commit_max < io.offset + io.length)
619             commit_max = io.offset + io.length;
620     }
621 
622     /* nothing to commit */
623     if (commit_max <= commit_min)
624         goto out;
625     /* layout changed; redo all io against metadata server */
626     if (status == PNFSERR_LAYOUT_CHANGED)
627         goto out;
628     /* the data is already in stable storage */
629     if (thread->stable != UNSTABLE4)
630         goto out;
631     /* the metadata server expects us to commit there instead */
632     if (should_commit_to_mds(thread->layout))
633         goto out;
634 
635     dprintf(1, "sending COMMIT to data server for offset=%lld len=%lld\n",
636         commit_min, commit_max - commit_min);
637     nfsstat = nfs41_commit(client->session, thread->file,
638         commit_min, (uint32_t)(commit_max - commit_min), 0, &thread->verf, NULL);
639 
640     if (nfsstat)
641         status = map_ds_error(nfsstat, pattern->state, thread->layout);
642     else if (!verify_commit(&thread->verf)) {
643         /* resend the writes unless the layout was recalled.
         * NOTE(review): within this function 'status' is only assigned
         * from thread_data_server(), pnfs_data_server_client() and
         * map_ds_error(); map_ds_error() never returns
         * PNFSERR_LAYOUT_RECALLED, so this test looks like it is always
         * true and the PNFSERR_IO assignment below unreachable - confirm */
644         if (status != PNFSERR_LAYOUT_RECALLED)
645             goto retry_write;
646         status = PNFSERR_IO;
647     } else {
648         /* on successful commit, leave pnfs_status unchanged; if the
649          * layout was recalled, we still want to return the error */
650         thread->stable = DATA_SYNC4;
651     }
652 out:
653     dprintf(IOLVL, "<-- file_layout_write_thread(%u) returning %s\n",
654         thread->id, pnfs_error_string(status));
655     return status;
656 }
657 
658 
659 enum pnfs_status pnfs_read(
660     IN nfs41_root *root,
661     IN nfs41_open_state *state,
662     IN stateid_arg *stateid,
663     IN pnfs_layout_state *layout,
664     IN uint64_t offset,
665     IN uint64_t length,
666     OUT unsigned char *buffer_out,
667     OUT ULONG *len_out)
668 {
669     pnfs_io_pattern pattern;
670     enum pnfs_status status;
671 
672     dprintf(IOLVL, "--> pnfs_read(%llu, %llu)\n", offset, length);
673 
674     *len_out = 0;
675 
676     AcquireSRWLockExclusive(&layout->lock);
677 
678     /* get layouts/devices for the entire range; PNFS_PENDING means we
679      * dropped the lock to send an rpc, so repeat until it succeeds */
680     do {
681         status = pnfs_layout_state_prepare(layout, state->session,
682             &state->file, stateid, PNFS_IOMODE_READ, offset, length);
683     } while (status == PNFS_PENDING);
684 
685     if (status == PNFS_SUCCESS) {
686         /* interpret the layout and set up threads for io */
687         status = pattern_init(&pattern, root, &state->file, stateid,
688             layout, buffer_out, PNFS_IOMODE_READ, offset, length,
689             state->session->lease_time);
690         if (status)
691             eprintf("pattern_init() failed with %s\n",
692                 pnfs_error_string(status));
693     }
694 
695     ReleaseSRWLockExclusive(&layout->lock);
696 
697     if (status)
698         goto out;
699 
700     status = pattern_fork(&pattern, file_layout_read_thread);
701     if (status != PNFS_SUCCESS && status != PNFS_READ_EOF)
702         goto out_free_pattern;
703 
704     *len_out = (ULONG)pattern_bytes_transferred(&pattern, NULL);
705 
706 out_free_pattern:
707     pattern_free(&pattern);
708 out:
709     dprintf(IOLVL, "<-- pnfs_read() returning %s\n",
710         pnfs_error_string(status));
711     return status;
712 }
713 
714 static enum pnfs_status mds_commit(
715     IN nfs41_open_state *state,
716     IN uint64_t offset,
717     IN uint32_t length,
718     IN const pnfs_io_pattern *pattern,
719     OUT nfs41_file_info *info)
720 {
721     nfs41_write_verf verf;
722     enum nfsstat4 nfsstat;
723     enum pnfs_status status = PNFS_SUCCESS;
724     uint32_t i;
725 
726     nfsstat = nfs41_commit(state->session,
727         &state->file, offset, length, 1, &verf, info);
728     if (nfsstat) {
729         eprintf("nfs41_commit() to mds failed with %s\n",
730             nfs_error_string(nfsstat));
731         status = PNFSERR_IO;
732         goto out;
733     }
734 
735     /* 13.7. COMMIT through Metadata Server:
736      * If nfl_util & NFL4_UFLG_COMMIT_THRU_MDS is TRUE, then in order to
737      * maintain the current NFSv4.1 commit and recovery model, the data
738      * servers MUST return a common writeverf verifier in all WRITE
739      * responses for a given file layout, and the metadata server's
740      * COMMIT implementation must return the same writeverf. */
741     for (i = 0; i < pattern->count; i++) {
742         const pnfs_io_thread *thread = &pattern->threads[i];
743         if (thread->stable != UNSTABLE4) /* already committed */
744             continue;
745 
746         if (!should_commit_to_mds(thread->layout)) {
747             /* commit to mds is not allowed on this layout */
748             eprintf("mds commit: failed to commit to data server\n");
749             status = PNFSERR_IO;
750             break;
751         }
752         if (memcmp(verf.verf, thread->verf.verf, NFS4_VERIFIER_SIZE) != 0) {
753             eprintf("mds commit verifier doesn't match ds write verifiers\n");
754             status = PNFSERR_IO;
755             break;
756         }
757     }
758 out:
759     return status;
760 }
761 
762 static enum pnfs_status layout_commit(
763     IN nfs41_open_state *state,
764     IN pnfs_layout_state *layout,
765     IN uint64_t offset,
766     IN uint64_t length,
767     OUT nfs41_file_info *info)
768 {
769     stateid4 layout_stateid;
770     uint64_t last_offset = offset + length - 1;
771     uint64_t *new_last_offset = NULL;
772     enum nfsstat4 nfsstat;
773     enum pnfs_status status = PNFS_SUCCESS;
774 
775     AcquireSRWLockExclusive(&state->lock);
776     /* if this is past the current eof, update the open state's
777      * last offset, and pass a pointer to LAYOUTCOMMIT */
778     if (state->pnfs_last_offset < last_offset ||
779         (state->pnfs_last_offset == 0 && last_offset == 0)) {
780         state->pnfs_last_offset = last_offset;
781         new_last_offset = &last_offset;
782     }
783     ReleaseSRWLockExclusive(&state->lock);
784 
785     AcquireSRWLockShared(&layout->lock);
786     memcpy(&layout_stateid, &layout->stateid, sizeof(layout_stateid));
787     ReleaseSRWLockShared(&layout->lock);
788 
789     dprintf(1, "LAYOUTCOMMIT for offset=%lld len=%lld new_last_offset=%u\n",
790         offset, length, new_last_offset ? 1 : 0);
791     nfsstat = pnfs_rpc_layoutcommit(state->session, &state->file,
792         &layout_stateid, offset, length, new_last_offset, NULL, info);
793     if (nfsstat) {
794         dprintf(IOLVL, "pnfs_rpc_layoutcommit() failed with %s\n",
795             nfs_error_string(nfsstat));
796         status = PNFSERR_IO;
797     }
798     return status;
799 }
800 
801 enum pnfs_status pnfs_write(
802     IN nfs41_root *root,
803     IN nfs41_open_state *state,
804     IN stateid_arg *stateid,
805     IN pnfs_layout_state *layout,
806     IN uint64_t offset,
807     IN uint64_t length,
808     IN unsigned char *buffer,
809     OUT ULONG *len_out,
810     OUT nfs41_file_info *info)
811 {
812     pnfs_io_pattern pattern;
813     enum stable_how4 stable;
814     enum pnfs_status status;
815 
816     dprintf(IOLVL, "--> pnfs_write(%llu, %llu)\n", offset, length);
817 
818     *len_out = 0;
819 
820     AcquireSRWLockExclusive(&layout->lock);
821 
822     /* get layouts/devices for the entire range; PNFS_PENDING means we
823      * dropped the lock to send an rpc, so repeat until it succeeds */
824     do {
825         status = pnfs_layout_state_prepare(layout, state->session,
826             &state->file, stateid, PNFS_IOMODE_RW, offset, length);
827     } while (status == PNFS_PENDING);
828 
829     if (status == PNFS_SUCCESS) {
830         /* interpret the layout and set up threads for io */
831         status = pattern_init(&pattern, root, &state->file, stateid,
832             layout, buffer, PNFS_IOMODE_RW, offset, length,
833             state->session->lease_time);
834         if (status)
835             eprintf("pattern_init() failed with %s\n",
836                 pnfs_error_string(status));
837     }
838 
839     ReleaseSRWLockExclusive(&layout->lock);
840 
841     if (status)
842         goto out;
843 
844     status = pattern_fork(&pattern, file_layout_write_thread);
845     /* on layout recall, we still attempt to commit what we wrote */
846     if (status != PNFS_SUCCESS && status != PNFSERR_LAYOUT_RECALLED)
847         goto out_free_pattern;
848 
849     *len_out = (ULONG)pattern_bytes_transferred(&pattern, &stable);
850     if (*len_out == 0)
851         goto out_free_pattern;
852 
853     if (stable == UNSTABLE4) {
854         /* send COMMIT to the mds and verify against all ds writes */
855         status = mds_commit(state, offset, *len_out, &pattern, info);
856     } else if (stable == DATA_SYNC4) {
857         /* send LAYOUTCOMMIT to sync the metadata */
858         status = layout_commit(state, layout, offset, *len_out, info);
859     } else {
860         /* send a GETATTR to update the cached size */
861         bitmap4 attr_request;
862         nfs41_superblock_getattr_mask(state->file.fh.superblock, &attr_request);
863         nfs41_getattr(state->session, &state->file, &attr_request, info);
864     }
865 out_free_pattern:
866     pattern_free(&pattern);
867 out:
868     dprintf(IOLVL, "<-- pnfs_write() returning %s\n",
869         pnfs_error_string(status));
870     return status;
871 }
872