xref: /linux/drivers/block/rbd.c (revision 1fec7093)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
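
/*
 * An illustrative mapping request (values hypothetical), based on the
 * sscanf() format used by rbd_add() below:
 *
 *   echo "1.2.3.4:6789 name=admin mypool myimage -" > /sys/bus/rbd/add
 *
 * i.e. "mon_addr options pool_name image_name snap_name", where "-"
 * (RBD_SNAP_HEAD_NAME) maps the image head rather than a snapshot.
 * The ABI document above is authoritative.
 */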
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46 
47 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
48 
49 #define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN	64
51 #define RBD_MAX_SNAP_NAME_LEN	32
52 #define RBD_MAX_OPT_LEN		1024
53 
54 #define RBD_SNAP_HEAD_NAME	"-"
55 
56 #define DEV_NAME_LEN		32
57 
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 
60 /*
61  * block device image metadata (in-memory version)
62  */
63 struct rbd_image_header {
64 	u64 image_size;
65 	char block_name[32];
66 	__u8 obj_order;
67 	__u8 crypt_type;
68 	__u8 comp_type;
69 	struct rw_semaphore snap_rwsem;
70 	struct ceph_snap_context *snapc;
71 	size_t snap_names_len;
72 	u64 snap_seq;
73 	u32 total_snaps;
74 
75 	char *snap_names;
76 	u64 *snap_sizes;
77 
78 	u64 obj_version;
79 };
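
/*
 * Sketch of the snapshot bookkeeping, as implied by
 * rbd_header_from_disk() and snap_by_name() below: snap_names is a
 * single buffer holding all snapshot names back to back, each
 * NUL-terminated and ordered newest first, e.g.
 *
 *   "snap3\0snap2\0snap1\0"
 *
 * snap_sizes[i] and snapc->snaps[i] correspond to the i-th name in
 * that buffer.
 */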
80 
81 struct rbd_options {
82 	int	notify_timeout;
83 };
84 
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89 	struct ceph_client	*client;
90 	struct rbd_options	*rbd_opts;
91 	struct kref		kref;
92 	struct list_head	node;
93 };
94 
95 struct rbd_req_coll;
96 
97 /*
98  * a single io request
99  */
100 struct rbd_request {
101 	struct request		*rq;		/* blk layer request */
102 	struct bio		*bio;		/* cloned bio */
103 	struct page		**pages;	/* list of used pages */
104 	u64			len;
105 	int			coll_index;
106 	struct rbd_req_coll	*coll;
107 };
108 
109 struct rbd_req_status {
110 	int done;
111 	int rc;
112 	u64 bytes;
113 };
114 
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119 	int			total;
120 	int			num_done;
121 	struct kref		kref;
122 	struct rbd_req_status	status[0];
123 };
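
/*
 * Note: status[0] is an old-style flexible array member;
 * rbd_alloc_coll() below sizes the allocation as
 *
 *   sizeof(struct rbd_req_coll) +
 *       num_reqs * sizeof(struct rbd_req_status)
 *
 * so one allocation carries all of the per-segment completion slots.
 */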
124 
125 struct rbd_snap {
126 	struct	device		dev;
127 	const char		*name;
128 	size_t			size;
129 	struct list_head	node;
130 	u64			id;
131 };
132 
133 /*
134  * a single device
135  */
136 struct rbd_device {
137 	int			id;		/* blkdev unique id */
138 
139 	int			major;		/* blkdev assigned major */
140 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
141 	struct request_queue	*q;
142 
143 	struct ceph_client	*client;
144 	struct rbd_client	*rbd_client;
145 
146 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147 
148 	spinlock_t		lock;		/* queue lock */
149 
150 	struct rbd_image_header	header;
151 	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152 	int			obj_len;
153 	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154 	char			pool_name[RBD_MAX_POOL_NAME_LEN];
155 	int			poolid;
156 
157 	struct ceph_osd_event   *watch_event;
158 	struct ceph_osd_request *watch_request;
159 
160 	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
161 	u32 cur_snap;	/* index+1 of current snapshot within snap context;
162 			   0 means the head */
163 	int read_only;
164 
165 	struct list_head	node;
166 
167 	/* list of snapshots */
168 	struct list_head	snaps;
169 
170 	/* sysfs related */
171 	struct device		dev;
172 };
173 
174 static struct bus_type rbd_bus_type = {
175 	.name		= "rbd",
176 };
177 
178 static spinlock_t node_lock;      /* protects client get/put */
179 
180 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list);    /* devices */
182 static LIST_HEAD(rbd_client_list);      /* clients */
183 
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_rollback(struct device *dev,
187 				 struct device_attribute *attr,
188 				 const char *buf,
189 				 size_t size);
190 static ssize_t rbd_snap_add(struct device *dev,
191 			    struct device_attribute *attr,
192 			    const char *buf,
193 			    size_t count);
194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195 				  struct rbd_snap *snap);
196 
197 
198 static struct rbd_device *dev_to_rbd(struct device *dev)
199 {
200 	return container_of(dev, struct rbd_device, dev);
201 }
202 
203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
204 {
205 	return get_device(&rbd_dev->dev);
206 }
207 
208 static void rbd_put_dev(struct rbd_device *rbd_dev)
209 {
210 	put_device(&rbd_dev->dev);
211 }
212 
213 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
214 
215 static int rbd_open(struct block_device *bdev, fmode_t mode)
216 {
217 	struct gendisk *disk = bdev->bd_disk;
218 	struct rbd_device *rbd_dev = disk->private_data;
219 
220 	rbd_get_dev(rbd_dev);
221 
222 	set_device_ro(bdev, rbd_dev->read_only);
223 
224 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225 		return -EROFS;
226 
227 	return 0;
228 }
229 
230 static int rbd_release(struct gendisk *disk, fmode_t mode)
231 {
232 	struct rbd_device *rbd_dev = disk->private_data;
233 
234 	rbd_put_dev(rbd_dev);
235 
236 	return 0;
237 }
238 
239 static const struct block_device_operations rbd_bd_ops = {
240 	.owner			= THIS_MODULE,
241 	.open			= rbd_open,
242 	.release		= rbd_release,
243 };
244 
245 /*
246  * Initialize an rbd client instance.
247  * We own *opt.
248  */
249 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250 					    struct rbd_options *rbd_opts)
251 {
252 	struct rbd_client *rbdc;
253 	int ret = -ENOMEM;
254 
255 	dout("rbd_client_create\n");
256 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257 	if (!rbdc)
258 		goto out_opt;
259 
260 	kref_init(&rbdc->kref);
261 	INIT_LIST_HEAD(&rbdc->node);
262 
263 	rbdc->client = ceph_create_client(opt, rbdc);
264 	if (IS_ERR(rbdc->client))
265 		goto out_rbdc;
266 	opt = NULL; /* Now rbdc->client is responsible for opt */
267 
268 	ret = ceph_open_session(rbdc->client);
269 	if (ret < 0)
270 		goto out_err;
271 
272 	rbdc->rbd_opts = rbd_opts;
273 
274 	spin_lock(&node_lock);
275 	list_add_tail(&rbdc->node, &rbd_client_list);
276 	spin_unlock(&node_lock);
277 
278 	dout("rbd_client_create created %p\n", rbdc);
279 	return rbdc;
280 
281 out_err:
282 	ceph_destroy_client(rbdc->client);
283 out_rbdc:
284 	kfree(rbdc);
285 out_opt:
286 	if (opt)
287 		ceph_destroy_options(opt);
288 	return ERR_PTR(ret);
289 }
290 
291 /*
292  * Find a ceph client with specific addr and configuration.
293  */
294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
295 {
296 	struct rbd_client *client_node;
297 
298 	if (opt->flags & CEPH_OPT_NOSHARE)
299 		return NULL;
300 
301 	list_for_each_entry(client_node, &rbd_client_list, node)
302 		if (ceph_compare_options(opt, client_node->client) == 0)
303 			return client_node;
304 	return NULL;
305 }
306 
307 /*
308  * mount options
309  */
310 enum {
311 	Opt_notify_timeout,
312 	Opt_last_int,
313 	/* int args above */
314 	Opt_last_string,
315 	/* string args above */
316 };
317 
318 static match_table_t rbdopt_tokens = {
319 	{Opt_notify_timeout, "notify_timeout=%d"},
320 	/* int args above */
321 	/* string args above */
322 	{-1, NULL}
323 };
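
/*
 * For example (hypothetical string): given "notify_timeout=30",
 * ceph_parse_options() consumes the generic ceph tokens and hands
 * anything it does not recognize to parse_rbd_opts_token() below,
 * which matches it against rbdopt_tokens and fills struct
 * rbd_options accordingly.
 */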
324 
325 static int parse_rbd_opts_token(char *c, void *private)
326 {
327 	struct rbd_options *rbdopt = private;
328 	substring_t argstr[MAX_OPT_ARGS];
329 	int token, intval, ret;
330 
331 	token = match_token(c, rbdopt_tokens, argstr);
332 	if (token < 0)
333 		return -EINVAL;
334 
335 	if (token < Opt_last_int) {
336 		ret = match_int(&argstr[0], &intval);
337 		if (ret < 0) {
338 			pr_err("bad mount option arg (not int) "
339 			       "at '%s'\n", c);
340 			return ret;
341 		}
342 		dout("got int token %d val %d\n", token, intval);
343 	} else if (token > Opt_last_int && token < Opt_last_string) {
344 		dout("got string token %d val %s\n", token,
345 		     argstr[0].from);
346 	} else {
347 		dout("got token %d\n", token);
348 	}
349 
350 	switch (token) {
351 	case Opt_notify_timeout:
352 		rbdopt->notify_timeout = intval;
353 		break;
354 	default:
355 		BUG_ON(token);
356 	}
357 	return 0;
358 }
359 
360 /*
361  * Get a ceph client with specific addr and configuration, if one does
362  * not exist create it.
363  */
364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365 			  char *options)
366 {
367 	struct rbd_client *rbdc;
368 	struct ceph_options *opt;
369 	int ret;
370 	struct rbd_options *rbd_opts;
371 
372 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373 	if (!rbd_opts)
374 		return -ENOMEM;
375 
376 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
377 
378 	ret = ceph_parse_options(&opt, options, mon_addr,
379 				 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
380 	if (ret < 0)
381 		goto done_err;
382 
383 	spin_lock(&node_lock);
384 	rbdc = __rbd_client_find(opt);
385 	if (rbdc) {
386 		ceph_destroy_options(opt);
387 
388 		/* using an existing client */
389 		kref_get(&rbdc->kref);
390 		rbd_dev->rbd_client = rbdc;
391 		rbd_dev->client = rbdc->client;
392 		spin_unlock(&node_lock);
393 		return 0;
394 	}
395 	spin_unlock(&node_lock);
396 
397 	rbdc = rbd_client_create(opt, rbd_opts);
398 	if (IS_ERR(rbdc)) {
399 		ret = PTR_ERR(rbdc);
400 		goto done_err;
401 	}
402 
403 	rbd_dev->rbd_client = rbdc;
404 	rbd_dev->client = rbdc->client;
405 	return 0;
406 done_err:
407 	kfree(rbd_opts);
408 	return ret;
409 }
410 
411 /*
412  * Destroy ceph client
413  */
414 static void rbd_client_release(struct kref *kref)
415 {
416 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
417 
418 	dout("rbd_client_release %p\n", rbdc);
419 	spin_lock(&node_lock);
420 	list_del(&rbdc->node);
421 	spin_unlock(&node_lock);
422 
423 	ceph_destroy_client(rbdc->client);
424 	kfree(rbdc->rbd_opts);
425 	kfree(rbdc);
426 }
427 
428 /*
429  * Drop reference to ceph client node. If it's not referenced anymore, release
430  * it.
431  */
432 static void rbd_put_client(struct rbd_device *rbd_dev)
433 {
434 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435 	rbd_dev->rbd_client = NULL;
436 	rbd_dev->client = NULL;
437 }
438 
439 /*
440  * Destroy requests collection
441  */
442 static void rbd_coll_release(struct kref *kref)
443 {
444 	struct rbd_req_coll *coll =
445 		container_of(kref, struct rbd_req_coll, kref);
446 
447 	dout("rbd_coll_release %p\n", coll);
448 	kfree(coll);
449 }
450 
451 /*
452  * Create a new header structure, translate header format from the on-disk
453  * header.
454  */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456 				 struct rbd_image_header_ondisk *ondisk,
457 				 int allocated_snaps,
458 				 gfp_t gfp_flags)
459 {
460 	int i;
461 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
462 	int ret = -ENOMEM;
463 
464 	init_rwsem(&header->snap_rwsem);
465 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467 				snap_count *
468 				 sizeof(struct rbd_image_snap_ondisk),
469 				gfp_flags);
470 	if (!header->snapc)
471 		return -ENOMEM;
472 	if (snap_count) {
473 		header->snap_names = kmalloc(header->snap_names_len,
474 					     GFP_KERNEL);
475 		if (!header->snap_names)
476 			goto err_snapc;
477 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 					     GFP_KERNEL);
479 		if (!header->snap_sizes)
480 			goto err_names;
481 	} else {
482 		header->snap_names = NULL;
483 		header->snap_sizes = NULL;
484 	}
485 	memcpy(header->block_name, ondisk->block_name,
486 	       sizeof(ondisk->block_name));
487 
488 	header->image_size = le64_to_cpu(ondisk->image_size);
489 	header->obj_order = ondisk->options.order;
490 	header->crypt_type = ondisk->options.crypt_type;
491 	header->comp_type = ondisk->options.comp_type;
492 
493 	atomic_set(&header->snapc->nref, 1);
494 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 	header->snapc->num_snaps = snap_count;
496 	header->total_snaps = snap_count;
497 
498 	if (snap_count &&
499 	    allocated_snaps == snap_count) {
500 		for (i = 0; i < snap_count; i++) {
501 			header->snapc->snaps[i] =
502 				le64_to_cpu(ondisk->snaps[i].id);
503 			header->snap_sizes[i] =
504 				le64_to_cpu(ondisk->snaps[i].image_size);
505 		}
506 
507 		/* copy snapshot names */
508 		memcpy(header->snap_names, &ondisk->snaps[i],
509 			header->snap_names_len);
510 	}
511 
512 	return 0;
513 
514 err_names:
515 	kfree(header->snap_names);
516 err_snapc:
517 	kfree(header->snapc);
518 	return ret;
519 }
520 
521 static int snap_index(struct rbd_image_header *header, int snap_num)
522 {
523 	return header->total_snaps - snap_num;
524 }
525 
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
527 {
528 	struct rbd_image_header *header = &rbd_dev->header;
529 
530 	if (!rbd_dev->cur_snap)
531 		return 0;
532 
533 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534 }
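
/*
 * Worked example (assuming the cur_snap semantics described in
 * struct rbd_device): with total_snaps = 3 and cur_snap = 1 (the
 * oldest snapshot, as cur_snap counts from the end), snap_index()
 * yields 3 - 1 = 2, the last entry of snapc->snaps[], which is
 * ordered newest first.
 */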
535 
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537 			u64 *seq, u64 *size)
538 {
539 	int i;
540 	char *p = header->snap_names;
541 
542 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543 		if (strcmp(snap_name, p) == 0)
544 			break;
545 	}
546 	if (i == header->total_snaps)
547 		return -ENOENT;
548 	if (seq)
549 		*seq = header->snapc->snaps[i];
550 
551 	if (size)
552 		*size = header->snap_sizes[i];
553 
554 	return i;
555 }
556 
557 static int rbd_header_set_snap(struct rbd_device *dev,
558 			       const char *snap_name,
559 			       u64 *size)
560 {
561 	struct rbd_image_header *header = &dev->header;
562 	struct ceph_snap_context *snapc = header->snapc;
563 	int ret = -ENOENT;
564 
565 	down_write(&header->snap_rwsem);
566 
567 	if (!snap_name ||
568 	    !*snap_name ||
569 	    strcmp(snap_name, "-") == 0 ||
570 	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571 		if (header->total_snaps)
572 			snapc->seq = header->snap_seq;
573 		else
574 			snapc->seq = 0;
575 		dev->cur_snap = 0;
576 		dev->read_only = 0;
577 		if (size)
578 			*size = header->image_size;
579 	} else {
580 		ret = snap_by_name(header, snap_name, &snapc->seq, size);
581 		if (ret < 0)
582 			goto done;
583 
584 		dev->cur_snap = header->total_snaps - ret;
585 		dev->read_only = 1;
586 	}
587 
588 	ret = 0;
589 done:
590 	up_write(&header->snap_rwsem);
591 	return ret;
592 }
593 
594 static void rbd_header_free(struct rbd_image_header *header)
595 {
596 	kfree(header->snapc);
597 	kfree(header->snap_names);
598 	kfree(header->snap_sizes);
599 }
600 
601 /*
602  * get the actual striped segment name, offset and length
603  */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605 			   const char *block_name,
606 			   u64 ofs, u64 len,
607 			   char *seg_name, u64 *segofs)
608 {
609 	u64 seg = ofs >> header->obj_order;
610 
611 	if (seg_name)
612 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613 			 "%s.%012llx", block_name, seg);
614 
615 	ofs = ofs & ((1ULL << header->obj_order) - 1);
616 	len = min_t(u64, len, (1ULL << header->obj_order) - ofs);
617 
618 	if (segofs)
619 		*segofs = ofs;
620 
621 	return len;
622 }
623 
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625 				u64 ofs, u64 len)
626 {
627 	u64 start_seg = ofs >> header->obj_order;
628 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 	return end_seg - start_seg + 1;
630 }
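
/*
 * Worked example for rbd_get_segment(), assuming an object order of
 * 22 (4 MB objects): for block_name "rb.0.0", ofs = 0x400123 and
 * len = 0x1000, seg = ofs >> 22 = 1, so seg_name becomes
 * "rb.0.0.000000000001", *segofs = 0x123, and the returned length is
 * min(0x1000, 0x400000 - 0x123) = 0x1000.  The same extent spans
 * rbd_get_num_segments() = (0x401122 >> 22) - (0x400123 >> 22) + 1
 * = 1 segment.
 */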
631 
632 /*
633  * bio helpers
634  */
635 
636 static void bio_chain_put(struct bio *chain)
637 {
638 	struct bio *tmp;
639 
640 	while (chain) {
641 		tmp = chain;
642 		chain = chain->bi_next;
643 		bio_put(tmp);
644 	}
645 }
646 
647 /*
648  * zeros a bio chain, starting at specific offset
649  */
650 static void zero_bio_chain(struct bio *chain, int start_ofs)
651 {
652 	struct bio_vec *bv;
653 	unsigned long flags;
654 	void *buf;
655 	int i;
656 	int pos = 0;
657 
658 	while (chain) {
659 		bio_for_each_segment(bv, chain, i) {
660 			if (pos + bv->bv_len > start_ofs) {
661 				int remainder = max(start_ofs - pos, 0);
662 				buf = bvec_kmap_irq(bv, &flags);
663 				memset(buf + remainder, 0,
664 				       bv->bv_len - remainder);
665 				bvec_kunmap_irq(buf, &flags);
666 			}
667 			pos += bv->bv_len;
668 		}
669 
670 		chain = chain->bi_next;
671 	}
672 }
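
/*
 * For example, after a short read that returned only 4096 of 8192
 * requested bytes, rbd_req_cb() below calls zero_bio_chain(bio, 4096):
 * every bio_vec overlapping [4096, end) has its tail zeroed, so stale
 * page contents never reach the block layer.
 */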
673 
674 /*
675  * bio_chain_clone - clone a chain of bios up to a certain length.
676  * might return a bio_pair that will need to be released.
677  */
678 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679 				   struct bio_pair **bp,
680 				   int len, gfp_t gfpmask)
681 {
682 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
683 	int total = 0;
684 
685 	if (*bp) {
686 		bio_pair_release(*bp);
687 		*bp = NULL;
688 	}
689 
690 	while (old_chain && (total < len)) {
691 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
692 		if (!tmp)
693 			goto err_out;
694 
695 		if (total + old_chain->bi_size > len) {
696 			struct bio_pair *bp;
697 
698 			/*
699 			 * this split can only happen with a single paged bio,
700 			 * split_bio will BUG_ON if this is not the case
701 			 */
702 			dout("bio_chain_clone split! total=%d remaining=%d "
703 			     "bi_size=%d\n",
704 			     (int)total, (int)(len - total),
705 			     (int)old_chain->bi_size);
706 
707 			/* split the bio. We'll release it either in the next
708 			   call, or it will have to be released outside */
709 			bp = bio_split(old_chain, (len - total) / 512ULL);
710 			if (!bp)
711 				goto err_out;
712 
713 			__bio_clone(tmp, &bp->bio1);
714 
715 			*next = &bp->bio2;
716 		} else {
717 			__bio_clone(tmp, old_chain);
718 			*next = old_chain->bi_next;
719 		}
720 
721 		tmp->bi_bdev = NULL;
722 		gfpmask &= ~__GFP_WAIT;
723 		tmp->bi_next = NULL;
724 
725 		if (!new_chain) {
726 			new_chain = tail = tmp;
727 		} else {
728 			tail->bi_next = tmp;
729 			tail = tmp;
730 		}
731 		old_chain = old_chain->bi_next;
732 
733 		total += tmp->bi_size;
734 	}
735 
736 	BUG_ON(total < len);
737 
738 	if (tail)
739 		tail->bi_next = NULL;
740 
741 	*old = old_chain;
742 
743 	return new_chain;
744 
745 err_out:
746 	dout("bio_chain_clone with err\n");
747 	bio_chain_put(new_chain);
748 	return NULL;
749 }
750 
751 /*
752  * helpers for osd request op vectors.
753  */
754 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
755 			    int num_ops,
756 			    int opcode,
757 			    u32 payload_len)
758 {
759 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
760 		       GFP_NOIO);
761 	if (!*ops)
762 		return -ENOMEM;
763 	(*ops)[0].op = opcode;
764 	/*
765 	 * op extent offset and length will be set later on
766 	 * in calc_raw_layout()
767 	 */
768 	(*ops)[0].payload_len = payload_len;
769 	return 0;
770 }
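
/*
 * Note (inferred from the kzalloc above): the vector is allocated
 * with num_ops + 1 elements and zero-filled, leaving a final all-zero
 * op that acts as an end-of-list terminator for code walking it.
 */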
771 
772 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
773 {
774 	kfree(ops);
775 }
776 
777 static void rbd_coll_end_req_index(struct request *rq,
778 				   struct rbd_req_coll *coll,
779 				   int index,
780 				   int ret, u64 len)
781 {
782 	struct request_queue *q;
783 	int min, max, i;
784 
785 	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 	     coll, index, ret, len);
787 
788 	if (!rq)
789 		return;
790 
791 	if (!coll) {
792 		blk_end_request(rq, ret, len);
793 		return;
794 	}
795 
796 	q = rq->q;
797 
798 	spin_lock_irq(q->queue_lock);
799 	coll->status[index].done = 1;
800 	coll->status[index].rc = ret;
801 	coll->status[index].bytes = len;
802 	max = min = coll->num_done;
803 	while (max < coll->total && coll->status[max].done)
804 		max++;
805 
806 	for (i = min; i < max; i++) {
807 		__blk_end_request(rq, coll->status[i].rc,
808 				  coll->status[i].bytes);
809 		coll->num_done++;
810 		kref_put(&coll->kref, rbd_coll_release);
811 	}
812 	spin_unlock_irq(q->queue_lock);
813 }
814 
815 static void rbd_coll_end_req(struct rbd_request *req,
816 			     int ret, u64 len)
817 {
818 	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
819 }
820 
821 /*
822  * Send ceph osd request
823  */
824 static int rbd_do_request(struct request *rq,
825 			  struct rbd_device *dev,
826 			  struct ceph_snap_context *snapc,
827 			  u64 snapid,
828 			  const char *obj, u64 ofs, u64 len,
829 			  struct bio *bio,
830 			  struct page **pages,
831 			  int num_pages,
832 			  int flags,
833 			  struct ceph_osd_req_op *ops,
834 			  int num_reply,
835 			  struct rbd_req_coll *coll,
836 			  int coll_index,
837 			  void (*rbd_cb)(struct ceph_osd_request *req,
838 					 struct ceph_msg *msg),
839 			  struct ceph_osd_request **linger_req,
840 			  u64 *ver)
841 {
842 	struct ceph_osd_request *req;
843 	struct ceph_file_layout *layout;
844 	int ret;
845 	u64 bno;
846 	struct timespec mtime = CURRENT_TIME;
847 	struct rbd_request *req_data;
848 	struct ceph_osd_request_head *reqhead;
849 	struct rbd_image_header *header = &dev->header;
850 
851 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
852 	if (!req_data) {
853 		if (coll)
854 			rbd_coll_end_req_index(rq, coll, coll_index,
855 					       -ENOMEM, len);
856 		return -ENOMEM;
857 	}
858 
859 	if (coll) {
860 		req_data->coll = coll;
861 		req_data->coll_index = coll_index;
862 	}
863 
864 	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, ofs, len);
865 
866 	down_read(&header->snap_rwsem);
867 
868 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
869 				      snapc,
870 				      ops,
871 				      false,
872 				      GFP_NOIO, pages, bio);
873 	if (!req) {
874 		up_read(&header->snap_rwsem);
875 		ret = -ENOMEM;
876 		goto done_pages;
877 	}
878 
879 	req->r_callback = rbd_cb;
880 
881 	req_data->rq = rq;
882 	req_data->bio = bio;
883 	req_data->pages = pages;
884 	req_data->len = len;
885 
886 	req->r_priv = req_data;
887 
888 	reqhead = req->r_request->front.iov_base;
889 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
890 
891 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
892 	req->r_oid_len = strlen(req->r_oid);
893 
894 	layout = &req->r_file_layout;
895 	memset(layout, 0, sizeof(*layout));
896 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897 	layout->fl_stripe_count = cpu_to_le32(1);
898 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899 	layout->fl_pg_preferred = cpu_to_le32(-1);
900 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902 			     ofs, &len, &bno, req, ops);
903 
904 	ceph_osdc_build_request(req, ofs, &len,
905 				ops,
906 				snapc,
907 				&mtime,
908 				req->r_oid, req->r_oid_len);
909 	up_read(&header->snap_rwsem);
910 
911 	if (linger_req) {
912 		ceph_osdc_set_request_linger(&dev->client->osdc, req);
913 		*linger_req = req;
914 	}
915 
916 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
917 	if (ret < 0)
918 		goto done_err;
919 
920 	if (!rbd_cb) {
921 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
922 		if (ver)
923 			*ver = le64_to_cpu(req->r_reassert_version.version);
924 		dout("reassert_ver=%lld\n",
925 		     le64_to_cpu(req->r_reassert_version.version));
926 		ceph_osdc_put_request(req);
927 	}
928 	return ret;
929 
930 done_err:
931 	bio_chain_put(req_data->bio);
932 	ceph_osdc_put_request(req);
933 done_pages:
934 	rbd_coll_end_req(req_data, ret, len);
935 	kfree(req_data);
936 	return ret;
937 }
938 
939 /*
940  * Ceph osd op callback
941  */
942 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
943 {
944 	struct rbd_request *req_data = req->r_priv;
945 	struct ceph_osd_reply_head *replyhead;
946 	struct ceph_osd_op *op;
947 	__s32 rc;
948 	u64 bytes;
949 	int read_op;
950 
951 	/* parse reply */
952 	replyhead = msg->front.iov_base;
953 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
954 	op = (void *)(replyhead + 1);
955 	rc = le32_to_cpu(replyhead->result);
956 	bytes = le64_to_cpu(op->extent.length);
957 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
958 
959 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
960 
961 	if (rc == -ENOENT && read_op) {
962 		zero_bio_chain(req_data->bio, 0);
963 		rc = 0;
964 	} else if (rc == 0 && read_op && bytes < req_data->len) {
965 		zero_bio_chain(req_data->bio, bytes);
966 		bytes = req_data->len;
967 	}
968 
969 	rbd_coll_end_req(req_data, rc, bytes);
970 
971 	if (req_data->bio)
972 		bio_chain_put(req_data->bio);
973 
974 	ceph_osdc_put_request(req);
975 	kfree(req_data);
976 }
977 
978 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
979 {
980 	ceph_osdc_put_request(req);
981 }
982 
983 /*
984  * Do a synchronous ceph osd operation
985  */
986 static int rbd_req_sync_op(struct rbd_device *dev,
987 			   struct ceph_snap_context *snapc,
988 			   u64 snapid,
989 			   int opcode,
990 			   int flags,
991 			   struct ceph_osd_req_op *orig_ops,
992 			   int num_reply,
993 			   const char *obj,
994 			   u64 ofs, u64 len,
995 			   char *buf,
996 			   struct ceph_osd_request **linger_req,
997 			   u64 *ver)
998 {
999 	int ret;
1000 	struct page **pages;
1001 	int num_pages;
1002 	struct ceph_osd_req_op *ops = orig_ops;
1003 	u32 payload_len;
1004 
1005 	num_pages = calc_pages_for(ofs, len);
1006 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1007 	if (IS_ERR(pages))
1008 		return PTR_ERR(pages);
1009 
1010 	if (!orig_ops) {
1011 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1013 		if (ret < 0)
1014 			goto done;
1015 
1016 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1018 			if (ret < 0)
1019 				goto done_ops;
1020 		}
1021 	}
1022 
1023 	ret = rbd_do_request(NULL, dev, snapc, snapid,
1024 			  obj, ofs, len, NULL,
1025 			  pages, num_pages,
1026 			  flags,
1027 			  ops,
1028 			  2,
1029 			  NULL, 0,
1030 			  NULL,
1031 			  linger_req, ver);
1032 	if (ret < 0)
1033 		goto done_ops;
1034 
1035 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1037 
1038 done_ops:
1039 	if (!orig_ops)
1040 		rbd_destroy_ops(ops);
1041 done:
1042 	ceph_release_page_vector(pages, num_pages);
1043 	return ret;
1044 }
1045 
1046 /*
1047  * Do an asynchronous ceph osd operation
1048  */
1049 static int rbd_do_op(struct request *rq,
1050 		     struct rbd_device *rbd_dev,
1051 		     struct ceph_snap_context *snapc,
1052 		     u64 snapid,
1053 		     int opcode, int flags, int num_reply,
1054 		     u64 ofs, u64 len,
1055 		     struct bio *bio,
1056 		     struct rbd_req_coll *coll,
1057 		     int coll_index)
1058 {
1059 	char *seg_name;
1060 	u64 seg_ofs;
1061 	u64 seg_len;
1062 	int ret;
1063 	struct ceph_osd_req_op *ops;
1064 	u32 payload_len;
1065 
1066 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1067 	if (!seg_name)
1068 		return -ENOMEM;
1069 
1070 	seg_len = rbd_get_segment(&rbd_dev->header,
1071 				  rbd_dev->header.block_name,
1072 				  ofs, len,
1073 				  seg_name, &seg_ofs);
1074 
1075 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1076 
1077 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1078 	if (ret < 0)
1079 		goto done;
1080 
1081 	/* we've taken care of segment sizes earlier when we
1082 	   cloned the bios. We should never have a segment
1083 	   truncated at this point */
1084 	BUG_ON(seg_len < len);
1085 
1086 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087 			     seg_name, seg_ofs, seg_len,
1088 			     bio,
1089 			     NULL, 0,
1090 			     flags,
1091 			     ops,
1092 			     num_reply,
1093 			     coll, coll_index,
1094 			     rbd_req_cb, 0, NULL);
1095 
1096 	rbd_destroy_ops(ops);
1097 done:
1098 	kfree(seg_name);
1099 	return ret;
1100 }
1101 
1102 /*
1103  * Request async osd write
1104  */
1105 static int rbd_req_write(struct request *rq,
1106 			 struct rbd_device *rbd_dev,
1107 			 struct ceph_snap_context *snapc,
1108 			 u64 ofs, u64 len,
1109 			 struct bio *bio,
1110 			 struct rbd_req_coll *coll,
1111 			 int coll_index)
1112 {
1113 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1114 			 CEPH_OSD_OP_WRITE,
1115 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1116 			 2,
1117 			 ofs, len, bio, coll, coll_index);
1118 }
1119 
1120 /*
1121  * Request async osd read
1122  */
1123 static int rbd_req_read(struct request *rq,
1124 			 struct rbd_device *rbd_dev,
1125 			 u64 snapid,
1126 			 u64 ofs, u64 len,
1127 			 struct bio *bio,
1128 			 struct rbd_req_coll *coll,
1129 			 int coll_index)
1130 {
1131 	return rbd_do_op(rq, rbd_dev, NULL,
1132 			 (snapid ? snapid : CEPH_NOSNAP),
1133 			 CEPH_OSD_OP_READ,
1134 			 CEPH_OSD_FLAG_READ,
1135 			 2,
1136 			 ofs, len, bio, coll, coll_index);
1137 }
1138 
1139 /*
1140  * Request sync osd read
1141  */
1142 static int rbd_req_sync_read(struct rbd_device *dev,
1143 			  struct ceph_snap_context *snapc,
1144 			  u64 snapid,
1145 			  const char *obj,
1146 			  u64 ofs, u64 len,
1147 			  char *buf,
1148 			  u64 *ver)
1149 {
1150 	return rbd_req_sync_op(dev, NULL,
1151 			       (snapid ? snapid : CEPH_NOSNAP),
1152 			       CEPH_OSD_OP_READ,
1153 			       CEPH_OSD_FLAG_READ,
1154 			       NULL,
1155 			       1, obj, ofs, len, buf, NULL, ver);
1156 }
1157 
1158 /*
1159  * Request sync osd notify ack
1160  */
1161 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1162 				   u64 ver,
1163 				   u64 notify_id,
1164 				   const char *obj)
1165 {
1166 	struct ceph_osd_req_op *ops;
1167 	struct page **pages = NULL;
1168 	int ret;
1169 
1170 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1171 	if (ret < 0)
1172 		return ret;
1173 
1174 	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175 	ops[0].watch.cookie = notify_id;
1176 	ops[0].watch.flag = 0;
1177 
1178 	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1179 			  obj, 0, 0, NULL,
1180 			  pages, 0,
1181 			  CEPH_OSD_FLAG_READ,
1182 			  ops,
1183 			  1,
1184 			  NULL, 0,
1185 			  rbd_simple_req_cb, 0, NULL);
1186 
1187 	rbd_destroy_ops(ops);
1188 	return ret;
1189 }
1190 
1191 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1192 {
1193 	struct rbd_device *dev = (struct rbd_device *)data;
1194 	if (!dev)
1195 		return;
1196 
1197 	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1198 		notify_id, (int)opcode);
1199 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1200 	__rbd_update_snaps(dev);
1201 	mutex_unlock(&ctl_mutex);
1202 
1203 	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1204 }
1205 
1206 /*
1207  * Request sync osd watch
1208  */
1209 static int rbd_req_sync_watch(struct rbd_device *dev,
1210 			      const char *obj,
1211 			      u64 ver)
1212 {
1213 	struct ceph_osd_req_op *ops;
1214 	struct ceph_osd_client *osdc = &dev->client->osdc;
1215 
1216 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1217 	if (ret < 0)
1218 		return ret;
1219 
1220 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1221 				     (void *)dev, &dev->watch_event);
1222 	if (ret < 0)
1223 		goto fail;
1224 
1225 	ops[0].watch.ver = cpu_to_le64(ver);
1226 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1227 	ops[0].watch.flag = 1;
1228 
1229 	ret = rbd_req_sync_op(dev, NULL,
1230 			      CEPH_NOSNAP,
1231 			      0,
1232 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1233 			      ops,
1234 			      1, obj, 0, 0, NULL,
1235 			      &dev->watch_request, NULL);
1236 
1237 	if (ret < 0)
1238 		goto fail_event;
1239 
1240 	rbd_destroy_ops(ops);
1241 	return 0;
1242 
1243 fail_event:
1244 	ceph_osdc_cancel_event(dev->watch_event);
1245 	dev->watch_event = NULL;
1246 fail:
1247 	rbd_destroy_ops(ops);
1248 	return ret;
1249 }
1250 
1251 struct rbd_notify_info {
1252 	struct rbd_device *dev;
1253 };
1254 
1255 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1256 {
1257 	struct rbd_device *dev = (struct rbd_device *)data;
1258 	if (!dev)
1259 		return;
1260 
1261 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1262 		notify_id, (int)opcode);
1263 }
1264 
1265 /*
1266  * Request sync osd notify
1267  */
1268 static int rbd_req_sync_notify(struct rbd_device *dev,
1269 		          const char *obj)
1270 {
1271 	struct ceph_osd_req_op *ops;
1272 	struct ceph_osd_client *osdc = &dev->client->osdc;
1273 	struct ceph_osd_event *event;
1274 	struct rbd_notify_info info;
1275 	int payload_len = sizeof(u32) + sizeof(u32);
1276 	int ret;
1277 
1278 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1279 	if (ret < 0)
1280 		return ret;
1281 
1282 	info.dev = dev;
1283 
1284 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1285 				     (void *)&info, &event);
1286 	if (ret < 0)
1287 		goto fail;
1288 
1289 	ops[0].watch.ver = 1;
1290 	ops[0].watch.flag = 1;
1291 	ops[0].watch.cookie = event->cookie;
1292 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1293 	ops[0].watch.timeout = 12;
1294 
1295 	ret = rbd_req_sync_op(dev, NULL,
1296 			       CEPH_NOSNAP,
1297 			       0,
1298 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299 			       ops,
1300 			       1, obj, 0, 0, NULL, NULL, NULL);
1301 	if (ret < 0)
1302 		goto fail_event;
1303 
1304 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1305 	dout("ceph_osdc_wait_event returned %d\n", ret);
1306 	rbd_destroy_ops(ops);
1307 	return 0;
1308 
1309 fail_event:
1310 	ceph_osdc_cancel_event(event);
1311 fail:
1312 	rbd_destroy_ops(ops);
1313 	return ret;
1314 }
1315 
1316 /*
1317  * Request sync osd rollback
1318  */
1319 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1320 				     u64 snapid,
1321 				     const char *obj)
1322 {
1323 	struct ceph_osd_req_op *ops;
1324 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1325 	if (ret < 0)
1326 		return ret;
1327 
1328 	ops[0].snap.snapid = snapid;
1329 
1330 	ret = rbd_req_sync_op(dev, NULL,
1331 			       CEPH_NOSNAP,
1332 			       0,
1333 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1334 			       ops,
1335 			       1, obj, 0, 0, NULL, NULL, NULL);
1336 
1337 	rbd_destroy_ops(ops);
1338 
1339 	return ret;
1340 }
1341 
1342 /*
1343  * Request sync osd class method call
1344  */
1345 static int rbd_req_sync_exec(struct rbd_device *dev,
1346 			     const char *obj,
1347 			     const char *cls,
1348 			     const char *method,
1349 			     const char *data,
1350 			     int len,
1351 			     u64 *ver)
1352 {
1353 	struct ceph_osd_req_op *ops;
1354 	int cls_len = strlen(cls);
1355 	int method_len = strlen(method);
1356 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1357 				    cls_len + method_len + len);
1358 	if (ret < 0)
1359 		return ret;
1360 
1361 	ops[0].cls.class_name = cls;
1362 	ops[0].cls.class_len = (__u8)cls_len;
1363 	ops[0].cls.method_name = method;
1364 	ops[0].cls.method_len = (__u8)method_len;
1365 	ops[0].cls.argc = 0;
1366 	ops[0].cls.indata = data;
1367 	ops[0].cls.indata_len = len;
1368 
1369 	ret = rbd_req_sync_op(dev, NULL,
1370 			       CEPH_NOSNAP,
1371 			       0,
1372 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1373 			       ops,
1374 			       1, obj, 0, 0, NULL, NULL, ver);
1375 
1376 	rbd_destroy_ops(ops);
1377 
1378 	dout("cls_exec returned %d\n", ret);
1379 	return ret;
1380 }
1381 
1382 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1383 {
1384 	struct rbd_req_coll *coll =
1385 			kzalloc(sizeof(struct rbd_req_coll) +
1386 			        sizeof(struct rbd_req_status) * num_reqs,
1387 				GFP_ATOMIC);
1388 
1389 	if (!coll)
1390 		return NULL;
1391 	coll->total = num_reqs;
1392 	kref_init(&coll->kref);
1393 	return coll;
1394 }
1395 
1396 /*
1397  * block device queue callback
1398  */
1399 static void rbd_rq_fn(struct request_queue *q)
1400 {
1401 	struct rbd_device *rbd_dev = q->queuedata;
1402 	struct request *rq;
1403 	struct bio_pair *bp = NULL;
1404 
1405 	rq = blk_fetch_request(q);
1406 
1407 	while (1) {
1408 		struct bio *bio;
1409 		struct bio *rq_bio, *next_bio = NULL;
1410 		bool do_write;
1411 		int size, op_size = 0;
1412 		u64 ofs;
1413 		int num_segs, cur_seg = 0;
1414 		struct rbd_req_coll *coll;
1415 
1416 		/* peek at request from block layer */
1417 		if (!rq)
1418 			break;
1419 
1420 		dout("fetched request\n");
1421 
1422 		/* filter out block requests we don't understand */
1423 		if (rq->cmd_type != REQ_TYPE_FS) {
1424 			__blk_end_request_all(rq, 0);
1425 			goto next;
1426 		}
1427 
1428 		/* deduce our operation (read, write) */
1429 		do_write = (rq_data_dir(rq) == WRITE);
1430 
1431 		size = blk_rq_bytes(rq);
1432 		ofs = blk_rq_pos(rq) * 512ULL;
1433 		rq_bio = rq->bio;
1434 		if (do_write && rbd_dev->read_only) {
1435 			__blk_end_request_all(rq, -EROFS);
1436 			goto next;
1437 		}
1438 
1439 		spin_unlock_irq(q->queue_lock);
1440 
1441 		dout("%s 0x%x bytes at 0x%llx\n",
1442 		     do_write ? "write" : "read",
1443 		     size, blk_rq_pos(rq) * 512ULL);
1444 
1445 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1446 		coll = rbd_alloc_coll(num_segs);
1447 		if (!coll) {
1448 			spin_lock_irq(q->queue_lock);
1449 			__blk_end_request_all(rq, -ENOMEM);
1450 			goto next;
1451 		}
1452 
1453 		do {
1454 			/* a bio clone to be passed down to OSD req */
1455 			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1456 			op_size = rbd_get_segment(&rbd_dev->header,
1457 						  rbd_dev->header.block_name,
1458 						  ofs, size,
1459 						  NULL, NULL);
1460 			kref_get(&coll->kref);
1461 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1462 					      op_size, GFP_ATOMIC);
1463 			if (!bio) {
1464 				rbd_coll_end_req_index(rq, coll, cur_seg,
1465 						       -ENOMEM, op_size);
1466 				goto next_seg;
1467 			}
1468 
1469 
1470 			/* init OSD command: write or read */
1471 			if (do_write)
1472 				rbd_req_write(rq, rbd_dev,
1473 					      rbd_dev->header.snapc,
1474 					      ofs,
1475 					      op_size, bio,
1476 					      coll, cur_seg);
1477 			else
1478 				rbd_req_read(rq, rbd_dev,
1479 					     cur_snap_id(rbd_dev),
1480 					     ofs,
1481 					     op_size, bio,
1482 					     coll, cur_seg);
1483 
1484 next_seg:
1485 			size -= op_size;
1486 			ofs += op_size;
1487 
1488 			cur_seg++;
1489 			rq_bio = next_bio;
1490 		} while (size > 0);
1491 		kref_put(&coll->kref, rbd_coll_release);
1492 
1493 		if (bp)
1494 			bio_pair_release(bp);
1495 		spin_lock_irq(q->queue_lock);
1496 next:
1497 		rq = blk_fetch_request(q);
1498 	}
1499 }
1500 
1501 /*
1502  * a queue callback. Makes sure that we don't create a bio that spans across
1503  * multiple osd objects. One exception would be single-page bios,
1504  * which we handle later at bio_chain_clone()
1505  */
1506 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1507 			  struct bio_vec *bvec)
1508 {
1509 	struct rbd_device *rbd_dev = q->queuedata;
1510 	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1511 	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1512 	unsigned int bio_sectors = bmd->bi_size >> 9;
1513 	int max;
1514 
1515 	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1516 				 + bio_sectors)) << 9;
1517 	if (max < 0)
1518 		max = 0; /* bio_add cannot handle a negative return */
1519 	if (max <= bvec->bv_len && bio_sectors == 0)
1520 		return bvec->bv_len;
1521 	return max;
1522 }
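
/*
 * Worked example (assuming obj_order 22, so chunk_sectors = 8192):
 * for a bio starting at sector 8190 with bi_size still 0,
 * max = (8192 - (8190 & 8191)) << 9 = 1024, so at most 1024 bytes may
 * be added before the bio would cross a 4 MB object boundary.
 */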
1523 
1524 static void rbd_free_disk(struct rbd_device *rbd_dev)
1525 {
1526 	struct gendisk *disk = rbd_dev->disk;
1527 
1528 	if (!disk)
1529 		return;
1530 
1531 	rbd_header_free(&rbd_dev->header);
1532 
1533 	if (disk->flags & GENHD_FL_UP)
1534 		del_gendisk(disk);
1535 	if (disk->queue)
1536 		blk_cleanup_queue(disk->queue);
1537 	put_disk(disk);
1538 }
1539 
1540 /*
1541  * reload the on-disk header
1542  */
1543 static int rbd_read_header(struct rbd_device *rbd_dev,
1544 			   struct rbd_image_header *header)
1545 {
1546 	ssize_t rc;
1547 	struct rbd_image_header_ondisk *dh;
1548 	int snap_count = 0;
1549 	u64 snap_names_len = 0;
1550 	u64 ver;
1551 
1552 	while (1) {
1553 		int len = sizeof(*dh) +
1554 			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
1555 			  snap_names_len;
1556 
1557 		rc = -ENOMEM;
1558 		dh = kmalloc(len, GFP_KERNEL);
1559 		if (!dh)
1560 			return -ENOMEM;
1561 
1562 		rc = rbd_req_sync_read(rbd_dev,
1563 				       NULL, CEPH_NOSNAP,
1564 				       rbd_dev->obj_md_name,
1565 				       0, len,
1566 				       (char *)dh, &ver);
1567 		if (rc < 0)
1568 			goto out_dh;
1569 
1570 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1571 		if (rc < 0)
1572 			goto out_dh;
1573 
1574 		if (snap_count != header->total_snaps) {
1575 			snap_count = header->total_snaps;
1576 			snap_names_len = header->snap_names_len;
1577 			rbd_header_free(header);
1578 			kfree(dh);
1579 			continue;
1580 		}
1581 		break;
1582 	}
1583 	header->obj_version = ver;
1584 
1585 out_dh:
1586 	kfree(dh);
1587 	return rc;
1588 }
1589 
1590 /*
1591  * create a snapshot
1592  */
1593 static int rbd_header_add_snap(struct rbd_device *dev,
1594 			       const char *snap_name,
1595 			       gfp_t gfp_flags)
1596 {
1597 	int name_len = strlen(snap_name);
1598 	u64 new_snapid;
1599 	int ret;
1600 	void *data, *data_start, *data_end;
1601 	u64 ver;
1602 
1603 	/* we should create a snapshot only if we're pointing at the head */
1604 	if (dev->cur_snap)
1605 		return -EINVAL;
1606 
1607 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1608 				      &new_snapid);
1609 	dout("created snapid=%lld\n", new_snapid);
1610 	if (ret < 0)
1611 		return ret;
1612 
1613 	data = kmalloc(name_len + 16, gfp_flags);
1614 	if (!data)
1615 		return -ENOMEM;
1616 
1617 	data_start = data;
1618 	data_end = data + name_len + 16;
1619 
1620 	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1621 	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1622 
1623 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1624 				data_start, data - data_start, &ver);
1625 
1626 	kfree(data_start);
1627 
1628 	if (ret < 0)
1629 		return ret;
1630 
1631 	dev->header.snapc->seq = new_snapid;
1632 
1633 	return 0;
1634 bad:
1635 	return -ERANGE;
1636 }
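
/*
 * Sketch of the "snap_add" payload built above, per the
 * ceph_encode_string_safe()/ceph_encode_64_safe() calls:
 *
 *   __le32 name_len | name bytes | __le64 new_snapid
 *
 * which is why name_len + 16 bytes leave room for both fixed-size
 * fields with a little slack.
 */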
1637 
1638 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1639 {
1640 	struct rbd_snap *snap;
1641 
1642 	while (!list_empty(&rbd_dev->snaps)) {
1643 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1644 		__rbd_remove_snap_dev(rbd_dev, snap);
1645 	}
1646 }
1647 
1648 /*
1649  * re-read the on-disk header and resync the in-memory snapshot list
1650  */
1651 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1652 {
1653 	int ret;
1654 	struct rbd_image_header h;
1655 	u64 snap_seq;
1656 	int follow_seq = 0;
1657 
1658 	ret = rbd_read_header(rbd_dev, &h);
1659 	if (ret < 0)
1660 		return ret;
1661 
1662 	down_write(&rbd_dev->header.snap_rwsem);
1663 
1664 	snap_seq = rbd_dev->header.snapc->seq;
1665 	if (rbd_dev->header.total_snaps &&
1666 	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1667 		/* pointing at the head, will need to follow that
1668 		   if head moves */
1669 		follow_seq = 1;
1670 
1671 	kfree(rbd_dev->header.snapc);
1672 	kfree(rbd_dev->header.snap_names);
1673 	kfree(rbd_dev->header.snap_sizes);
1674 
1675 	rbd_dev->header.total_snaps = h.total_snaps;
1676 	rbd_dev->header.snapc = h.snapc;
1677 	rbd_dev->header.snap_names = h.snap_names;
1678 	rbd_dev->header.snap_names_len = h.snap_names_len;
1679 	rbd_dev->header.snap_sizes = h.snap_sizes;
1680 	if (follow_seq)
1681 		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1682 	else
1683 		rbd_dev->header.snapc->seq = snap_seq;
1684 
1685 	ret = __rbd_init_snaps_header(rbd_dev);
1686 
1687 	up_write(&rbd_dev->header.snap_rwsem);
1688 
1689 	return ret;
1690 }
1691 
1692 static int rbd_init_disk(struct rbd_device *rbd_dev)
1693 {
1694 	struct gendisk *disk;
1695 	struct request_queue *q;
1696 	int rc;
1697 	u64 total_size = 0;
1698 
1699 	/* contact OSD, request size info about the object being mapped */
1700 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1701 	if (rc)
1702 		return rc;
1703 
1704 	/* no need to lock here, as rbd_dev is not registered yet */
1705 	rc = __rbd_init_snaps_header(rbd_dev);
1706 	if (rc)
1707 		return rc;
1708 
1709 	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1710 	if (rc)
1711 		return rc;
1712 
1713 	/* create gendisk info */
1714 	rc = -ENOMEM;
1715 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1716 	if (!disk)
1717 		goto out;
1718 
1719 	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1720 	disk->major = rbd_dev->major;
1721 	disk->first_minor = 0;
1722 	disk->fops = &rbd_bd_ops;
1723 	disk->private_data = rbd_dev;
1724 
1725 	/* init rq */
1726 	rc = -ENOMEM;
1727 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1728 	if (!q)
1729 		goto out_disk;
1730 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1731 	disk->queue = q;
1732 
1733 	q->queuedata = rbd_dev;
1734 
1735 	rbd_dev->disk = disk;
1736 	rbd_dev->q = q;
1737 
1738 	/* finally, announce the disk to the world */
1739 	set_capacity(disk, total_size / 512ULL);
1740 	add_disk(disk);
1741 
1742 	pr_info("%s: added with size 0x%llx\n",
1743 		disk->disk_name, (unsigned long long)total_size);
1744 	return 0;
1745 
1746 out_disk:
1747 	put_disk(disk);
1748 out:
1749 	return rc;
1750 }
1751 
1752 /*
1753   sysfs
1754 */
1755 
1756 static ssize_t rbd_size_show(struct device *dev,
1757 			     struct device_attribute *attr, char *buf)
1758 {
1759 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1760 
1761 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1762 }
1763 
1764 static ssize_t rbd_major_show(struct device *dev,
1765 			      struct device_attribute *attr, char *buf)
1766 {
1767 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1768 
1769 	return sprintf(buf, "%d\n", rbd_dev->major);
1770 }
1771 
1772 static ssize_t rbd_client_id_show(struct device *dev,
1773 				  struct device_attribute *attr, char *buf)
1774 {
1775 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1776 
1777 	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1778 }
1779 
1780 static ssize_t rbd_pool_show(struct device *dev,
1781 			     struct device_attribute *attr, char *buf)
1782 {
1783 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1784 
1785 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1786 }
1787 
1788 static ssize_t rbd_name_show(struct device *dev,
1789 			     struct device_attribute *attr, char *buf)
1790 {
1791 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1792 
1793 	return sprintf(buf, "%s\n", rbd_dev->obj);
1794 }
1795 
1796 static ssize_t rbd_snap_show(struct device *dev,
1797 			     struct device_attribute *attr,
1798 			     char *buf)
1799 {
1800 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1801 
1802 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1803 }
1804 
1805 static ssize_t rbd_image_refresh(struct device *dev,
1806 				 struct device_attribute *attr,
1807 				 const char *buf,
1808 				 size_t size)
1809 {
1810 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1811 	int rc;
1812 	int ret = size;
1813 
1814 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1815 
1816 	rc = __rbd_update_snaps(rbd_dev);
1817 	if (rc < 0)
1818 		ret = rc;
1819 
1820 	mutex_unlock(&ctl_mutex);
1821 	return ret;
1822 }
1823 
1824 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1825 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1826 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1827 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1828 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1829 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1830 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1831 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1832 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1833 
1834 static struct attribute *rbd_attrs[] = {
1835 	&dev_attr_size.attr,
1836 	&dev_attr_major.attr,
1837 	&dev_attr_client_id.attr,
1838 	&dev_attr_pool.attr,
1839 	&dev_attr_name.attr,
1840 	&dev_attr_current_snap.attr,
1841 	&dev_attr_refresh.attr,
1842 	&dev_attr_create_snap.attr,
1843 	&dev_attr_rollback_snap.attr,
1844 	NULL
1845 };
1846 
1847 static struct attribute_group rbd_attr_group = {
1848 	.attrs = rbd_attrs,
1849 };
1850 
1851 static const struct attribute_group *rbd_attr_groups[] = {
1852 	&rbd_attr_group,
1853 	NULL
1854 };
1855 
1856 static void rbd_sysfs_dev_release(struct device *dev)
1857 {
1858 }
1859 
1860 static struct device_type rbd_device_type = {
1861 	.name		= "rbd",
1862 	.groups		= rbd_attr_groups,
1863 	.release	= rbd_sysfs_dev_release,
1864 };
1865 
1866 
1867 /*
1868   sysfs - snapshots
1869 */
1870 
1871 static ssize_t rbd_snap_size_show(struct device *dev,
1872 				  struct device_attribute *attr,
1873 				  char *buf)
1874 {
1875 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1876 
1877 	return sprintf(buf, "%lld\n", (long long)snap->size);
1878 }
1879 
1880 static ssize_t rbd_snap_id_show(struct device *dev,
1881 				struct device_attribute *attr,
1882 				char *buf)
1883 {
1884 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1885 
1886 	return sprintf(buf, "%lld\n", (long long)snap->id);
1887 }
1888 
1889 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1890 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1891 
1892 static struct attribute *rbd_snap_attrs[] = {
1893 	&dev_attr_snap_size.attr,
1894 	&dev_attr_snap_id.attr,
1895 	NULL,
1896 };
1897 
1898 static struct attribute_group rbd_snap_attr_group = {
1899 	.attrs = rbd_snap_attrs,
1900 };
1901 
1902 static void rbd_snap_dev_release(struct device *dev)
1903 {
1904 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1905 	kfree(snap->name);
1906 	kfree(snap);
1907 }
1908 
1909 static const struct attribute_group *rbd_snap_attr_groups[] = {
1910 	&rbd_snap_attr_group,
1911 	NULL
1912 };
1913 
1914 static struct device_type rbd_snap_device_type = {
1915 	.groups		= rbd_snap_attr_groups,
1916 	.release	= rbd_snap_dev_release,
1917 };
1918 
1919 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1920 				  struct rbd_snap *snap)
1921 {
1922 	list_del(&snap->node);
1923 	device_unregister(&snap->dev);
1924 }
1925 
1926 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1927 				  struct rbd_snap *snap,
1928 				  struct device *parent)
1929 {
1930 	struct device *dev = &snap->dev;
1931 	int ret;
1932 
1933 	dev->type = &rbd_snap_device_type;
1934 	dev->parent = parent;
1935 	dev->release = rbd_snap_dev_release;
1936 	dev_set_name(dev, "snap_%s", snap->name);
1937 	ret = device_register(dev);
1938 
1939 	return ret;
1940 }
1941 
1942 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1943 			      int i, const char *name,
1944 			      struct rbd_snap **snapp)
1945 {
1946 	int ret;
1947 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1948 	if (!snap)
1949 		return -ENOMEM;
1950 	snap->name = kstrdup(name, GFP_KERNEL);
1951 	snap->size = rbd_dev->header.snap_sizes[i];
1952 	snap->id = rbd_dev->header.snapc->snaps[i];
1953 	if (device_is_registered(&rbd_dev->dev)) {
1954 		ret = rbd_register_snap_dev(rbd_dev, snap,
1955 					     &rbd_dev->dev);
1956 		if (ret < 0)
1957 			goto err;
1958 	}
1959 	*snapp = snap;
1960 	return 0;
1961 err:
1962 	kfree(snap->name);
1963 	kfree(snap);
1964 	return ret;
1965 }
1966 
1967 /*
1968  * search for the previous snap in a null delimited string list
1969  */
1970 const char *rbd_prev_snap_name(const char *name, const char *start)
1971 {
1972 	if (name < start + 2)
1973 		return NULL;
1974 
1975 	name -= 2;
1976 	while (*name) {
1977 		if (name == start)
1978 			return start;
1979 		name--;
1980 	}
1981 	return name + 1;
1982 }
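
/*
 * For example, with start pointing at "snap3\0snap2\0snap1\0" and
 * name at the "snap1" entry, the function steps back over the
 * separating NUL and walks to the first character of "snap2",
 * returning a pointer to it.
 */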
1983 
1984 /*
1985  * compare the old list of snapshots that we have to what's in the header
1986  * and update it accordingly. Note that the header holds the snapshots
1987  * in reverse order (from newest to oldest) and we need to go from
1988  * older to newer so that we don't get a duplicate snap name when
1989  * doing the process (e.g., a snapshot that was removed and then
1990  * recreated with the same name).
1991  */
1992 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1993 {
1994 	const char *name, *first_name;
1995 	int i = rbd_dev->header.total_snaps;
1996 	struct rbd_snap *snap, *old_snap = NULL;
1997 	int ret;
1998 	struct list_head *p, *n;
1999 
2000 	first_name = rbd_dev->header.snap_names;
2001 	name = first_name + rbd_dev->header.snap_names_len;
2002 
2003 	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2004 		u64 cur_id;
2005 
2006 		old_snap = list_entry(p, struct rbd_snap, node);
2007 
2008 		if (i)
2009 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2010 
2011 		if (!i || old_snap->id < cur_id) {
2012 			/* old_snap->id was skipped, thus was removed */
2013 			__rbd_remove_snap_dev(rbd_dev, old_snap);
2014 			continue;
2015 		}
2016 		if (old_snap->id == cur_id) {
2017 			/* we have this snapshot already */
2018 			i--;
2019 			name = rbd_prev_snap_name(name, first_name);
2020 			continue;
2021 		}
2022 		for (; i > 0;
2023 		     i--, name = rbd_prev_snap_name(name, first_name)) {
2024 			if (!name) {
2025 				WARN_ON(1);
2026 				return -EINVAL;
2027 			}
2028 			cur_id = rbd_dev->header.snapc->snaps[i];
2029 			/* snapshot removal? handle it above */
2030 			if (cur_id >= old_snap->id)
2031 				break;
2032 			/* a new snapshot */
2033 			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2034 			if (ret < 0)
2035 				return ret;
2036 
2037 			/* note that we add it backward so using n and not p */
2038 			list_add(&snap->node, n);
2039 			p = &snap->node;
2040 		}
2041 	}
2042 	/* we're done going over the old snap list, just add what's left */
2043 	for (; i > 0; i--) {
2044 		name = rbd_prev_snap_name(name, first_name);
2045 		if (!name) {
2046 			WARN_ON(1);
2047 			return -EINVAL;
2048 		}
2049 		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2050 		if (ret < 0)
2051 			return ret;
2052 		list_add(&snap->node, &rbd_dev->snaps);
2053 	}
2054 
2055 	return 0;
2056 }
2057 
2058 
2059 static void rbd_root_dev_release(struct device *dev)
2060 {
2061 }
2062 
2063 static struct device rbd_root_dev = {
2064 	.init_name =    "rbd",
2065 	.release =      rbd_root_dev_release,
2066 };
2067 
2068 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2069 {
2070 	int ret = -ENOMEM;
2071 	struct device *dev;
2072 	struct rbd_snap *snap;
2073 
2074 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2075 	dev = &rbd_dev->dev;
2076 
2077 	dev->bus = &rbd_bus_type;
2078 	dev->type = &rbd_device_type;
2079 	dev->parent = &rbd_root_dev;
2080 	dev->release = rbd_dev_release;
2081 	dev_set_name(dev, "%d", rbd_dev->id);
2082 	ret = device_register(dev);
2083 	if (ret < 0)
2084 		goto done_free;
2085 
2086 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2087 		ret = rbd_register_snap_dev(rbd_dev, snap,
2088 					     &rbd_dev->dev);
2089 		if (ret < 0)
2090 			break;
2091 	}
2092 
2093 	mutex_unlock(&ctl_mutex);
2094 	return 0;
2095 done_free:
2096 	mutex_unlock(&ctl_mutex);
2097 	return ret;
2098 }
2099 
2100 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2101 {
2102 	device_unregister(&rbd_dev->dev);
2103 }
2104 
2105 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2106 {
2107 	int ret, rc;
2108 
2109 	do {
2110 		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2111 					 rbd_dev->header.obj_version);
2112 		if (ret == -ERANGE) {
2113 			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2114 			rc = __rbd_update_snaps(rbd_dev);
2115 			mutex_unlock(&ctl_mutex);
2116 			if (rc < 0)
2117 				return rc;
2118 		}
2119 	} while (ret == -ERANGE);
2120 
2121 	return ret;
2122 }
2123 
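/*
 * Handle a write to /sys/bus/rbd/add.  The buffer is expected to hold
 *
 *	<mon addrs> <options> <pool name> <image name> [<snap name>]
 *
 * for example (hypothetical values):
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * On success the image is mapped as a new /dev/rbd<id> block device.
 */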
2124 static ssize_t rbd_add(struct bus_type *bus,
2125 		       const char *buf,
2126 		       size_t count)
2127 {
2128 	struct ceph_osd_client *osdc;
2129 	struct rbd_device *rbd_dev;
2130 	ssize_t rc = -ENOMEM;
2131 	int irc, new_id = 0;
2132 	struct list_head *tmp;
2133 	char *mon_dev_name;
2134 	char *options;
2135 
2136 	if (!try_module_get(THIS_MODULE))
2137 		return -ENODEV;
2138 
	/*
	 * A "%<N>s" conversion in the sscanf() below stores up to N
	 * characters plus a terminating NUL, so allocate N + 1 bytes.
	 */
	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN + 1, GFP_KERNEL);
	if (!mon_dev_name)
		goto err_out_mod;

	options = kmalloc(RBD_MAX_OPT_LEN + 1, GFP_KERNEL);
	if (!options)
		goto err_mon_dev;
2146 
2147 	/* new rbd_device object */
2148 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2149 	if (!rbd_dev)
2150 		goto err_out_opt;
2151 
2152 	/* static rbd_device initialization */
2153 	spin_lock_init(&rbd_dev->lock);
2154 	INIT_LIST_HEAD(&rbd_dev->node);
2155 	INIT_LIST_HEAD(&rbd_dev->snaps);
2156 
	/* generate unique id: find the highest existing id and add one */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	list_for_each(tmp, &rbd_dev_list) {
		struct rbd_device *dev;

		dev = list_entry(tmp, struct rbd_device, node);
		if (dev->id >= new_id)
			new_id = dev->id + 1;
	}
2167 
2168 	rbd_dev->id = new_id;
2169 
2170 	/* add to global list */
2171 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2172 
	/*
	 * parse add command; the snap name is optional, so require at
	 * least four fields.  Each "%<N>s" conversion needs N + 1
	 * bytes of storage (see the allocations above); the fixed-size
	 * name fields in struct rbd_device must likewise be one byte
	 * larger than their scan widths.
	 */
	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s "
		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
		   mon_dev_name, options, rbd_dev->pool_name,
		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
		rc = -EINVAL;
		goto err_out_slot;
	}
2184 
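	/*
	 * An omitted snap name means we map the live image; rbd_dev was
	 * kzalloc()ed, so the name is already NUL-terminated and setting
	 * the first byte to '-' (RBD_SNAP_HEAD_NAME) is enough.
	 */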
2185 	if (rbd_dev->snap_name[0] == 0)
2186 		rbd_dev->snap_name[0] = '-';
2187 
2188 	rbd_dev->obj_len = strlen(rbd_dev->obj);
2189 	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2190 		 rbd_dev->obj, RBD_SUFFIX);
2191 
2192 	/* initialize rest of new object */
2193 	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2194 	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2195 	if (rc < 0)
2196 		goto err_out_slot;
2197 
2198 	mutex_unlock(&ctl_mutex);
2199 
2200 	/* pick the pool */
2201 	osdc = &rbd_dev->client->osdc;
2202 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2203 	if (rc < 0)
2204 		goto err_out_client;
2205 	rbd_dev->poolid = rc;
2206 
2207 	/* register our block device */
2208 	irc = register_blkdev(0, rbd_dev->name);
2209 	if (irc < 0) {
2210 		rc = irc;
2211 		goto err_out_client;
2212 	}
2213 	rbd_dev->major = irc;
2214 
2215 	rc = rbd_bus_add_dev(rbd_dev);
2216 	if (rc)
2217 		goto err_out_blkdev;
2218 
2219 	/* set up and announce blkdev mapping */
2220 	rc = rbd_init_disk(rbd_dev);
2221 	if (rc)
2222 		goto err_out_bus;
2223 
2224 	rc = rbd_init_watch_dev(rbd_dev);
2225 	if (rc)
2226 		goto err_out_bus;
2227 
	/* success: the temporary parse buffers are no longer needed */
	kfree(options);
	kfree(mon_dev_name);

	return count;
2229 
2230 err_out_bus:
2231 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2232 	list_del_init(&rbd_dev->node);
2233 	mutex_unlock(&ctl_mutex);
2234 
	/*
	 * Unregistering the device drops the last reference;
	 * rbd_dev_release() then tears down the disk, the blkdev
	 * registration, the client and rbd_dev itself, and releases
	 * our module reference.
	 */
	rbd_bus_del_dev(rbd_dev);
2238 	kfree(options);
2239 	kfree(mon_dev_name);
2240 	return rc;
2241 
2242 err_out_blkdev:
2243 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2244 err_out_client:
2245 	rbd_put_client(rbd_dev);
2246 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2247 err_out_slot:
2248 	list_del_init(&rbd_dev->node);
2249 	mutex_unlock(&ctl_mutex);
2250 
2251 	kfree(rbd_dev);
2252 err_out_opt:
2253 	kfree(options);
2254 err_mon_dev:
2255 	kfree(mon_dev_name);
2256 err_out_mod:
2257 	dout("Error adding device %s\n", buf);
2258 	module_put(THIS_MODULE);
2259 	return rc;
2260 }
2261 
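/* find a device by id in the global list; the caller must hold ctl_mutex */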
2262 static struct rbd_device *__rbd_get_dev(unsigned long id)
2263 {
2264 	struct list_head *tmp;
2265 	struct rbd_device *rbd_dev;
2266 
2267 	list_for_each(tmp, &rbd_dev_list) {
2268 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2269 		if (rbd_dev->id == id)
2270 			return rbd_dev;
2271 	}
2272 	return NULL;
2273 }
2274 
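/*
 * Final teardown, run by the driver core once the last reference to
 * the embedded struct device is dropped (see rbd_bus_del_dev()).
 */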
2275 static void rbd_dev_release(struct device *dev)
2276 {
2277 	struct rbd_device *rbd_dev =
2278 			container_of(dev, struct rbd_device, dev);
2279 
2280 	if (rbd_dev->watch_request)
2281 		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2282 						    rbd_dev->watch_request);
2283 	if (rbd_dev->watch_event)
2284 		ceph_osdc_cancel_event(rbd_dev->watch_event);
2285 
2286 	rbd_put_client(rbd_dev);
2287 
2288 	/* clean up and free blkdev */
2289 	rbd_free_disk(rbd_dev);
2290 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2291 	kfree(rbd_dev);
2292 
2293 	/* release module ref */
2294 	module_put(THIS_MODULE);
2295 }
2296 
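/*
 * Handle a write to /sys/bus/rbd/remove: the buffer holds the numeric
 * id of the mapping to tear down, e.g. "echo 0 > /sys/bus/rbd/remove".
 */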
2297 static ssize_t rbd_remove(struct bus_type *bus,
2298 			  const char *buf,
2299 			  size_t count)
2300 {
2301 	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	ssize_t ret = count;
2305 
2306 	rc = strict_strtoul(buf, 10, &ul);
2307 	if (rc)
2308 		return rc;
2309 
2310 	/* convert to int; abort if we lost anything in the conversion */
2311 	target_id = (int) ul;
2312 	if (target_id != ul)
2313 		return -EINVAL;
2314 
2315 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2316 
2317 	rbd_dev = __rbd_get_dev(target_id);
2318 	if (!rbd_dev) {
2319 		ret = -ENOENT;
2320 		goto done;
2321 	}
2322 
2323 	list_del_init(&rbd_dev->node);
2324 
2325 	__rbd_remove_all_snaps(rbd_dev);
2326 	rbd_bus_del_dev(rbd_dev);
2327 
2328 done:
2329 	mutex_unlock(&ctl_mutex);
2330 	return ret;
2331 }
2332 
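/*
 * Create a snapshot of a mapped image.  The buffer, written to the
 * device's snapshot-creation sysfs attribute, holds the new snapshot's
 * name; we add it to the header object, refresh our in-memory snapshot
 * state, and then notify other watchers on a best-effort basis.
 */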
2333 static ssize_t rbd_snap_add(struct device *dev,
2334 			    struct device_attribute *attr,
2335 			    const char *buf,
2336 			    size_t count)
2337 {
2338 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2339 	int ret;
2340 	char *name = kmalloc(count + 1, GFP_KERNEL);
2341 	if (!name)
2342 		return -ENOMEM;
2343 
	/* copy the whole buffer, then strip a trailing newline if any */
	memcpy(name, buf, count);
	name[count] = '\0';
	if (count && name[count - 1] == '\n')
		name[count - 1] = '\0';
2345 
2346 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2347 
2348 	ret = rbd_header_add_snap(rbd_dev,
2349 				  name, GFP_KERNEL);
2350 	if (ret < 0)
2351 		goto err_unlock;
2352 
2353 	ret = __rbd_update_snaps(rbd_dev);
2354 	if (ret < 0)
2355 		goto err_unlock;
2356 
	/*
	 * We shouldn't hold ctl_mutex while notifying: the notify may
	 * trigger a watch callback that itself needs to take the mutex.
	 */
	mutex_unlock(&ctl_mutex);

	/* best effort: don't fail the snap create if the notify fails */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2363 
2364 	ret = count;
2365 	kfree(name);
2366 	return ret;
2367 
2368 err_unlock:
2369 	mutex_unlock(&ctl_mutex);
2370 	kfree(name);
2371 	return ret;
2372 }
2373 
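/*
 * Roll a mapped image back to the named snapshot by issuing a rollback
 * op for each object segment in turn, then refresh our snapshot state.
 * Segments that fail to roll back are logged and skipped.
 */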
2374 static ssize_t rbd_snap_rollback(struct device *dev,
2375 				 struct device_attribute *attr,
2376 				 const char *buf,
2377 				 size_t count)
2378 {
2379 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2380 	int ret;
2381 	u64 snapid;
2382 	u64 cur_ofs;
2383 	char *seg_name = NULL;
2384 	char *snap_name = kmalloc(count + 1, GFP_KERNEL);
	if (!snap_name)
		return -ENOMEM;

	/* copy the snapshot name, stripping a trailing newline if any */
	memcpy(snap_name, buf, count);
	snap_name[count] = '\0';
	if (count && snap_name[count - 1] == '\n')
		snap_name[count - 1] = '\0';

	ret = -ENOMEM;
	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		goto done;
2394 
2395 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2396 
2397 	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2398 	if (ret < 0)
2399 		goto done_unlock;
2400 
	dout("snapid=%llu\n", (unsigned long long)snapid);
2402 
2403 	cur_ofs = 0;
2404 	while (cur_ofs < rbd_dev->header.image_size) {
2405 		cur_ofs += rbd_get_segment(&rbd_dev->header,
2406 					   rbd_dev->obj,
2407 					   cur_ofs, (u64)-1,
2408 					   seg_name, NULL);
2409 		dout("seg_name=%s\n", seg_name);
2410 
2411 		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2412 		if (ret < 0)
2413 			pr_warning("could not roll back obj %s err=%d\n",
2414 				   seg_name, ret);
2415 	}
2416 
2417 	ret = __rbd_update_snaps(rbd_dev);
2418 	if (ret < 0)
2419 		goto done_unlock;
2420 
2421 	ret = count;
2422 
2423 done_unlock:
2424 	mutex_unlock(&ctl_mutex);
2425 done:
2426 	kfree(seg_name);
2427 	kfree(snap_name);
2428 
2429 	return ret;
2430 }
2431 
2432 static struct bus_attribute rbd_bus_attrs[] = {
2433 	__ATTR(add, S_IWUSR, NULL, rbd_add),
2434 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
2435 	__ATTR_NULL
2436 };
2437 
2438 /*
2439  * create control files in sysfs
2440  * /sys/bus/rbd/...
2441  */
static int rbd_sysfs_init(void)
{
	int ret;

	rbd_bus_type.bus_attrs = rbd_bus_attrs;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		return ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		bus_unregister(&rbd_bus_type);

	return ret;
}
2456 
2457 static void rbd_sysfs_cleanup(void)
2458 {
2459 	device_unregister(&rbd_root_dev);
2460 	bus_unregister(&rbd_bus_type);
2461 }
2462 
int __init rbd_init(void)
{
	int rc;

	/* initialize the lock before sysfs can expose us to userspace */
	spin_lock_init(&node_lock);

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " DRV_NAME_LONG "\n");
	return 0;
}
2474 
2475 void __exit rbd_exit(void)
2476 {
2477 	rbd_sysfs_cleanup();
2478 }
2479 
2480 module_init(rbd_init);
2481 module_exit(rbd_exit);
2482 
2483 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2484 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2485 MODULE_DESCRIPTION("rados block device");
2486 
2487 /* following authorship retained from original osdblk.c */
2488 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2489 
2490 MODULE_LICENSE("GPL");
2491