#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/fs_parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

54
55
56
57
58
59
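/*
 * Guarded counter helpers, used below for rbd_dev->parent_ref.
 * atomic_inc_return_safe() will not bump a counter that is already
 * zero and backs out with -EINVAL if the value has run past INT_MAX;
 * atomic_dec_return_safe() likewise restores the counter and returns
 * -EINVAL if it would drop below zero.
 */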
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
86
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* "no such snapshot" sentinel */

#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
112
113
114
#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_OBJECT_MAP |	\
				 RBD_FEATURE_FAST_DIFF |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
136
137
138
139
140
141#define DEV_NAME_LEN 32
142
143
144
145
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;

	/* The remaining fields change as the mapping and/or snapshots change */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
161
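/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image mapping; the corresponding names (and the pool
 * namespace, if any) are carried alongside, primarily so that they
 * can be reported back to user space.
 */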
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
200
201
202
203
204struct rbd_client {
205 struct ceph_client *client;
206 struct kref kref;
207 struct list_head node;
208};
209
210struct pending_result {
211 int result;
212 int num_pending;
213};
214
215struct rbd_img_request;
216
217enum obj_request_type {
218 OBJ_REQUEST_NODATA = 1,
219 OBJ_REQUEST_BIO,
220 OBJ_REQUEST_BVECS,
221 OBJ_REQUEST_OWN_BVECS,
222};
223
224enum obj_operation_type {
225 OBJ_OP_READ = 1,
226 OBJ_OP_WRITE,
227 OBJ_OP_DISCARD,
228 OBJ_OP_ZEROOUT,
229};
230
231#define RBD_OBJ_FLAG_DELETION (1U << 0)
232#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1)
233#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2)
234#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3)
235#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4)
236
237enum rbd_obj_read_state {
238 RBD_OBJ_READ_START = 1,
239 RBD_OBJ_READ_OBJECT,
240 RBD_OBJ_READ_PARENT,
241};
242
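/*
 * Write path state machine (a rough sketch, inferred from the state
 * names and their users below):
 *
 *   START -> PRE_OBJECT_MAP -> OBJECT -> [COPYUP] -> POST_OBJECT_MAP
 *
 * PRE/POST_OBJECT_MAP update the object map (when one is in use)
 * before and after the OSD write; COPYUP handles an object that does
 * not exist yet but lies within the parent overlap, using the
 * rbd_obj_copyup_state sub-machine to read the parent data and write
 * it out together with the guest data.  States prefixed with "__"
 * denote "request(s) in flight, waiting for completion".
 */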
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_START = 1,
	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
	RBD_OBJ_WRITE_OBJECT,
	__RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_POST_OBJECT_MAP,
};

enum rbd_obj_copyup_state {
	RBD_OBJ_COPYUP_START = 1,
	RBD_OBJ_COPYUP_READ_PARENT,
	__RBD_OBJ_COPYUP_OBJECT_MAPS,
	RBD_OBJ_COPYUP_OBJECT_MAPS,
	__RBD_OBJ_COPYUP_WRITE_OBJECT,
	RBD_OBJ_COPYUP_WRITE_OBJECT,
};
285
286struct rbd_obj_request {
287 struct ceph_object_extent ex;
288 unsigned int flags;
289 union {
290 enum rbd_obj_read_state read_state;
291 enum rbd_obj_write_state write_state;
292 };
293
294 struct rbd_img_request *img_request;
295 struct ceph_file_extent *img_extents;
296 u32 num_img_extents;
297
298 union {
299 struct ceph_bio_iter bio_pos;
300 struct {
301 struct ceph_bvec_iter bvec_pos;
302 u32 bvec_count;
303 u32 bvec_idx;
304 };
305 };
306
307 enum rbd_obj_copyup_state copyup_state;
308 struct bio_vec *copyup_bvecs;
309 u32 copyup_bvec_count;
310
311 struct list_head osd_reqs;
312
313 struct mutex state_mutex;
314 struct pending_result pending;
315 struct kref kref;
316};
317
318enum img_req_flags {
319 IMG_REQ_CHILD,
320 IMG_REQ_LAYERED,
321};
322
323enum rbd_img_state {
324 RBD_IMG_START = 1,
325 RBD_IMG_EXCLUSIVE_LOCK,
326 __RBD_IMG_OBJECT_REQUESTS,
327 RBD_IMG_OBJECT_REQUESTS,
328};
329
330struct rbd_img_request {
331 struct rbd_device *rbd_dev;
332 enum obj_operation_type op_type;
333 enum obj_request_type data_type;
334 unsigned long flags;
335 enum rbd_img_state state;
336 union {
337 u64 snap_id;
338 struct ceph_snap_context *snapc;
339 };
340 struct rbd_obj_request *obj_request;
341
342 struct list_head lock_item;
343 struct list_head object_extents;
344
345 struct mutex state_mutex;
346 struct pending_result pending;
347 struct work_struct work;
348 int work_result;
349};
350
351#define for_each_obj_request(ireq, oreq) \
352 list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
353#define for_each_obj_request_safe(ireq, oreq, n) \
354 list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
355
356enum rbd_watch_state {
357 RBD_WATCH_STATE_UNREGISTERED,
358 RBD_WATCH_STATE_REGISTERED,
359 RBD_WATCH_STATE_ERROR,
360};
361
362enum rbd_lock_state {
363 RBD_LOCK_STATE_UNLOCKED,
364 RBD_LOCK_STATE_LOCKED,
365 RBD_LOCK_STATE_RELEASING,
366};
367
368
369struct rbd_client_id {
370 u64 gid;
371 u64 handle;
372};
373
374struct rbd_mapping {
375 u64 size;
376};
377
378
379
380
381struct rbd_device {
382 int dev_id;
383
384 int major;
385 int minor;
386 struct gendisk *disk;
387
388 u32 image_format;
389 struct rbd_client *rbd_client;
390
391 char name[DEV_NAME_LEN];
392
393 spinlock_t lock;
394
395 struct rbd_image_header header;
396 unsigned long flags;
397 struct rbd_spec *spec;
398 struct rbd_options *opts;
399 char *config_info;
400
401 struct ceph_object_id header_oid;
402 struct ceph_object_locator header_oloc;
403
404 struct ceph_file_layout layout;
405
406 struct mutex watch_mutex;
407 enum rbd_watch_state watch_state;
408 struct ceph_osd_linger_request *watch_handle;
409 u64 watch_cookie;
410 struct delayed_work watch_dwork;
411
412 struct rw_semaphore lock_rwsem;
413 enum rbd_lock_state lock_state;
414 char lock_cookie[32];
415 struct rbd_client_id owner_cid;
416 struct work_struct acquired_lock_work;
417 struct work_struct released_lock_work;
418 struct delayed_work lock_dwork;
419 struct work_struct unlock_work;
420 spinlock_t lock_lists_lock;
421 struct list_head acquiring_list;
422 struct list_head running_list;
423 struct completion acquire_wait;
424 int acquire_err;
425 struct completion releasing_wait;
426
427 spinlock_t object_map_lock;
428 u8 *object_map;
429 u64 object_map_size;
430 u64 object_map_flags;
431
432 struct workqueue_struct *task_wq;
433
434 struct rbd_spec *parent_spec;
435 u64 parent_overlap;
436 atomic_t parent_ref;
437 struct rbd_device *parent;
438
439
440 struct blk_mq_tag_set tag_set;
441
442
443 struct rw_semaphore header_rwsem;
444
445 struct rbd_mapping mapping;
446
447 struct list_head node;
448
449
450 struct device dev;
451 unsigned long open_count;
452};
453
454
455
456
457
458
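/*
 * Flag bits for rbd_dev->flags.  EXISTS and REMOVING are meant to be
 * manipulated under rbd_dev->lock (see rbd_open()/rbd_release());
 * READONLY appears to be set once while the device is being set up.
 */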
459enum rbd_dev_flags {
460 RBD_DEV_FLAG_EXISTS,
461 RBD_DEV_FLAG_REMOVING,
462 RBD_DEV_FLAG_READONLY,
463};
464
465static DEFINE_MUTEX(client_mutex);
466
467static LIST_HEAD(rbd_dev_list);
468static DEFINE_SPINLOCK(rbd_dev_list_lock);
469
470static LIST_HEAD(rbd_client_list);
471static DEFINE_SPINLOCK(rbd_client_list_lock);
472
473
474
475static struct kmem_cache *rbd_img_request_cache;
476static struct kmem_cache *rbd_obj_request_cache;
477
478static int rbd_major;
479static DEFINE_IDA(rbd_dev_id_ida);
480
481static struct workqueue_struct *rbd_wq;
482
483static struct ceph_snap_context rbd_empty_snapc = {
484 .nref = REFCOUNT_INIT(1),
485};
486
487
488
489
490static bool single_major = true;
491module_param(single_major, bool, 0444);
492MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
493
494static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
495static ssize_t remove_store(struct bus_type *bus, const char *buf,
496 size_t count);
497static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
498 size_t count);
499static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
500 size_t count);
501static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
502
503static int rbd_dev_id_to_minor(int dev_id)
504{
505 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
506}
507
508static int minor_to_rbd_dev_id(int minor)
509{
510 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
511}
512
513static bool rbd_is_ro(struct rbd_device *rbd_dev)
514{
515 return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
516}
517
518static bool rbd_is_snap(struct rbd_device *rbd_dev)
519{
520 return rbd_dev->spec->snap_id != CEPH_NOSNAP;
521}
522
523static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
524{
525 lockdep_assert_held(&rbd_dev->lock_rwsem);
526
527 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
528 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
529}
530
531static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
532{
533 bool is_lock_owner;
534
535 down_read(&rbd_dev->lock_rwsem);
536 is_lock_owner = __rbd_is_lock_owner(rbd_dev);
537 up_read(&rbd_dev->lock_rwsem);
538 return is_lock_owner;
539}
540
541static ssize_t supported_features_show(struct bus_type *bus, char *buf)
542{
543 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
544}
545
546static BUS_ATTR_WO(add);
547static BUS_ATTR_WO(remove);
548static BUS_ATTR_WO(add_single_major);
549static BUS_ATTR_WO(remove_single_major);
550static BUS_ATTR_RO(supported_features);
551
552static struct attribute *rbd_bus_attrs[] = {
553 &bus_attr_add.attr,
554 &bus_attr_remove.attr,
555 &bus_attr_add_single_major.attr,
556 &bus_attr_remove_single_major.attr,
557 &bus_attr_supported_features.attr,
558 NULL,
559};
560
561static umode_t rbd_bus_is_visible(struct kobject *kobj,
562 struct attribute *attr, int index)
563{
564 if (!single_major &&
565 (attr == &bus_attr_add_single_major.attr ||
566 attr == &bus_attr_remove_single_major.attr))
567 return 0;
568
569 return attr->mode;
570}
571
572static const struct attribute_group rbd_bus_group = {
573 .attrs = rbd_bus_attrs,
574 .is_visible = rbd_bus_is_visible,
575};
576__ATTRIBUTE_GROUPS(rbd_bus);
577
578static struct bus_type rbd_bus_type = {
579 .name = "rbd",
580 .bus_groups = rbd_bus_groups,
581};
582
583static void rbd_root_dev_release(struct device *dev)
584{
585}
586
587static struct device rbd_root_dev = {
588 .init_name = "rbd",
589 .release = rbd_root_dev_release,
590};
591
592static __printf(2, 3)
593void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
594{
595 struct va_format vaf;
596 va_list args;
597
598 va_start(args, fmt);
599 vaf.fmt = fmt;
600 vaf.va = &args;
601
602 if (!rbd_dev)
603 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
604 else if (rbd_dev->disk)
605 printk(KERN_WARNING "%s: %s: %pV\n",
606 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
607 else if (rbd_dev->spec && rbd_dev->spec->image_name)
608 printk(KERN_WARNING "%s: image %s: %pV\n",
609 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
610 else if (rbd_dev->spec && rbd_dev->spec->image_id)
611 printk(KERN_WARNING "%s: id %s: %pV\n",
612 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
613 else
614 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
615 RBD_DRV_NAME, rbd_dev, &vaf);
616 va_end(args);
617}
618
619#ifdef RBD_DEBUG
620#define rbd_assert(expr) \
621 if (unlikely(!(expr))) { \
622 printk(KERN_ERR "\nAssertion failure in %s() " \
623 "at line %d:\n\n" \
624 "\trbd_assert(%s);\n\n", \
625 __func__, __LINE__, #expr); \
626 BUG(); \
627 }
628#else
629# define rbd_assert(expr) ((void) 0)
630#endif
631
632static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
633
634static int rbd_dev_refresh(struct rbd_device *rbd_dev);
635static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
636static int rbd_dev_header_info(struct rbd_device *rbd_dev);
637static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
638static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
639 u64 snap_id);
640static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
641 u8 *order, u64 *snap_size);
642static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
643
644static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
645static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
646
647
648
649
650static bool pending_result_dec(struct pending_result *pending, int *result)
651{
652 rbd_assert(pending->num_pending > 0);
653
654 if (*result && !pending->result)
655 pending->result = *result;
656 if (--pending->num_pending)
657 return false;
658
659 *result = pending->result;
660 return true;
661}
662
663static int rbd_open(struct block_device *bdev, fmode_t mode)
664{
665 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
666 bool removing = false;
667
668 spin_lock_irq(&rbd_dev->lock);
669 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
670 removing = true;
671 else
672 rbd_dev->open_count++;
673 spin_unlock_irq(&rbd_dev->lock);
674 if (removing)
675 return -ENOENT;
676
677 (void) get_device(&rbd_dev->dev);
678
679 return 0;
680}
681
682static void rbd_release(struct gendisk *disk, fmode_t mode)
683{
684 struct rbd_device *rbd_dev = disk->private_data;
685 unsigned long open_count_before;
686
687 spin_lock_irq(&rbd_dev->lock);
688 open_count_before = rbd_dev->open_count--;
689 spin_unlock_irq(&rbd_dev->lock);
690 rbd_assert(open_count_before > 0);
691
692 put_device(&rbd_dev->dev);
693}
694
695static const struct block_device_operations rbd_bd_ops = {
696 .owner = THIS_MODULE,
697 .open = rbd_open,
698 .release = rbd_release,
699};
700
701
702
703
704
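/*
 * Create a new rbd client: instantiate a ceph_client for the given
 * options and open a session with the cluster.  On success the client
 * is added to rbd_client_list; the ceph_opts are consumed either way
 * (owned by the ceph_client on success, destroyed here on failure).
 */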
705static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
706{
707 struct rbd_client *rbdc;
708 int ret = -ENOMEM;
709
710 dout("%s:\n", __func__);
711 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
712 if (!rbdc)
713 goto out_opt;
714
715 kref_init(&rbdc->kref);
716 INIT_LIST_HEAD(&rbdc->node);
717
718 rbdc->client = ceph_create_client(ceph_opts, rbdc);
719 if (IS_ERR(rbdc->client))
720 goto out_rbdc;
721 ceph_opts = NULL;
722
723 ret = ceph_open_session(rbdc->client);
724 if (ret < 0)
725 goto out_client;
726
727 spin_lock(&rbd_client_list_lock);
728 list_add_tail(&rbdc->node, &rbd_client_list);
729 spin_unlock(&rbd_client_list_lock);
730
731 dout("%s: rbdc %p\n", __func__, rbdc);
732
733 return rbdc;
734out_client:
735 ceph_destroy_client(rbdc->client);
736out_rbdc:
737 kfree(rbdc);
738out_opt:
739 if (ceph_opts)
740 ceph_destroy_options(ceph_opts);
741 dout("%s: error %d\n", __func__, ret);
742
743 return ERR_PTR(ret);
744}
745
746static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
747{
748 kref_get(&rbdc->kref);
749
750 return rbdc;
751}
752
753
754
755
756
757static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
758{
759 struct rbd_client *client_node;
760 bool found = false;
761
762 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
763 return NULL;
764
765 spin_lock(&rbd_client_list_lock);
766 list_for_each_entry(client_node, &rbd_client_list, node) {
767 if (!ceph_compare_options(ceph_opts, client_node->client)) {
768 __rbd_get_client(client_node);
769
770 found = true;
771 break;
772 }
773 }
774 spin_unlock(&rbd_client_list_lock);
775
776 return found ? client_node : NULL;
777}
778
779
780
781
782enum {
783 Opt_queue_depth,
784 Opt_alloc_size,
785 Opt_lock_timeout,
786
787 Opt_pool_ns,
788 Opt_compression_hint,
789
790 Opt_read_only,
791 Opt_read_write,
792 Opt_lock_on_read,
793 Opt_exclusive,
794 Opt_notrim,
795};
796
797enum {
798 Opt_compression_hint_none,
799 Opt_compression_hint_compressible,
800 Opt_compression_hint_incompressible,
801};
802
803static const struct constant_table rbd_param_compression_hint[] = {
804 {"none", Opt_compression_hint_none},
805 {"compressible", Opt_compression_hint_compressible},
806 {"incompressible", Opt_compression_hint_incompressible},
807 {}
808};
809
810static const struct fs_parameter_spec rbd_parameters[] = {
811 fsparam_u32 ("alloc_size", Opt_alloc_size),
812 fsparam_enum ("compression_hint", Opt_compression_hint,
813 rbd_param_compression_hint),
814 fsparam_flag ("exclusive", Opt_exclusive),
815 fsparam_flag ("lock_on_read", Opt_lock_on_read),
816 fsparam_u32 ("lock_timeout", Opt_lock_timeout),
817 fsparam_flag ("notrim", Opt_notrim),
818 fsparam_string ("_pool_ns", Opt_pool_ns),
819 fsparam_u32 ("queue_depth", Opt_queue_depth),
820 fsparam_flag ("read_only", Opt_read_only),
821 fsparam_flag ("read_write", Opt_read_write),
822 fsparam_flag ("ro", Opt_read_only),
823 fsparam_flag ("rw", Opt_read_write),
824 {}
825};
826
827struct rbd_options {
828 int queue_depth;
829 int alloc_size;
830 unsigned long lock_timeout;
831 bool read_only;
832 bool lock_on_read;
833 bool exclusive;
834 bool trim;
835
836 u32 alloc_hint_flags;
837};
838
839#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
840#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024)
841#define RBD_LOCK_TIMEOUT_DEFAULT 0
842#define RBD_READ_ONLY_DEFAULT false
843#define RBD_LOCK_ON_READ_DEFAULT false
844#define RBD_EXCLUSIVE_DEFAULT false
845#define RBD_TRIM_DEFAULT true
846
847struct rbd_parse_opts_ctx {
848 struct rbd_spec *spec;
849 struct ceph_options *copts;
850 struct rbd_options *opts;
851};
852
853static char* obj_op_name(enum obj_operation_type op_type)
854{
855 switch (op_type) {
856 case OBJ_OP_READ:
857 return "read";
858 case OBJ_OP_WRITE:
859 return "write";
860 case OBJ_OP_DISCARD:
861 return "discard";
862 case OBJ_OP_ZEROOUT:
863 return "zeroout";
864 default:
865 return "???";
866 }
867}
868
869
870
871
872
873
874static void rbd_client_release(struct kref *kref)
875{
876 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
877
878 dout("%s: rbdc %p\n", __func__, rbdc);
879 spin_lock(&rbd_client_list_lock);
880 list_del(&rbdc->node);
881 spin_unlock(&rbd_client_list_lock);
882
883 ceph_destroy_client(rbdc->client);
884 kfree(rbdc);
885}
886
887
888
889
890
891static void rbd_put_client(struct rbd_client *rbdc)
892{
893 if (rbdc)
894 kref_put(&rbdc->kref, rbd_client_release);
895}
896
897
898
899
900
901
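/*
 * Find or create a ceph client for the given options: an existing
 * client with matching options is reused (unless CEPH_OPT_NOSHARE is
 * set), in which case the options are freed and the client's osdmap
 * is brought up to date before returning; otherwise a new client is
 * created.
 */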
902static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
903{
904 struct rbd_client *rbdc;
905 int ret;
906
907 mutex_lock(&client_mutex);
908 rbdc = rbd_client_find(ceph_opts);
909 if (rbdc) {
910 ceph_destroy_options(ceph_opts);
911
912
913
914
915
916 ret = ceph_wait_for_latest_osdmap(rbdc->client,
917 rbdc->client->options->mount_timeout);
918 if (ret) {
919 rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
920 rbd_put_client(rbdc);
921 rbdc = ERR_PTR(ret);
922 }
923 } else {
924 rbdc = rbd_client_create(ceph_opts);
925 }
926 mutex_unlock(&client_mutex);
927
928 return rbdc;
929}
930
931static bool rbd_image_format_valid(u32 image_format)
932{
933 return image_format == 1 || image_format == 2;
934}
935
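/*
 * Sanity-check a format 1 on-disk header: the header text must match,
 * the object order must describe an object of at least one sector
 * that still fits in 32 bits, and the snapshot count and snapshot
 * name length must be small enough that the snapshot context
 * allocation cannot overflow.
 */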
936static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
937{
938 size_t size;
939 u32 snap_count;
940
941
942 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
943 return false;
944
945
946
947 if (ondisk->options.order < SECTOR_SHIFT)
948 return false;
949
950
951
952 if (ondisk->options.order > 8 * sizeof (int) - 1)
953 return false;
954
955
956
957
958
959 snap_count = le32_to_cpu(ondisk->snap_count);
960 size = SIZE_MAX - sizeof (struct ceph_snap_context);
961 if (snap_count > size / sizeof (__le64))
962 return false;
963
964
965
966
967
968 size -= snap_count * sizeof (__le64);
969 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
970 return false;
971
972 return true;
973}
974
975
976
977
978static u32 rbd_obj_bytes(struct rbd_image_header *header)
979{
980 return 1U << header->obj_order;
981}
982
983static void rbd_init_layout(struct rbd_device *rbd_dev)
984{
985 if (rbd_dev->header.stripe_unit == 0 ||
986 rbd_dev->header.stripe_count == 0) {
987 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
988 rbd_dev->header.stripe_count = 1;
989 }
990
991 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
992 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
993 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
994 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
995 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
996 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
997}
998
999
1000
1001
1002
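/*
 * Fill in the in-core image header from a format 1 on-disk header.
 * Fields that never change (object prefix, object order and the layout
 * derived from them) are only set up the first time this is called for
 * a device; the image size and snapshot context are refreshed on every
 * call.
 */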
1003static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1004 struct rbd_image_header_ondisk *ondisk)
1005{
1006 struct rbd_image_header *header = &rbd_dev->header;
1007 bool first_time = header->object_prefix == NULL;
1008 struct ceph_snap_context *snapc;
1009 char *object_prefix = NULL;
1010 char *snap_names = NULL;
1011 u64 *snap_sizes = NULL;
1012 u32 snap_count;
1013 int ret = -ENOMEM;
1014 u32 i;
1015
1016
1017
1018 if (first_time) {
1019 object_prefix = kstrndup(ondisk->object_prefix,
1020 sizeof(ondisk->object_prefix),
1021 GFP_KERNEL);
1022 if (!object_prefix)
1023 return -ENOMEM;
1024 }
1025
1026
1027
1028 snap_count = le32_to_cpu(ondisk->snap_count);
1029 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1030 if (!snapc)
1031 goto out_err;
1032 snapc->seq = le64_to_cpu(ondisk->snap_seq);
1033 if (snap_count) {
1034 struct rbd_image_snap_ondisk *snaps;
1035 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1036
1037
1038
1039 if (snap_names_len > (u64)SIZE_MAX)
1040 goto out_2big;
1041 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1042 if (!snap_names)
1043 goto out_err;
1044
1045
1046 snap_sizes = kmalloc_array(snap_count,
1047 sizeof(*header->snap_sizes),
1048 GFP_KERNEL);
1049 if (!snap_sizes)
1050 goto out_err;
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1062 snaps = ondisk->snaps;
1063 for (i = 0; i < snap_count; i++) {
1064 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1065 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1066 }
1067 }
1068
1069
1070
1071 if (first_time) {
1072 header->object_prefix = object_prefix;
1073 header->obj_order = ondisk->options.order;
1074 rbd_init_layout(rbd_dev);
1075 } else {
1076 ceph_put_snap_context(header->snapc);
1077 kfree(header->snap_names);
1078 kfree(header->snap_sizes);
1079 }
1080
1081
1082
1083 header->image_size = le64_to_cpu(ondisk->image_size);
1084 header->snapc = snapc;
1085 header->snap_names = snap_names;
1086 header->snap_sizes = snap_sizes;
1087
1088 return 0;
1089out_2big:
1090 ret = -EIO;
1091out_err:
1092 kfree(snap_sizes);
1093 kfree(snap_names);
1094 ceph_put_snap_context(snapc);
1095 kfree(object_prefix);
1096
1097 return ret;
1098}
1099
1100static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1101{
1102 const char *snap_name;
1103
1104 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1105
1106
1107
1108 snap_name = rbd_dev->header.snap_names;
1109 while (which--)
1110 snap_name += strlen(snap_name) + 1;
1111
1112 return kstrdup(snap_name, GFP_KERNEL);
1113}
1114
1115
1116
1117
1118
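/*
 * Snapshot ids in a snapshot context are stored in decreasing order
 * (newest first), so the comparison is reversed for the bsearch() in
 * rbd_dev_snap_index() below.
 */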
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
			sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
1149
1150static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1151 u64 snap_id)
1152{
1153 u32 which;
1154 const char *snap_name;
1155
1156 which = rbd_dev_snap_index(rbd_dev, snap_id);
1157 if (which == BAD_SNAP_INDEX)
1158 return ERR_PTR(-ENOENT);
1159
1160 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1161 return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1162}
1163
1164static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1165{
1166 if (snap_id == CEPH_NOSNAP)
1167 return RBD_SNAP_HEAD_NAME;
1168
1169 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1170 if (rbd_dev->image_format == 1)
1171 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1172
1173 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1174}
1175
1176static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1177 u64 *snap_size)
1178{
1179 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1180 if (snap_id == CEPH_NOSNAP) {
1181 *snap_size = rbd_dev->header.image_size;
1182 } else if (rbd_dev->image_format == 1) {
1183 u32 which;
1184
1185 which = rbd_dev_snap_index(rbd_dev, snap_id);
1186 if (which == BAD_SNAP_INDEX)
1187 return -ENOENT;
1188
1189 *snap_size = rbd_dev->header.snap_sizes[which];
1190 } else {
1191 u64 size = 0;
1192 int ret;
1193
1194 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1195 if (ret)
1196 return ret;
1197
1198 *snap_size = size;
1199 }
1200 return 0;
1201}
1202
1203static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1204{
1205 u64 snap_id = rbd_dev->spec->snap_id;
1206 u64 size = 0;
1207 int ret;
1208
1209 ret = rbd_snap_size(rbd_dev, snap_id, &size);
1210 if (ret)
1211 return ret;
1212
1213 rbd_dev->mapping.size = size;
1214 return 0;
1215}
1216
1217static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1218{
1219 rbd_dev->mapping.size = 0;
1220}
1221
1222static void zero_bvec(struct bio_vec *bv)
1223{
1224 void *buf;
1225 unsigned long flags;
1226
1227 buf = bvec_kmap_irq(bv, &flags);
1228 memset(buf, 0, bv->bv_len);
1229 flush_dcache_page(bv->bv_page);
1230 bvec_kunmap_irq(buf, &flags);
1231}
1232
1233static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1234{
1235 struct ceph_bio_iter it = *bio_pos;
1236
1237 ceph_bio_iter_advance(&it, off);
1238 ceph_bio_iter_advance_step(&it, bytes, ({
1239 zero_bvec(&bv);
1240 }));
1241}
1242
1243static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1244{
1245 struct ceph_bvec_iter it = *bvec_pos;
1246
1247 ceph_bvec_iter_advance(&it, off);
1248 ceph_bvec_iter_advance_step(&it, bytes, ({
1249 zero_bvec(&bv);
1250 }));
1251}
1252
1253
1254
1255
1256
1257
1258
1259static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1260 u32 bytes)
1261{
1262 dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1263
1264 switch (obj_req->img_request->data_type) {
1265 case OBJ_REQUEST_BIO:
1266 zero_bios(&obj_req->bio_pos, off, bytes);
1267 break;
1268 case OBJ_REQUEST_BVECS:
1269 case OBJ_REQUEST_OWN_BVECS:
1270 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1271 break;
1272 default:
1273 BUG();
1274 }
1275}
1276
1277static void rbd_obj_request_destroy(struct kref *kref);
1278static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1279{
1280 rbd_assert(obj_request != NULL);
1281 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1282 kref_read(&obj_request->kref));
1283 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1284}
1285
1286static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1287 struct rbd_obj_request *obj_request)
1288{
1289 rbd_assert(obj_request->img_request == NULL);
1290
1291
1292 obj_request->img_request = img_request;
1293 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1294}
1295
1296static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1297 struct rbd_obj_request *obj_request)
1298{
1299 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1300 list_del(&obj_request->ex.oe_item);
1301 rbd_assert(obj_request->img_request == img_request);
1302 rbd_obj_request_put(obj_request);
1303}
1304
1305static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1306{
1307 struct rbd_obj_request *obj_req = osd_req->r_priv;
1308
1309 dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1310 __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1311 obj_req->ex.oe_off, obj_req->ex.oe_len);
1312 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1313}
1314
1315
1316
1317
1318
1319
1320static void img_request_layered_set(struct rbd_img_request *img_request)
1321{
1322 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1323}
1324
1325static bool img_request_layered_test(struct rbd_img_request *img_request)
1326{
1327 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1328}
1329
1330static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1331{
1332 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1333
1334 return !obj_req->ex.oe_off &&
1335 obj_req->ex.oe_len == rbd_dev->layout.object_size;
1336}
1337
1338static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1339{
1340 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1341
1342 return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1343 rbd_dev->layout.object_size;
1344}
1345
1346
1347
1348
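/*
 * Copy-up is only worthwhile when there is parent data backing this
 * object and the write either leaves part of the object untouched or
 * there are snapshots that still need the parent data preserved.
 */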
1349static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1350{
1351 if (!obj_req->num_img_extents ||
1352 (rbd_obj_is_entire(obj_req) &&
1353 !obj_req->img_request->snapc->num_snaps))
1354 return false;
1355
1356 return true;
1357}
1358
1359static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1360{
1361 return ceph_file_extents_bytes(obj_req->img_extents,
1362 obj_req->num_img_extents);
1363}
1364
1365static bool rbd_img_is_write(struct rbd_img_request *img_req)
1366{
1367 switch (img_req->op_type) {
1368 case OBJ_OP_READ:
1369 return false;
1370 case OBJ_OP_WRITE:
1371 case OBJ_OP_DISCARD:
1372 case OBJ_OP_ZEROOUT:
1373 return true;
1374 default:
1375 BUG();
1376 }
1377}
1378
1379static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1380{
1381 struct rbd_obj_request *obj_req = osd_req->r_priv;
1382 int result;
1383
1384 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1385 osd_req->r_result, obj_req);
1386
1387
1388
1389
1390
1391
1392 if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1393 result = 0;
1394 else
1395 result = osd_req->r_result;
1396
1397 rbd_obj_handle_request(obj_req, result);
1398}
1399
1400static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1401{
1402 struct rbd_obj_request *obj_request = osd_req->r_priv;
1403 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1404 struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1405
1406 osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1407 osd_req->r_snapid = obj_request->img_request->snap_id;
1408}
1409
1410static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1411{
1412 struct rbd_obj_request *obj_request = osd_req->r_priv;
1413
1414 osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1415 ktime_get_real_ts64(&osd_req->r_mtime);
1416 osd_req->r_data_offset = obj_request->ex.oe_off;
1417}
1418
1419static struct ceph_osd_request *
1420__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1421 struct ceph_snap_context *snapc, int num_ops)
1422{
1423 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1424 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1425 struct ceph_osd_request *req;
1426 const char *name_format = rbd_dev->image_format == 1 ?
1427 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1428 int ret;
1429
1430 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1431 if (!req)
1432 return ERR_PTR(-ENOMEM);
1433
1434 list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1435 req->r_callback = rbd_osd_req_callback;
1436 req->r_priv = obj_req;
1437
1438
1439
1440
1441
1442 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1443 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1444
1445 ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1446 rbd_dev->header.object_prefix,
1447 obj_req->ex.oe_objno);
1448 if (ret)
1449 return ERR_PTR(ret);
1450
1451 return req;
1452}
1453
1454static struct ceph_osd_request *
1455rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1456{
1457 return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1458 num_ops);
1459}
1460
1461static struct rbd_obj_request *rbd_obj_request_create(void)
1462{
1463 struct rbd_obj_request *obj_request;
1464
1465 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1466 if (!obj_request)
1467 return NULL;
1468
1469 ceph_object_extent_init(&obj_request->ex);
1470 INIT_LIST_HEAD(&obj_request->osd_reqs);
1471 mutex_init(&obj_request->state_mutex);
1472 kref_init(&obj_request->kref);
1473
1474 dout("%s %p\n", __func__, obj_request);
1475 return obj_request;
1476}
1477
1478static void rbd_obj_request_destroy(struct kref *kref)
1479{
1480 struct rbd_obj_request *obj_request;
1481 struct ceph_osd_request *osd_req;
1482 u32 i;
1483
1484 obj_request = container_of(kref, struct rbd_obj_request, kref);
1485
1486 dout("%s: obj %p\n", __func__, obj_request);
1487
1488 while (!list_empty(&obj_request->osd_reqs)) {
1489 osd_req = list_first_entry(&obj_request->osd_reqs,
1490 struct ceph_osd_request, r_private_item);
1491 list_del_init(&osd_req->r_private_item);
1492 ceph_osdc_put_request(osd_req);
1493 }
1494
1495 switch (obj_request->img_request->data_type) {
1496 case OBJ_REQUEST_NODATA:
1497 case OBJ_REQUEST_BIO:
1498 case OBJ_REQUEST_BVECS:
1499 break;
1500 case OBJ_REQUEST_OWN_BVECS:
1501 kfree(obj_request->bvec_pos.bvecs);
1502 break;
1503 default:
1504 BUG();
1505 }
1506
1507 kfree(obj_request->img_extents);
1508 if (obj_request->copyup_bvecs) {
1509 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1510 if (obj_request->copyup_bvecs[i].bv_page)
1511 __free_page(obj_request->copyup_bvecs[i].bv_page);
1512 }
1513 kfree(obj_request->copyup_bvecs);
1514 }
1515
1516 kmem_cache_free(rbd_obj_request_cache, obj_request);
1517}
1518
1519
1520
1521static void rbd_spec_put(struct rbd_spec *spec);
1522static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1523{
1524 rbd_dev_remove_parent(rbd_dev);
1525 rbd_spec_put(rbd_dev->parent_spec);
1526 rbd_dev->parent_spec = NULL;
1527 rbd_dev->parent_overlap = 0;
1528}
1529
1530
1531
1532
1533
1534
1535
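/*
 * Drop a reference to this device's parent.  When the last reference
 * is put, the parent is torn down and the parent spec and overlap are
 * cleared.
 */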
1536static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1537{
1538 int counter;
1539
1540 if (!rbd_dev->parent_spec)
1541 return;
1542
1543 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1544 if (counter > 0)
1545 return;
1546
1547
1548
1549 if (!counter)
1550 rbd_dev_unparent(rbd_dev);
1551 else
1552 rbd_warn(rbd_dev, "parent reference underflow");
1553}
1554
1555
1556
1557
1558
1559
1560
1561
1562
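/*
 * Take a reference to this device's parent, but only if it is still
 * relevant, i.e. a parent spec exists and the parent overlap is
 * non-zero.  Returns true if a reference was taken.
 */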
1563static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1564{
1565 int counter = 0;
1566
1567 if (!rbd_dev->parent_spec)
1568 return false;
1569
1570 if (rbd_dev->parent_overlap)
1571 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1572
1573 if (counter < 0)
1574 rbd_warn(rbd_dev, "parent reference overflow");
1575
1576 return counter > 0;
1577}
1578
1579static void rbd_img_request_init(struct rbd_img_request *img_request,
1580 struct rbd_device *rbd_dev,
1581 enum obj_operation_type op_type)
1582{
1583 memset(img_request, 0, sizeof(*img_request));
1584
1585 img_request->rbd_dev = rbd_dev;
1586 img_request->op_type = op_type;
1587
1588 INIT_LIST_HEAD(&img_request->lock_item);
1589 INIT_LIST_HEAD(&img_request->object_extents);
1590 mutex_init(&img_request->state_mutex);
1591}
1592
1593static void rbd_img_capture_header(struct rbd_img_request *img_req)
1594{
1595 struct rbd_device *rbd_dev = img_req->rbd_dev;
1596
1597 lockdep_assert_held(&rbd_dev->header_rwsem);
1598
1599 if (rbd_img_is_write(img_req))
1600 img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1601 else
1602 img_req->snap_id = rbd_dev->spec->snap_id;
1603
1604 if (rbd_dev_parent_get(rbd_dev))
1605 img_request_layered_set(img_req);
1606}
1607
1608static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1609{
1610 struct rbd_obj_request *obj_request;
1611 struct rbd_obj_request *next_obj_request;
1612
1613 dout("%s: img %p\n", __func__, img_request);
1614
1615 WARN_ON(!list_empty(&img_request->lock_item));
1616 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1617 rbd_img_obj_request_del(img_request, obj_request);
1618
1619 if (img_request_layered_test(img_request))
1620 rbd_dev_parent_put(img_request->rbd_dev);
1621
1622 if (rbd_img_is_write(img_request))
1623 ceph_put_snap_context(img_request->snapc);
1624
1625 if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1626 kmem_cache_free(rbd_img_request_cache, img_request);
1627}
1628
1629#define BITS_PER_OBJ 2
1630#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ)
1631#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1)
1632
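/*
 * The object map packs 2 bits of state per object, 4 objects per byte,
 * with the first object of a byte occupying the most significant bit
 * pair.  Translate an object number into the byte index and bit shift
 * of its state.
 */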
static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
				   u64 *index, u8 *shift)
{
	u32 off;

	rbd_assert(objno < rbd_dev->object_map_size);
	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
}
1642
1643static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1644{
1645 u64 index;
1646 u8 shift;
1647
1648 lockdep_assert_held(&rbd_dev->object_map_lock);
1649 __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1650 return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1651}
1652
1653static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1654{
1655 u64 index;
1656 u8 shift;
1657 u8 *p;
1658
1659 lockdep_assert_held(&rbd_dev->object_map_lock);
1660 rbd_assert(!(val & ~OBJ_MASK));
1661
1662 __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1663 p = &rbd_dev->object_map[index];
1664 *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1665}
1666
1667static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1668{
1669 u8 state;
1670
1671 spin_lock(&rbd_dev->object_map_lock);
1672 state = __rbd_object_map_get(rbd_dev, objno);
1673 spin_unlock(&rbd_dev->object_map_lock);
1674 return state;
1675}
1676
1677static bool use_object_map(struct rbd_device *rbd_dev)
1678{
1679
1680
1681
1682
1683
1684
1685
1686
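	/*
	 * The object map is only consulted for snapshots and for
	 * writable mappings of the image head, and only while the
	 * on-disk map isn't flagged invalid.  A read-only mapping of
	 * the head skips it entirely (presumably no map is loaded for
	 * such a mapping).
	 */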
1687 if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1688 return false;
1689
1690 return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1691 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1692}
1693
1694static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1695{
1696 u8 state;
1697
1698
1699 if (!use_object_map(rbd_dev))
1700 return true;
1701
1702 state = rbd_object_map_get(rbd_dev, objno);
1703 return state != OBJECT_NONEXISTENT;
1704}
1705
1706static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1707 struct ceph_object_id *oid)
1708{
1709 if (snap_id == CEPH_NOSNAP)
1710 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1711 rbd_dev->spec->image_id);
1712 else
1713 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1714 rbd_dev->spec->image_id, snap_id);
1715}
1716
1717static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1718{
1719 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1720 CEPH_DEFINE_OID_ONSTACK(oid);
1721 u8 lock_type;
1722 char *lock_tag;
1723 struct ceph_locker *lockers;
1724 u32 num_lockers;
1725 bool broke_lock = false;
1726 int ret;
1727
1728 rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1729
1730again:
1731 ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1732 CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1733 if (ret != -EBUSY || broke_lock) {
1734 if (ret == -EEXIST)
1735 ret = 0;
1736 if (ret)
1737 rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1738 return ret;
1739 }
1740
1741 ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1742 RBD_LOCK_NAME, &lock_type, &lock_tag,
1743 &lockers, &num_lockers);
1744 if (ret) {
1745 if (ret == -ENOENT)
1746 goto again;
1747
1748 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1749 return ret;
1750 }
1751
1752 kfree(lock_tag);
1753 if (num_lockers == 0)
1754 goto again;
1755
1756 rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1757 ENTITY_NAME(lockers[0].id.name));
1758
1759 ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1760 RBD_LOCK_NAME, lockers[0].id.cookie,
1761 &lockers[0].id.name);
1762 ceph_free_lockers(lockers, num_lockers);
1763 if (ret) {
1764 if (ret == -ENOENT)
1765 goto again;
1766
1767 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1768 return ret;
1769 }
1770
1771 broke_lock = true;
1772 goto again;
1773}
1774
1775static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1776{
1777 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1778 CEPH_DEFINE_OID_ONSTACK(oid);
1779 int ret;
1780
1781 rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1782
1783 ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1784 "");
1785 if (ret && ret != -ENOENT)
1786 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1787}
1788
1789static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1790{
1791 u8 struct_v;
1792 u32 struct_len;
1793 u32 header_len;
1794 void *header_end;
1795 int ret;
1796
1797 ceph_decode_32_safe(p, end, header_len, e_inval);
1798 header_end = *p + header_len;
1799
1800 ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1801 &struct_len);
1802 if (ret)
1803 return ret;
1804
1805 ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1806
1807 *p = header_end;
1808 return 0;
1809
1810e_inval:
1811 return -EINVAL;
1812}
1813
1814static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1815{
1816 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1817 CEPH_DEFINE_OID_ONSTACK(oid);
1818 struct page **pages;
1819 void *p, *end;
1820 size_t reply_len;
1821 u64 num_objects;
1822 u64 object_map_bytes;
1823 u64 object_map_size;
1824 int num_pages;
1825 int ret;
1826
1827 rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1828
1829 num_objects = ceph_get_num_objects(&rbd_dev->layout,
1830 rbd_dev->mapping.size);
1831 object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1832 BITS_PER_BYTE);
1833 num_pages = calc_pages_for(0, object_map_bytes) + 1;
1834 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1835 if (IS_ERR(pages))
1836 return PTR_ERR(pages);
1837
1838 reply_len = num_pages * PAGE_SIZE;
1839 rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1840 ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1841 "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1842 NULL, 0, pages, &reply_len);
1843 if (ret)
1844 goto out;
1845
1846 p = page_address(pages[0]);
1847 end = p + min(reply_len, (size_t)PAGE_SIZE);
1848 ret = decode_object_map_header(&p, end, &object_map_size);
1849 if (ret)
1850 goto out;
1851
1852 if (object_map_size != num_objects) {
1853 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1854 object_map_size, num_objects);
1855 ret = -EINVAL;
1856 goto out;
1857 }
1858
1859 if (offset_in_page(p) + object_map_bytes > reply_len) {
1860 ret = -EINVAL;
1861 goto out;
1862 }
1863
1864 rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1865 if (!rbd_dev->object_map) {
1866 ret = -ENOMEM;
1867 goto out;
1868 }
1869
1870 rbd_dev->object_map_size = object_map_size;
1871 ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1872 offset_in_page(p), object_map_bytes);
1873
1874out:
1875 ceph_release_page_vector(pages, num_pages);
1876 return ret;
1877}
1878
1879static void rbd_object_map_free(struct rbd_device *rbd_dev)
1880{
1881 kvfree(rbd_dev->object_map);
1882 rbd_dev->object_map = NULL;
1883 rbd_dev->object_map_size = 0;
1884}
1885
1886static int rbd_object_map_load(struct rbd_device *rbd_dev)
1887{
1888 int ret;
1889
1890 ret = __rbd_object_map_load(rbd_dev);
1891 if (ret)
1892 return ret;
1893
1894 ret = rbd_dev_v2_get_flags(rbd_dev);
1895 if (ret) {
1896 rbd_object_map_free(rbd_dev);
1897 return ret;
1898 }
1899
1900 if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1901 rbd_warn(rbd_dev, "object map is invalid");
1902
1903 return 0;
1904}
1905
1906static int rbd_object_map_open(struct rbd_device *rbd_dev)
1907{
1908 int ret;
1909
1910 ret = rbd_object_map_lock(rbd_dev);
1911 if (ret)
1912 return ret;
1913
1914 ret = rbd_object_map_load(rbd_dev);
1915 if (ret) {
1916 rbd_object_map_unlock(rbd_dev);
1917 return ret;
1918 }
1919
1920 return 0;
1921}
1922
1923static void rbd_object_map_close(struct rbd_device *rbd_dev)
1924{
1925 rbd_object_map_free(rbd_dev);
1926 rbd_object_map_unlock(rbd_dev);
1927}
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
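/*
 * Completion handler for an object map update request.  A single-op
 * request updated a snapshot's object map and needs no further
 * bookkeeping.  A two-op request (lock assertion plus the
 * "object_map_update" class call) targeted the HEAD object map, so the
 * update payload is decoded back out of the request data and mirrored
 * into the in-memory map, provided the in-memory state still matches
 * the expected current state.
 */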
1939static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1940 struct ceph_osd_request *osd_req)
1941{
1942 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1943 struct ceph_osd_data *osd_data;
1944 u64 objno;
1945 u8 state, new_state, current_state;
1946 bool has_current_state;
1947 void *p;
1948
1949 if (osd_req->r_result)
1950 return osd_req->r_result;
1951
1952
1953
1954
1955 if (osd_req->r_num_ops == 1)
1956 return 0;
1957
1958
1959
1960
1961 rbd_assert(osd_req->r_num_ops == 2);
1962 osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1963 rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1964
1965 p = page_address(osd_data->pages[0]);
1966 objno = ceph_decode_64(&p);
1967 rbd_assert(objno == obj_req->ex.oe_objno);
1968 rbd_assert(ceph_decode_64(&p) == objno + 1);
1969 new_state = ceph_decode_8(&p);
1970 has_current_state = ceph_decode_8(&p);
1971 if (has_current_state)
1972 current_state = ceph_decode_8(&p);
1973
1974 spin_lock(&rbd_dev->object_map_lock);
1975 state = __rbd_object_map_get(rbd_dev, objno);
1976 if (!has_current_state || current_state == state ||
1977 (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1978 __rbd_object_map_set(rbd_dev, objno, new_state);
1979 spin_unlock(&rbd_dev->object_map_lock);
1980
1981 return 0;
1982}
1983
1984static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1985{
1986 struct rbd_obj_request *obj_req = osd_req->r_priv;
1987 int result;
1988
1989 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1990 osd_req->r_result, obj_req);
1991
1992 result = rbd_object_map_update_finish(obj_req, osd_req);
1993 rbd_obj_handle_request(obj_req, result);
1994}
1995
1996static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
1997{
1998 u8 state = rbd_object_map_get(rbd_dev, objno);
1999
2000 if (state == new_state ||
2001 (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2002 (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2003 return false;
2004
2005 return true;
2006}
2007
2008static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2009 int which, u64 objno, u8 new_state,
2010 const u8 *current_state)
2011{
2012 struct page **pages;
2013 void *p, *start;
2014 int ret;
2015
2016 ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2017 if (ret)
2018 return ret;
2019
2020 pages = ceph_alloc_page_vector(1, GFP_NOIO);
2021 if (IS_ERR(pages))
2022 return PTR_ERR(pages);
2023
2024 p = start = page_address(pages[0]);
2025 ceph_encode_64(&p, objno);
2026 ceph_encode_64(&p, objno + 1);
2027 ceph_encode_8(&p, new_state);
2028 if (current_state) {
2029 ceph_encode_8(&p, 1);
2030 ceph_encode_8(&p, *current_state);
2031 } else {
2032 ceph_encode_8(&p, 0);
2033 }
2034
2035 osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2036 false, true);
2037 return 0;
2038}
2039
2040
2041
2042
2043
2044
2045
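/*
 * Kick off an object map update for the object covered by @obj_req on
 * the map belonging to @snap_id.
 *
 * Return:
 *   0 - update request submitted (completes via rbd_object_map_callback())
 *   1 - no update needed
 *  <0 - error
 */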
2046static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2047 u8 new_state, const u8 *current_state)
2048{
2049 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2050 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2051 struct ceph_osd_request *req;
2052 int num_ops = 1;
2053 int which = 0;
2054 int ret;
2055
2056 if (snap_id == CEPH_NOSNAP) {
2057 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2058 return 1;
2059
2060 num_ops++;
2061 }
2062
2063 req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2064 if (!req)
2065 return -ENOMEM;
2066
2067 list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2068 req->r_callback = rbd_object_map_callback;
2069 req->r_priv = obj_req;
2070
2071 rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2072 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2073 req->r_flags = CEPH_OSD_FLAG_WRITE;
2074 ktime_get_real_ts64(&req->r_mtime);
2075
2076 if (snap_id == CEPH_NOSNAP) {
2077
2078
2079
2080
2081 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2082 CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2083 if (ret)
2084 return ret;
2085 }
2086
2087 ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2088 new_state, current_state);
2089 if (ret)
2090 return ret;
2091
2092 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2093 if (ret)
2094 return ret;
2095
2096 ceph_osdc_start_request(osdc, req, false);
2097 return 0;
2098}
2099
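/*
 * Trim a list of image extents to the part that lies below @overlap:
 * extents entirely beyond the parent overlap are dropped and the last
 * remaining extent is clipped if it straddles the boundary.
 */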
static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}
2119
2120
2121
2122
2123
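/*
 * Determine the byte range(s) in the parent image that back this
 * object request, covering either just the requested object extent or,
 * if @entire is set, the whole object, and clip the result to the
 * parent overlap.  Without a parent overlap the extent list is left
 * empty.
 */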
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}
2146
2147static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2148{
2149 struct rbd_obj_request *obj_req = osd_req->r_priv;
2150
2151 switch (obj_req->img_request->data_type) {
2152 case OBJ_REQUEST_BIO:
2153 osd_req_op_extent_osd_data_bio(osd_req, which,
2154 &obj_req->bio_pos,
2155 obj_req->ex.oe_len);
2156 break;
2157 case OBJ_REQUEST_BVECS:
2158 case OBJ_REQUEST_OWN_BVECS:
2159 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2160 obj_req->ex.oe_len);
2161 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2162 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2163 &obj_req->bvec_pos);
2164 break;
2165 default:
2166 BUG();
2167 }
2168}
2169
2170static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2171{
2172 struct page **pages;
2173
2174
2175
2176
2177
2178
2179
2180
2181
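	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 * hence the 8 + sizeof(struct ceph_timespec) reply buffer below.
	 */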
2182 pages = ceph_alloc_page_vector(1, GFP_NOIO);
2183 if (IS_ERR(pages))
2184 return PTR_ERR(pages);
2185
2186 osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2187 osd_req_op_raw_data_in_pages(osd_req, which, pages,
2188 8 + sizeof(struct ceph_timespec),
2189 0, false, true);
2190 return 0;
2191}
2192
2193static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2194 u32 bytes)
2195{
2196 struct rbd_obj_request *obj_req = osd_req->r_priv;
2197 int ret;
2198
2199 ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2200 if (ret)
2201 return ret;
2202
2203 osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2204 obj_req->copyup_bvec_count, bytes);
2205 return 0;
2206}
2207
2208static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2209{
2210 obj_req->read_state = RBD_OBJ_READ_START;
2211 return 0;
2212}
2213
2214static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2215 int which)
2216{
2217 struct rbd_obj_request *obj_req = osd_req->r_priv;
2218 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2219 u16 opcode;
2220
2221 if (!use_object_map(rbd_dev) ||
2222 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2223 osd_req_op_alloc_hint_init(osd_req, which++,
2224 rbd_dev->layout.object_size,
2225 rbd_dev->layout.object_size,
2226 rbd_dev->opts->alloc_hint_flags);
2227 }
2228
2229 if (rbd_obj_is_entire(obj_req))
2230 opcode = CEPH_OSD_OP_WRITEFULL;
2231 else
2232 opcode = CEPH_OSD_OP_WRITE;
2233
2234 osd_req_op_extent_init(osd_req, which, opcode,
2235 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2236 rbd_osd_setup_data(osd_req, which);
2237}
2238
2239static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2240{
2241 int ret;
2242
2243
2244 ret = rbd_obj_calc_img_extents(obj_req, true);
2245 if (ret)
2246 return ret;
2247
2248 if (rbd_obj_copyup_enabled(obj_req))
2249 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2250
2251 obj_req->write_state = RBD_OBJ_WRITE_START;
2252 return 0;
2253}
2254
2255static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2256{
2257 return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2258 CEPH_OSD_OP_ZERO;
2259}
2260
2261static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2262 int which)
2263{
2264 struct rbd_obj_request *obj_req = osd_req->r_priv;
2265
2266 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2267 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2268 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2269 } else {
2270 osd_req_op_extent_init(osd_req, which,
2271 truncate_or_zero_opcode(obj_req),
2272 obj_req->ex.oe_off, obj_req->ex.oe_len,
2273 0, 0);
2274 }
2275}
2276
2277static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2278{
2279 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2280 u64 off, next_off;
2281 int ret;
2282
2283
2284
2285
2286
2287
2288
2289
2290
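	/*
	 * Align the discard to alloc_size boundaries and punt (return 1,
	 * turning this object request into a no-op) on discards too
	 * small to free any space.  The one unaligned case that is let
	 * through, alloc_size equal to the object size combined with a
	 * range running to the end of the object, can presumably always
	 * be expressed as a truncate.
	 */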
2291 if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2292 !rbd_obj_is_tail(obj_req)) {
2293 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2294 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2295 rbd_dev->opts->alloc_size);
2296 if (off >= next_off)
2297 return 1;
2298
2299 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2300 obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2301 off, next_off - off);
2302 obj_req->ex.oe_off = off;
2303 obj_req->ex.oe_len = next_off - off;
2304 }
2305
2306
2307 ret = rbd_obj_calc_img_extents(obj_req, true);
2308 if (ret)
2309 return ret;
2310
2311 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2312 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2313 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2314
2315 obj_req->write_state = RBD_OBJ_WRITE_START;
2316 return 0;
2317}
2318
2319static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2320 int which)
2321{
2322 struct rbd_obj_request *obj_req = osd_req->r_priv;
2323 u16 opcode;
2324
2325 if (rbd_obj_is_entire(obj_req)) {
2326 if (obj_req->num_img_extents) {
2327 if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2328 osd_req_op_init(osd_req, which++,
2329 CEPH_OSD_OP_CREATE, 0);
2330 opcode = CEPH_OSD_OP_TRUNCATE;
2331 } else {
2332 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2333 osd_req_op_init(osd_req, which++,
2334 CEPH_OSD_OP_DELETE, 0);
2335 opcode = 0;
2336 }
2337 } else {
2338 opcode = truncate_or_zero_opcode(obj_req);
2339 }
2340
2341 if (opcode)
2342 osd_req_op_extent_init(osd_req, which, opcode,
2343 obj_req->ex.oe_off, obj_req->ex.oe_len,
2344 0, 0);
2345}
2346
2347static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2348{
2349 int ret;
2350
2351
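 /* reverse map the entire object onto the parent image */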
2352 ret = rbd_obj_calc_img_extents(obj_req, true);
2353 if (ret)
2354 return ret;
2355
2356 if (rbd_obj_copyup_enabled(obj_req))
2357 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2358 if (!obj_req->num_img_extents) {
2359 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2360 if (rbd_obj_is_entire(obj_req))
2361 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2362 }
2363
2364 obj_req->write_state = RBD_OBJ_WRITE_START;
2365 return 0;
2366}
2367
2368static int count_write_ops(struct rbd_obj_request *obj_req)
2369{
2370 struct rbd_img_request *img_req = obj_req->img_request;
2371
2372 switch (img_req->op_type) {
2373 case OBJ_OP_WRITE:
2374 if (!use_object_map(img_req->rbd_dev) ||
2375 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2376 return 2;
2377
2378 return 1;
2379 case OBJ_OP_DISCARD:
2380 return 1;
2381 case OBJ_OP_ZEROOUT:
2382 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2383 !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2384 return 2;
2385
2386 return 1;
2387 default:
2388 BUG();
2389 }
2390}
2391
2392static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2393 int which)
2394{
2395 struct rbd_obj_request *obj_req = osd_req->r_priv;
2396
2397 switch (obj_req->img_request->op_type) {
2398 case OBJ_OP_WRITE:
2399 __rbd_osd_setup_write_ops(osd_req, which);
2400 break;
2401 case OBJ_OP_DISCARD:
2402 __rbd_osd_setup_discard_ops(osd_req, which);
2403 break;
2404 case OBJ_OP_ZEROOUT:
2405 __rbd_osd_setup_zeroout_ops(osd_req, which);
2406 break;
2407 default:
2408 BUG();
2409 }
2410}
2411
2412
2413
2414
2415
2416
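/*
 * Initialize each object request according to the image request's op
 * type.  A positive return from an init helper means the request is a
 * no-op for that object (e.g. a discard too small to free anything),
 * in which case the object request is dropped.
 */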
2417static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2418{
2419 struct rbd_obj_request *obj_req, *next_obj_req;
2420 int ret;
2421
2422 for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2423 switch (img_req->op_type) {
2424 case OBJ_OP_READ:
2425 ret = rbd_obj_init_read(obj_req);
2426 break;
2427 case OBJ_OP_WRITE:
2428 ret = rbd_obj_init_write(obj_req);
2429 break;
2430 case OBJ_OP_DISCARD:
2431 ret = rbd_obj_init_discard(obj_req);
2432 break;
2433 case OBJ_OP_ZEROOUT:
2434 ret = rbd_obj_init_zeroout(obj_req);
2435 break;
2436 default:
2437 BUG();
2438 }
2439 if (ret < 0)
2440 return ret;
2441 if (ret > 0) {
2442 rbd_img_obj_request_del(img_req, obj_req);
2443 continue;
2444 }
2445 }
2446
2447 img_req->state = RBD_IMG_START;
2448 return 0;
2449}
2450
2451union rbd_img_fill_iter {
2452 struct ceph_bio_iter bio_iter;
2453 struct ceph_bvec_iter bvec_iter;
2454};
2455
2456struct rbd_img_fill_ctx {
2457 enum obj_request_type pos_type;
2458 union rbd_img_fill_iter *pos;
2459 union rbd_img_fill_iter iter;
2460 ceph_object_extent_fn_t set_pos_fn;
2461 ceph_object_extent_fn_t count_fn;
2462 ceph_object_extent_fn_t copy_fn;
2463};
2464
2465static struct ceph_object_extent *alloc_object_extent(void *arg)
2466{
2467 struct rbd_img_request *img_req = arg;
2468 struct rbd_obj_request *obj_req;
2469
2470 obj_req = rbd_obj_request_create();
2471 if (!obj_req)
2472 return NULL;
2473
2474 rbd_img_obj_request_add(img_req, obj_req);
2475 return &obj_req->ex;
2476}
2477
2478
2479
2480
2481
2482
2483
2484
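/*
 * "Fancy" (non-default) striping: the stripe unit differs from the
 * object size, so a single object extent may map to non-contiguous
 * pieces of the image extent (and of the caller's data buffer).
 */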
2485static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2486{
2487 return l->stripe_unit != l->object_size;
2488}
2489
2490static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2491 struct ceph_file_extent *img_extents,
2492 u32 num_img_extents,
2493 struct rbd_img_fill_ctx *fctx)
2494{
2495 u32 i;
2496 int ret;
2497
2498 img_req->data_type = fctx->pos_type;
2499
2500
2501
2502
2503
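 /*
  * Create object requests and set each object request's starting
  * position within the provided bio (chain) or bio_vec array -- no
  * copying is needed because each object extent maps to a contiguous
  * chunk of it.
  */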
2504 fctx->iter = *fctx->pos;
2505 for (i = 0; i < num_img_extents; i++) {
2506 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2507 img_extents[i].fe_off,
2508 img_extents[i].fe_len,
2509 &img_req->object_extents,
2510 alloc_object_extent, img_req,
2511 fctx->set_pos_fn, &fctx->iter);
2512 if (ret)
2513 return ret;
2514 }
2515
2516 return __rbd_img_fill_request(img_req);
2517}
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
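/*
 * Map a list of image extents to a list of object extents, create the
 * corresponding object requests and add them to @img_req, and set up
 * each object request's data descriptor to point at the matching
 * chunk(s) of @fctx->pos.  For fancy (striped) layouts a chunk may be
 * non-contiguous, so the data positions are first counted and then
 * copied into bvec arrays owned by the object requests
 * (OBJ_REQUEST_OWN_BVECS).
 */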
2532static int rbd_img_fill_request(struct rbd_img_request *img_req,
2533 struct ceph_file_extent *img_extents,
2534 u32 num_img_extents,
2535 struct rbd_img_fill_ctx *fctx)
2536{
2537 struct rbd_device *rbd_dev = img_req->rbd_dev;
2538 struct rbd_obj_request *obj_req;
2539 u32 i;
2540 int ret;
2541
2542 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2543 !rbd_layout_is_fancy(&rbd_dev->layout))
2544 return rbd_img_fill_request_nocopy(img_req, img_extents,
2545 num_img_extents, fctx);
2546
2547 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2548
2549
2550
2551
2552
2553
2554
2555
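 /*
  * First pass: create the object requests and determine ->bvec_count
  * for each of them.  The ->bvec_count sum may exceed the number of
  * bio_vecs provided because bio_vecs can straddle stripe unit
  * boundaries when mapped.
  */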
2556 fctx->iter = *fctx->pos;
2557 for (i = 0; i < num_img_extents; i++) {
2558 ret = ceph_file_to_extents(&rbd_dev->layout,
2559 img_extents[i].fe_off,
2560 img_extents[i].fe_len,
2561 &img_req->object_extents,
2562 alloc_object_extent, img_req,
2563 fctx->count_fn, &fctx->iter);
2564 if (ret)
2565 return ret;
2566 }
2567
2568 for_each_obj_request(img_req, obj_req) {
2569 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2570 sizeof(*obj_req->bvec_pos.bvecs),
2571 GFP_NOIO);
2572 if (!obj_req->bvec_pos.bvecs)
2573 return -ENOMEM;
2574 }
2575
2576
2577
2578
2579
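 /*
  * Second pass: fill in each object request's private bio_vec array,
  * splitting and rearranging the provided bio_vecs in stripe unit
  * chunks as needed.
  */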
2580 fctx->iter = *fctx->pos;
2581 for (i = 0; i < num_img_extents; i++) {
2582 ret = ceph_iterate_extents(&rbd_dev->layout,
2583 img_extents[i].fe_off,
2584 img_extents[i].fe_len,
2585 &img_req->object_extents,
2586 fctx->copy_fn, &fctx->iter);
2587 if (ret)
2588 return ret;
2589 }
2590
2591 return __rbd_img_fill_request(img_req);
2592}
2593
2594static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2595 u64 off, u64 len)
2596{
2597 struct ceph_file_extent ex = { off, len };
2598 union rbd_img_fill_iter dummy = {};
2599 struct rbd_img_fill_ctx fctx = {
2600 .pos_type = OBJ_REQUEST_NODATA,
2601 .pos = &dummy,
2602 };
2603
2604 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2605}
2606
2607static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2608{
2609 struct rbd_obj_request *obj_req =
2610 container_of(ex, struct rbd_obj_request, ex);
2611 struct ceph_bio_iter *it = arg;
2612
2613 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2614 obj_req->bio_pos = *it;
2615 ceph_bio_iter_advance(it, bytes);
2616}
2617
2618static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2619{
2620 struct rbd_obj_request *obj_req =
2621 container_of(ex, struct rbd_obj_request, ex);
2622 struct ceph_bio_iter *it = arg;
2623
2624 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2625 ceph_bio_iter_advance_step(it, bytes, ({
2626 obj_req->bvec_count++;
2627 }));
2628
2629}
2630
2631static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2632{
2633 struct rbd_obj_request *obj_req =
2634 container_of(ex, struct rbd_obj_request, ex);
2635 struct ceph_bio_iter *it = arg;
2636
2637 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2638 ceph_bio_iter_advance_step(it, bytes, ({
2639 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2640 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2641 }));
2642}
2643
2644static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2645 struct ceph_file_extent *img_extents,
2646 u32 num_img_extents,
2647 struct ceph_bio_iter *bio_pos)
2648{
2649 struct rbd_img_fill_ctx fctx = {
2650 .pos_type = OBJ_REQUEST_BIO,
2651 .pos = (union rbd_img_fill_iter *)bio_pos,
2652 .set_pos_fn = set_bio_pos,
2653 .count_fn = count_bio_bvecs,
2654 .copy_fn = copy_bio_bvecs,
2655 };
2656
2657 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2658 &fctx);
2659}
2660
2661static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2662 u64 off, u64 len, struct bio *bio)
2663{
2664 struct ceph_file_extent ex = { off, len };
2665 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2666
2667 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2668}
2669
2670static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2671{
2672 struct rbd_obj_request *obj_req =
2673 container_of(ex, struct rbd_obj_request, ex);
2674 struct ceph_bvec_iter *it = arg;
2675
2676 obj_req->bvec_pos = *it;
2677 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2678 ceph_bvec_iter_advance(it, bytes);
2679}
2680
2681static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2682{
2683 struct rbd_obj_request *obj_req =
2684 container_of(ex, struct rbd_obj_request, ex);
2685 struct ceph_bvec_iter *it = arg;
2686
2687 ceph_bvec_iter_advance_step(it, bytes, ({
2688 obj_req->bvec_count++;
2689 }));
2690}
2691
2692static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2693{
2694 struct rbd_obj_request *obj_req =
2695 container_of(ex, struct rbd_obj_request, ex);
2696 struct ceph_bvec_iter *it = arg;
2697
2698 ceph_bvec_iter_advance_step(it, bytes, ({
2699 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2700 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2701 }));
2702}
2703
2704static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2705 struct ceph_file_extent *img_extents,
2706 u32 num_img_extents,
2707 struct ceph_bvec_iter *bvec_pos)
2708{
2709 struct rbd_img_fill_ctx fctx = {
2710 .pos_type = OBJ_REQUEST_BVECS,
2711 .pos = (union rbd_img_fill_iter *)bvec_pos,
2712 .set_pos_fn = set_bvec_pos,
2713 .count_fn = count_bvecs,
2714 .copy_fn = copy_bvecs,
2715 };
2716
2717 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2718 &fctx);
2719}
2720
2721static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2722 struct ceph_file_extent *img_extents,
2723 u32 num_img_extents,
2724 struct bio_vec *bvecs)
2725{
2726 struct ceph_bvec_iter it = {
2727 .bvecs = bvecs,
2728 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2729 num_img_extents) },
2730 };
2731
2732 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2733 &it);
2734}
2735
2736static void rbd_img_handle_request_work(struct work_struct *work)
2737{
2738 struct rbd_img_request *img_req =
2739 container_of(work, struct rbd_img_request, work);
2740
2741 rbd_img_handle_request(img_req, img_req->work_result);
2742}
2743
2744static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2745{
2746 INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2747 img_req->work_result = result;
2748 queue_work(rbd_wq, &img_req->work);
2749}
2750
2751static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2752{
2753 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2754
2755 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2756 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2757 return true;
2758 }
2759
2760 dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2761 obj_req->ex.oe_objno);
2762 return false;
2763}
2764
2765static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2766{
2767 struct ceph_osd_request *osd_req;
2768 int ret;
2769
2770 osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2771 if (IS_ERR(osd_req))
2772 return PTR_ERR(osd_req);
2773
2774 osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2775 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2776 rbd_osd_setup_data(osd_req, 0);
2777 rbd_osd_format_read(osd_req);
2778
2779 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2780 if (ret)
2781 return ret;
2782
2783 rbd_osd_submit(osd_req);
2784 return 0;
2785}
2786
2787static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2788{
2789 struct rbd_img_request *img_req = obj_req->img_request;
2790 struct rbd_device *parent = img_req->rbd_dev->parent;
2791 struct rbd_img_request *child_img_req;
2792 int ret;
2793
2794 child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2795 if (!child_img_req)
2796 return -ENOMEM;
2797
2798 rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2799 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2800 child_img_req->obj_request = obj_req;
2801
2802 down_read(&parent->header_rwsem);
2803 rbd_img_capture_header(child_img_req);
2804 up_read(&parent->header_rwsem);
2805
2806 dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2807 obj_req);
2808
2809 if (!rbd_img_is_write(img_req)) {
2810 switch (img_req->data_type) {
2811 case OBJ_REQUEST_BIO:
2812 ret = __rbd_img_fill_from_bio(child_img_req,
2813 obj_req->img_extents,
2814 obj_req->num_img_extents,
2815 &obj_req->bio_pos);
2816 break;
2817 case OBJ_REQUEST_BVECS:
2818 case OBJ_REQUEST_OWN_BVECS:
2819 ret = __rbd_img_fill_from_bvecs(child_img_req,
2820 obj_req->img_extents,
2821 obj_req->num_img_extents,
2822 &obj_req->bvec_pos);
2823 break;
2824 default:
2825 BUG();
2826 }
2827 } else {
2828 ret = rbd_img_fill_from_bvecs(child_img_req,
2829 obj_req->img_extents,
2830 obj_req->num_img_extents,
2831 obj_req->copyup_bvecs);
2832 }
2833 if (ret) {
2834 rbd_img_request_destroy(child_img_req);
2835 return ret;
2836 }
2837
2838
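 /* avoid recursing through the parent chain on this thread -- punt to the workqueue */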
2839 rbd_img_schedule(child_img_req, 0);
2840 return 0;
2841}
2842
2843static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2844{
2845 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2846 int ret;
2847
2848again:
2849 switch (obj_req->read_state) {
2850 case RBD_OBJ_READ_START:
2851 rbd_assert(!*result);
2852
2853 if (!rbd_obj_may_exist(obj_req)) {
2854 *result = -ENOENT;
2855 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2856 goto again;
2857 }
2858
2859 ret = rbd_obj_read_object(obj_req);
2860 if (ret) {
2861 *result = ret;
2862 return true;
2863 }
2864 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2865 return false;
2866 case RBD_OBJ_READ_OBJECT:
2867 if (*result == -ENOENT && rbd_dev->parent_overlap) {
2868
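 /* reverse map this object extent onto the parent image */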
2869 ret = rbd_obj_calc_img_extents(obj_req, false);
2870 if (ret) {
2871 *result = ret;
2872 return true;
2873 }
2874 if (obj_req->num_img_extents) {
2875 ret = rbd_obj_read_from_parent(obj_req);
2876 if (ret) {
2877 *result = ret;
2878 return true;
2879 }
2880 obj_req->read_state = RBD_OBJ_READ_PARENT;
2881 return false;
2882 }
2883 }
2884
2885
2886
2887
2888
2889
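 /*
  * -ENOENT means a hole in the image: zero-fill the whole extent.
  * A short read means the backing object is smaller than the
  * request: zero-fill everything past what was read.
  */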
2890 if (*result == -ENOENT) {
2891 rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2892 *result = 0;
2893 } else if (*result >= 0) {
2894 if (*result < obj_req->ex.oe_len)
2895 rbd_obj_zero_range(obj_req, *result,
2896 obj_req->ex.oe_len - *result);
2897 else
2898 rbd_assert(*result == obj_req->ex.oe_len);
2899 *result = 0;
2900 }
2901 return true;
2902 case RBD_OBJ_READ_PARENT:
2903
2904
2905
2906
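 /*
  * The parent read covers only the bytes within the overlap --
  * zero-fill anything beyond it.
  */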
2907 if (!*result) {
2908 u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2909
2910 if (obj_overlap < obj_req->ex.oe_len)
2911 rbd_obj_zero_range(obj_req, obj_overlap,
2912 obj_req->ex.oe_len - obj_overlap);
2913 }
2914 return true;
2915 default:
2916 BUG();
2917 }
2918}
2919
2920static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2921{
2922 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2923
2924 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2925 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2926
2927 if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2928 (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2929 dout("%s %p noop for nonexistent\n", __func__, obj_req);
2930 return true;
2931 }
2932
2933 return false;
2934}
2935
2936
2937
2938
2939
2940
2941
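/*
 * Return:
 *   1 - no object map update is needed
 *   0 - object map update request was sent (wait for its callback)
 *  <0 - error
 */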
2942static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2943{
2944 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2945 u8 new_state;
2946
2947 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2948 return 1;
2949
2950 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2951 new_state = OBJECT_PENDING;
2952 else
2953 new_state = OBJECT_EXISTS;
2954
2955 return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2956}
2957
2958static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2959{
2960 struct ceph_osd_request *osd_req;
2961 int num_ops = count_write_ops(obj_req);
2962 int which = 0;
2963 int ret;
2964
2965 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2966 num_ops++;
2967
2968 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2969 if (IS_ERR(osd_req))
2970 return PTR_ERR(osd_req);
2971
2972 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2973 ret = rbd_osd_setup_stat(osd_req, which++);
2974 if (ret)
2975 return ret;
2976 }
2977
2978 rbd_osd_setup_write_ops(osd_req, which);
2979 rbd_osd_format_write(osd_req);
2980
2981 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2982 if (ret)
2983 return ret;
2984
2985 rbd_osd_submit(osd_req);
2986 return 0;
2987}
2988
2989
2990
2991
2992static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2993{
2994 struct ceph_bvec_iter it = {
2995 .bvecs = bvecs,
2996 .iter = { .bi_size = bytes },
2997 };
2998
2999 ceph_bvec_iter_advance_step(&it, bytes, ({
3000 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3001 bv.bv_len))
3002 return false;
3003 }));
3004 return true;
3005}
3006
3007#define MODS_ONLY U32_MAX
3008
3009static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3010 u32 bytes)
3011{
3012 struct ceph_osd_request *osd_req;
3013 int ret;
3014
3015 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3016 rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3017
3018 osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3019 if (IS_ERR(osd_req))
3020 return PTR_ERR(osd_req);
3021
3022 ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3023 if (ret)
3024 return ret;
3025
3026 rbd_osd_format_write(osd_req);
3027
3028 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3029 if (ret)
3030 return ret;
3031
3032 rbd_osd_submit(osd_req);
3033 return 0;
3034}
3035
3036static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3037 u32 bytes)
3038{
3039 struct ceph_osd_request *osd_req;
3040 int num_ops = count_write_ops(obj_req);
3041 int which = 0;
3042 int ret;
3043
3044 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3045
3046 if (bytes != MODS_ONLY)
3047 num_ops++;
3048
3049 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3050 if (IS_ERR(osd_req))
3051 return PTR_ERR(osd_req);
3052
3053 if (bytes != MODS_ONLY) {
3054 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3055 if (ret)
3056 return ret;
3057 }
3058
3059 rbd_osd_setup_write_ops(osd_req, which);
3060 rbd_osd_format_write(osd_req);
3061
3062 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3063 if (ret)
3064 return ret;
3065
3066 rbd_osd_submit(osd_req);
3067 return 0;
3068}
3069
3070static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3071{
3072 u32 i;
3073
3074 rbd_assert(!obj_req->copyup_bvecs);
3075 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3076 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3077 sizeof(*obj_req->copyup_bvecs),
3078 GFP_NOIO);
3079 if (!obj_req->copyup_bvecs)
3080 return -ENOMEM;
3081
3082 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3083 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3084
3085 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3086 if (!obj_req->copyup_bvecs[i].bv_page)
3087 return -ENOMEM;
3088
3089 obj_req->copyup_bvecs[i].bv_offset = 0;
3090 obj_req->copyup_bvecs[i].bv_len = len;
3091 obj_overlap -= len;
3092 }
3093
3094 rbd_assert(!obj_overlap);
3095 return 0;
3096}
3097
3098
3099
3100
3101
3102
3103static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3104{
3105 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3106 int ret;
3107
3108 rbd_assert(obj_req->num_img_extents);
3109 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3110 rbd_dev->parent_overlap);
3111 if (!obj_req->num_img_extents) {
3112
3113
3114
3115
3116
3117
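 /*
  * The parent overlap for this object has shrunk to nothing (e.g.
  * the image was flattened), so there is nothing to copy up --
  * resubmit only the original write ops (MODS_ONLY).
  */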
3118 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3119 }
3120
3121 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3122 if (ret)
3123 return ret;
3124
3125 return rbd_obj_read_from_parent(obj_req);
3126}
3127
3128static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3129{
3130 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3131 struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3132 u8 new_state;
3133 u32 i;
3134 int ret;
3135
3136 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3137
3138 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3139 return;
3140
3141 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3142 return;
3143
3144 for (i = 0; i < snapc->num_snaps; i++) {
3145 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3146 i + 1 < snapc->num_snaps)
3147 new_state = OBJECT_EXISTS_CLEAN;
3148 else
3149 new_state = OBJECT_EXISTS;
3150
3151 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3152 new_state, NULL);
3153 if (ret < 0) {
3154 obj_req->pending.result = ret;
3155 return;
3156 }
3157
3158 rbd_assert(!ret);
3159 obj_req->pending.num_pending++;
3160 }
3161}
3162
3163static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3164{
3165 u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3166 int ret;
3167
3168 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3169
3170
3171
3172
3173
3174
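 /*
  * Zero copyup data is equivalent to the object not existing, so
  * skip sending it and save some I/O and network bandwidth.
  */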
3175 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3176 bytes = 0;
3177
3178 if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3179
3180
3181
3182
3183
3184
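 /*
  * If there are snapshots, first send a copyup with an empty
  * snapshot context so the parent data is copied up across all of
  * them; the actual modification is then sent separately with
  * MODS_ONLY (no copyup payload).
  */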
3185 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3186 if (ret) {
3187 obj_req->pending.result = ret;
3188 return;
3189 }
3190
3191 obj_req->pending.num_pending++;
3192 bytes = MODS_ONLY;
3193 }
3194
3195 ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3196 if (ret) {
3197 obj_req->pending.result = ret;
3198 return;
3199 }
3200
3201 obj_req->pending.num_pending++;
3202}
3203
3204static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3205{
3206 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3207 int ret;
3208
3209again:
3210 switch (obj_req->copyup_state) {
3211 case RBD_OBJ_COPYUP_START:
3212 rbd_assert(!*result);
3213
3214 ret = rbd_obj_copyup_read_parent(obj_req);
3215 if (ret) {
3216 *result = ret;
3217 return true;
3218 }
3219 if (obj_req->num_img_extents)
3220 obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3221 else
3222 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3223 return false;
3224 case RBD_OBJ_COPYUP_READ_PARENT:
3225 if (*result)
3226 return true;
3227
3228 if (is_zero_bvecs(obj_req->copyup_bvecs,
3229 rbd_obj_img_extents_bytes(obj_req))) {
3230 dout("%s %p detected zeros\n", __func__, obj_req);
3231 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3232 }
3233
3234 rbd_obj_copyup_object_maps(obj_req);
3235 if (!obj_req->pending.num_pending) {
3236 *result = obj_req->pending.result;
3237 obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3238 goto again;
3239 }
3240 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3241 return false;
3242 case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3243 if (!pending_result_dec(&obj_req->pending, result))
3244 return false;
3245 fallthrough;
3246 case RBD_OBJ_COPYUP_OBJECT_MAPS:
3247 if (*result) {
3248 rbd_warn(rbd_dev, "snap object map update failed: %d",
3249 *result);
3250 return true;
3251 }
3252
3253 rbd_obj_copyup_write_object(obj_req);
3254 if (!obj_req->pending.num_pending) {
3255 *result = obj_req->pending.result;
3256 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3257 goto again;
3258 }
3259 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3260 return false;
3261 case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3262 if (!pending_result_dec(&obj_req->pending, result))
3263 return false;
3264 fallthrough;
3265 case RBD_OBJ_COPYUP_WRITE_OBJECT:
3266 return true;
3267 default:
3268 BUG();
3269 }
3270}
3271
3272
3273
3274
3275
3276
3277
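/*
 * Return:
 *   1 - no object map update is needed
 *   0 - object map update request was sent (wait for its callback)
 *  <0 - error
 */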
3278static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3279{
3280 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3281 u8 current_state = OBJECT_PENDING;
3282
3283 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3284 return 1;
3285
3286 if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3287 return 1;
3288
3289 return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3290 &current_state);
3291}
3292
3293static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3294{
3295 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3296 int ret;
3297
3298again:
3299 switch (obj_req->write_state) {
3300 case RBD_OBJ_WRITE_START:
3301 rbd_assert(!*result);
3302
3303 if (rbd_obj_write_is_noop(obj_req))
3304 return true;
3305
3306 ret = rbd_obj_write_pre_object_map(obj_req);
3307 if (ret < 0) {
3308 *result = ret;
3309 return true;
3310 }
3311 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3312 if (ret > 0)
3313 goto again;
3314 return false;
3315 case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3316 if (*result) {
3317 rbd_warn(rbd_dev, "pre object map update failed: %d",
3318 *result);
3319 return true;
3320 }
3321 ret = rbd_obj_write_object(obj_req);
3322 if (ret) {
3323 *result = ret;
3324 return true;
3325 }
3326 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3327 return false;
3328 case RBD_OBJ_WRITE_OBJECT:
3329 if (*result == -ENOENT) {
3330 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3331 *result = 0;
3332 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3333 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3334 goto again;
3335 }
3336
3337
3338
3339
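 /*
  * On a non-existent object, delete returns -ENOENT while
  * truncate/zero succeed -- treat -ENOENT as success for deletions.
  */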
3340 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3341 *result = 0;
3342 }
3343 if (*result)
3344 return true;
3345
3346 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3347 goto again;
3348 case __RBD_OBJ_WRITE_COPYUP:
3349 if (!rbd_obj_advance_copyup(obj_req, result))
3350 return false;
3351 fallthrough;
3352 case RBD_OBJ_WRITE_COPYUP:
3353 if (*result) {
3354 rbd_warn(rbd_dev, "copyup failed: %d", *result);
3355 return true;
3356 }
3357 ret = rbd_obj_write_post_object_map(obj_req);
3358 if (ret < 0) {
3359 *result = ret;
3360 return true;
3361 }
3362 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3363 if (ret > 0)
3364 goto again;
3365 return false;
3366 case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3367 if (*result)
3368 rbd_warn(rbd_dev, "post object map update failed: %d",
3369 *result);
3370 return true;
3371 default:
3372 BUG();
3373 }
3374}
3375
3376
3377
3378
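/*
 * Return true if @obj_req is completed.
 */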
3379static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3380 int *result)
3381{
3382 struct rbd_img_request *img_req = obj_req->img_request;
3383 struct rbd_device *rbd_dev = img_req->rbd_dev;
3384 bool done;
3385
3386 mutex_lock(&obj_req->state_mutex);
3387 if (!rbd_img_is_write(img_req))
3388 done = rbd_obj_advance_read(obj_req, result);
3389 else
3390 done = rbd_obj_advance_write(obj_req, result);
3391 mutex_unlock(&obj_req->state_mutex);
3392
3393 if (done && *result) {
3394 rbd_assert(*result < 0);
3395 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3396 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3397 obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3398 }
3399 return done;
3400}
3401
3402
3403
3404
3405
3406static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3407{
3408 if (__rbd_obj_handle_request(obj_req, &result))
3409 rbd_img_handle_request(obj_req->img_request, result);
3410}
3411
3412static bool need_exclusive_lock(struct rbd_img_request *img_req)
3413{
3414 struct rbd_device *rbd_dev = img_req->rbd_dev;
3415
3416 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3417 return false;
3418
3419 if (rbd_is_ro(rbd_dev))
3420 return false;
3421
3422 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3423 if (rbd_dev->opts->lock_on_read ||
3424 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3425 return true;
3426
3427 return rbd_img_is_write(img_req);
3428}
3429
3430static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3431{
3432 struct rbd_device *rbd_dev = img_req->rbd_dev;
3433 bool locked;
3434
3435 lockdep_assert_held(&rbd_dev->lock_rwsem);
3436 locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3437 spin_lock(&rbd_dev->lock_lists_lock);
3438 rbd_assert(list_empty(&img_req->lock_item));
3439 if (!locked)
3440 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3441 else
3442 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3443 spin_unlock(&rbd_dev->lock_lists_lock);
3444 return locked;
3445}
3446
3447static void rbd_lock_del_request(struct rbd_img_request *img_req)
3448{
3449 struct rbd_device *rbd_dev = img_req->rbd_dev;
3450 bool need_wakeup;
3451
3452 lockdep_assert_held(&rbd_dev->lock_rwsem);
3453 spin_lock(&rbd_dev->lock_lists_lock);
3454 rbd_assert(!list_empty(&img_req->lock_item));
3455 list_del_init(&img_req->lock_item);
3456 need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3457 list_empty(&rbd_dev->running_list));
3458 spin_unlock(&rbd_dev->lock_lists_lock);
3459 if (need_wakeup)
3460 complete(&rbd_dev->releasing_wait);
3461}
3462
3463static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3464{
3465 struct rbd_device *rbd_dev = img_req->rbd_dev;
3466
3467 if (!need_exclusive_lock(img_req))
3468 return 1;
3469
3470 if (rbd_lock_add_request(img_req))
3471 return 1;
3472
3473 if (rbd_dev->opts->exclusive) {
3474 WARN_ON(1);
3475 return -EROFS;
3476 }
3477
3478
3479
3480
3481
3482 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3483 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3484 return 0;
3485}
3486
3487static void rbd_img_object_requests(struct rbd_img_request *img_req)
3488{
3489 struct rbd_obj_request *obj_req;
3490
3491 rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3492
3493 for_each_obj_request(img_req, obj_req) {
3494 int result = 0;
3495
3496 if (__rbd_obj_handle_request(obj_req, &result)) {
3497 if (result) {
3498 img_req->pending.result = result;
3499 return;
3500 }
3501 } else {
3502 img_req->pending.num_pending++;
3503 }
3504 }
3505}
3506
3507static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3508{
3509 struct rbd_device *rbd_dev = img_req->rbd_dev;
3510 int ret;
3511
3512again:
3513 switch (img_req->state) {
3514 case RBD_IMG_START:
3515 rbd_assert(!*result);
3516
3517 ret = rbd_img_exclusive_lock(img_req);
3518 if (ret < 0) {
3519 *result = ret;
3520 return true;
3521 }
3522 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3523 if (ret > 0)
3524 goto again;
3525 return false;
3526 case RBD_IMG_EXCLUSIVE_LOCK:
3527 if (*result)
3528 return true;
3529
3530 rbd_assert(!need_exclusive_lock(img_req) ||
3531 __rbd_is_lock_owner(rbd_dev));
3532
3533 rbd_img_object_requests(img_req);
3534 if (!img_req->pending.num_pending) {
3535 *result = img_req->pending.result;
3536 img_req->state = RBD_IMG_OBJECT_REQUESTS;
3537 goto again;
3538 }
3539 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3540 return false;
3541 case __RBD_IMG_OBJECT_REQUESTS:
3542 if (!pending_result_dec(&img_req->pending, result))
3543 return false;
3544 fallthrough;
3545 case RBD_IMG_OBJECT_REQUESTS:
3546 return true;
3547 default:
3548 BUG();
3549 }
3550}
3551
3552
3553
3554
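/*
 * Return true if @img_req is completed.
 */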
3555static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3556 int *result)
3557{
3558 struct rbd_device *rbd_dev = img_req->rbd_dev;
3559 bool done;
3560
3561 if (need_exclusive_lock(img_req)) {
3562 down_read(&rbd_dev->lock_rwsem);
3563 mutex_lock(&img_req->state_mutex);
3564 done = rbd_img_advance(img_req, result);
3565 if (done)
3566 rbd_lock_del_request(img_req);
3567 mutex_unlock(&img_req->state_mutex);
3568 up_read(&rbd_dev->lock_rwsem);
3569 } else {
3570 mutex_lock(&img_req->state_mutex);
3571 done = rbd_img_advance(img_req, result);
3572 mutex_unlock(&img_req->state_mutex);
3573 }
3574
3575 if (done && *result) {
3576 rbd_assert(*result < 0);
3577 rbd_warn(rbd_dev, "%s%s result %d",
3578 test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3579 obj_op_name(img_req->op_type), *result);
3580 }
3581 return done;
3582}
3583
3584static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3585{
3586again:
3587 if (!__rbd_img_handle_request(img_req, &result))
3588 return;
3589
3590 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3591 struct rbd_obj_request *obj_req = img_req->obj_request;
3592
3593 rbd_img_request_destroy(img_req);
3594 if (__rbd_obj_handle_request(obj_req, &result)) {
3595 img_req = obj_req->img_request;
3596 goto again;
3597 }
3598 } else {
3599 struct request *rq = blk_mq_rq_from_pdu(img_req);
3600
3601 rbd_img_request_destroy(img_req);
3602 blk_mq_end_request(rq, errno_to_blk_status(result));
3603 }
3604}
3605
3606static const struct rbd_client_id rbd_empty_cid;
3607
3608static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3609 const struct rbd_client_id *rhs)
3610{
3611 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3612}
3613
3614static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3615{
3616 struct rbd_client_id cid;
3617
3618 mutex_lock(&rbd_dev->watch_mutex);
3619 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3620 cid.handle = rbd_dev->watch_cookie;
3621 mutex_unlock(&rbd_dev->watch_mutex);
3622 return cid;
3623}
3624
3625
3626
3627
3628static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3629 const struct rbd_client_id *cid)
3630{
3631 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3632 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3633 cid->gid, cid->handle);
3634 rbd_dev->owner_cid = *cid;
3635}
3636
3637static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3638{
3639 mutex_lock(&rbd_dev->watch_mutex);
3640 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3641 mutex_unlock(&rbd_dev->watch_mutex);
3642}
3643
3644static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3645{
3646 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3647
3648 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3649 strcpy(rbd_dev->lock_cookie, cookie);
3650 rbd_set_owner_cid(rbd_dev, &cid);
3651 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3652}
3653
3654
3655
3656
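/*
 * lock_rwsem must be held for write
 */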
3657static int rbd_lock(struct rbd_device *rbd_dev)
3658{
3659 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3660 char cookie[32];
3661 int ret;
3662
3663 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3664 rbd_dev->lock_cookie[0] != '\0');
3665
3666 format_lock_cookie(rbd_dev, cookie);
3667 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3668 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3669 RBD_LOCK_TAG, "", 0);
3670 if (ret)
3671 return ret;
3672
3673 __rbd_lock(rbd_dev, cookie);
3674 return 0;
3675}
3676
3677
3678
3679
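/*
 * lock_rwsem must be held for write
 */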
3680static void rbd_unlock(struct rbd_device *rbd_dev)
3681{
3682 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3683 int ret;
3684
3685 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3686 rbd_dev->lock_cookie[0] == '\0');
3687
3688 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3689 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3690 if (ret && ret != -ENOENT)
3691 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3692
3693
3694 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3695 rbd_dev->lock_cookie[0] = '\0';
3696 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3697 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3698}
3699
3700static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3701 enum rbd_notify_op notify_op,
3702 struct page ***preply_pages,
3703 size_t *preply_len)
3704{
3705 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3706 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3707 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3708 int buf_size = sizeof(buf);
3709 void *p = buf;
3710
3711 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3712
3713
3714 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3715 ceph_encode_32(&p, notify_op);
3716 ceph_encode_64(&p, cid.gid);
3717 ceph_encode_64(&p, cid.handle);
3718
3719 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3720 &rbd_dev->header_oloc, buf, buf_size,
3721 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3722}
3723
3724static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3725 enum rbd_notify_op notify_op)
3726{
3727 __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3728}
3729
3730static void rbd_notify_acquired_lock(struct work_struct *work)
3731{
3732 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3733 acquired_lock_work);
3734
3735 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3736}
3737
3738static void rbd_notify_released_lock(struct work_struct *work)
3739{
3740 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3741 released_lock_work);
3742
3743 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3744}
3745
3746static int rbd_request_lock(struct rbd_device *rbd_dev)
3747{
3748 struct page **reply_pages;
3749 size_t reply_len;
3750 bool lock_owner_responded = false;
3751 int ret;
3752
3753 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3754
3755 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3756 &reply_pages, &reply_len);
3757 if (ret && ret != -ETIMEDOUT) {
3758 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3759 goto out;
3760 }
3761
3762 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3763 void *p = page_address(reply_pages[0]);
3764 void *const end = p + reply_len;
3765 u32 n;
3766
3767 ceph_decode_32_safe(&p, end, n, e_inval);
3768 while (n--) {
3769 u8 struct_v;
3770 u32 len;
3771
3772 ceph_decode_need(&p, end, 8 + 8, e_inval);
3773 p += 8 + 8;
3774
3775 ceph_decode_32_safe(&p, end, len, e_inval);
3776 if (!len)
3777 continue;
3778
3779 if (lock_owner_responded) {
3780 rbd_warn(rbd_dev,
3781 "duplicate lock owners detected");
3782 ret = -EIO;
3783 goto out;
3784 }
3785
3786 lock_owner_responded = true;
3787 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3788 &struct_v, &len);
3789 if (ret) {
3790 rbd_warn(rbd_dev,
3791 "failed to decode ResponseMessage: %d",
3792 ret);
3793 goto e_inval;
3794 }
3795
3796 ret = ceph_decode_32(&p);
3797 }
3798 }
3799
3800 if (!lock_owner_responded) {
3801 rbd_warn(rbd_dev, "no lock owners detected");
3802 ret = -ETIMEDOUT;
3803 }
3804
3805out:
3806 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3807 return ret;
3808
3809e_inval:
3810 ret = -EINVAL;
3811 goto out;
3812}
3813
3814
3815
3816
3817
3818static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3819{
3820 struct rbd_img_request *img_req;
3821
3822 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3823 lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3824
3825 cancel_delayed_work(&rbd_dev->lock_dwork);
3826 if (!completion_done(&rbd_dev->acquire_wait)) {
3827 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3828 list_empty(&rbd_dev->running_list));
3829 rbd_dev->acquire_err = result;
3830 complete_all(&rbd_dev->acquire_wait);
3831 return;
3832 }
3833
3834 list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3835 mutex_lock(&img_req->state_mutex);
3836 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3837 rbd_img_schedule(img_req, result);
3838 mutex_unlock(&img_req->state_mutex);
3839 }
3840
3841 list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3842}
3843
3844static int get_lock_owner_info(struct rbd_device *rbd_dev,
3845 struct ceph_locker **lockers, u32 *num_lockers)
3846{
3847 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3848 u8 lock_type;
3849 char *lock_tag;
3850 int ret;
3851
3852 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3853
3854 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3855 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3856 &lock_type, &lock_tag, lockers, num_lockers);
3857 if (ret)
3858 return ret;
3859
3860 if (*num_lockers == 0) {
3861 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3862 goto out;
3863 }
3864
3865 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3866 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3867 lock_tag);
3868 ret = -EBUSY;
3869 goto out;
3870 }
3871
3872 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3873 rbd_warn(rbd_dev, "shared lock type detected");
3874 ret = -EBUSY;
3875 goto out;
3876 }
3877
3878 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3879 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3880 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3881 (*lockers)[0].id.cookie);
3882 ret = -EBUSY;
3883 goto out;
3884 }
3885
3886out:
3887 kfree(lock_tag);
3888 return ret;
3889}
3890
3891static int find_watcher(struct rbd_device *rbd_dev,
3892 const struct ceph_locker *locker)
3893{
3894 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3895 struct ceph_watch_item *watchers;
3896 u32 num_watchers;
3897 u64 cookie;
3898 int i;
3899 int ret;
3900
3901 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3902 &rbd_dev->header_oloc, &watchers,
3903 &num_watchers);
3904 if (ret)
3905 return ret;
3906
3907 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3908 for (i = 0; i < num_watchers; i++) {
3909
3910
3911
3912
3913 if (ceph_addr_equal_no_type(&watchers[i].addr,
3914 &locker->info.addr) &&
3915 watchers[i].cookie == cookie) {
3916 struct rbd_client_id cid = {
3917 .gid = le64_to_cpu(watchers[i].name.num),
3918 .handle = cookie,
3919 };
3920
3921 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3922 rbd_dev, cid.gid, cid.handle);
3923 rbd_set_owner_cid(rbd_dev, &cid);
3924 ret = 1;
3925 goto out;
3926 }
3927 }
3928
3929 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3930 ret = 0;
3931out:
3932 kfree(watchers);
3933 return ret;
3934}
3935
3936
3937
3938
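/*
 * lock_rwsem must be held for write
 */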
3939static int rbd_try_lock(struct rbd_device *rbd_dev)
3940{
3941 struct ceph_client *client = rbd_dev->rbd_client->client;
3942 struct ceph_locker *lockers;
3943 u32 num_lockers;
3944 int ret;
3945
3946 for (;;) {
3947 ret = rbd_lock(rbd_dev);
3948 if (ret != -EBUSY)
3949 return ret;
3950
3951
3952 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3953 if (ret)
3954 return ret;
3955
3956 if (num_lockers == 0)
3957 goto again;
3958
3959 ret = find_watcher(rbd_dev, lockers);
3960 if (ret)
3961 goto out;
3962
3963 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3964 ENTITY_NAME(lockers[0].id.name));
3965
3966 ret = ceph_monc_blocklist_add(&client->monc,
3967 &lockers[0].info.addr);
3968 if (ret) {
3969 rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3970 ENTITY_NAME(lockers[0].id.name), ret);
3971 goto out;
3972 }
3973
3974 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3975 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3976 lockers[0].id.cookie,
3977 &lockers[0].id.name);
3978 if (ret && ret != -ENOENT)
3979 goto out;
3980
3981again:
3982 ceph_free_lockers(lockers, num_lockers);
3983 }
3984
3985out:
3986 ceph_free_lockers(lockers, num_lockers);
3987 return ret;
3988}
3989
3990static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
3991{
3992 int ret;
3993
3994 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
3995 ret = rbd_object_map_open(rbd_dev);
3996 if (ret)
3997 return ret;
3998 }
3999
4000 return 0;
4001}
4002
4003
4004
4005
4006
4007
4008
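/*
 * Return:
 *   0 - lock acquired (or this node is already the owner)
 *   1 - caller should ask the current owner to release the lock
 *  <0 - error
 */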
4009static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4010{
4011 int ret;
4012
4013 down_read(&rbd_dev->lock_rwsem);
4014 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4015 rbd_dev->lock_state);
4016 if (__rbd_is_lock_owner(rbd_dev)) {
4017 up_read(&rbd_dev->lock_rwsem);
4018 return 0;
4019 }
4020
4021 up_read(&rbd_dev->lock_rwsem);
4022 down_write(&rbd_dev->lock_rwsem);
4023 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4024 rbd_dev->lock_state);
4025 if (__rbd_is_lock_owner(rbd_dev)) {
4026 up_write(&rbd_dev->lock_rwsem);
4027 return 0;
4028 }
4029
4030 ret = rbd_try_lock(rbd_dev);
4031 if (ret < 0) {
4032 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4033 if (ret == -EBLOCKLISTED)
4034 goto out;
4035
4036 ret = 1;
4037 }
4038 if (ret > 0) {
4039 up_write(&rbd_dev->lock_rwsem);
4040 return ret;
4041 }
4042
4043 rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4044 rbd_assert(list_empty(&rbd_dev->running_list));
4045
4046 ret = rbd_post_acquire_action(rbd_dev);
4047 if (ret) {
4048 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4049
4050
4051
4052
4053
4054 rbd_unlock(rbd_dev);
4055 }
4056
4057out:
4058 wake_lock_waiters(rbd_dev, ret);
4059 up_write(&rbd_dev->lock_rwsem);
4060 return ret;
4061}
4062
4063static void rbd_acquire_lock(struct work_struct *work)
4064{
4065 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4066 struct rbd_device, lock_dwork);
4067 int ret;
4068
4069 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4070again:
4071 ret = rbd_try_acquire_lock(rbd_dev);
4072 if (ret <= 0) {
4073 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4074 return;
4075 }
4076
4077 ret = rbd_request_lock(rbd_dev);
4078 if (ret == -ETIMEDOUT) {
4079 goto again;
4080 } else if (ret == -EROFS) {
4081 rbd_warn(rbd_dev, "peer will not release lock");
4082 down_write(&rbd_dev->lock_rwsem);
4083 wake_lock_waiters(rbd_dev, ret);
4084 up_write(&rbd_dev->lock_rwsem);
4085 } else if (ret < 0) {
4086 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4087 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4088 RBD_RETRY_DELAY);
4089 } else {
4090
4091
4092
4093
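 /*
  * The lock owner acked the request but hasn't released the lock
  * yet -- retry after the notify timeout if no release is seen.
  */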
4094 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4095 rbd_dev);
4096 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4097 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4098 }
4099}
4100
4101static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4102{
4103 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4104 lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4105
4106 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4107 return false;
4108
4109
4110
4111
4112 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4113 rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4114 if (list_empty(&rbd_dev->running_list))
4115 return true;
4116
4117 up_write(&rbd_dev->lock_rwsem);
4118 wait_for_completion(&rbd_dev->releasing_wait);
4119
4120 down_write(&rbd_dev->lock_rwsem);
4121 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4122 return false;
4123
4124 rbd_assert(list_empty(&rbd_dev->running_list));
4125 return true;
4126}
4127
4128static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4129{
4130 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4131 rbd_object_map_close(rbd_dev);
4132}
4133
4134static void __rbd_release_lock(struct rbd_device *rbd_dev)
4135{
4136 rbd_assert(list_empty(&rbd_dev->running_list));
4137
4138 rbd_pre_release_action(rbd_dev);
4139 rbd_unlock(rbd_dev);
4140}
4141
4142
4143
4144
4145static void rbd_release_lock(struct rbd_device *rbd_dev)
4146{
4147 if (!rbd_quiesce_lock(rbd_dev))
4148 return;
4149
4150 __rbd_release_lock(rbd_dev);
4151
4152
4153
4154
4155
4156
4157
4158
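 /*
  * Cancel any queued acquire attempt to give others a chance to
  * grab the lock; lock_dwork is kicked again from
  * maybe_kick_acquire() if requests are still waiting on our side.
  */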
4159 cancel_delayed_work(&rbd_dev->lock_dwork);
4160}
4161
4162static void rbd_release_lock_work(struct work_struct *work)
4163{
4164 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4165 unlock_work);
4166
4167 down_write(&rbd_dev->lock_rwsem);
4168 rbd_release_lock(rbd_dev);
4169 up_write(&rbd_dev->lock_rwsem);
4170}
4171
4172static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4173{
4174 bool have_requests;
4175
4176 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4177 if (__rbd_is_lock_owner(rbd_dev))
4178 return;
4179
4180 spin_lock(&rbd_dev->lock_lists_lock);
4181 have_requests = !list_empty(&rbd_dev->acquiring_list);
4182 spin_unlock(&rbd_dev->lock_lists_lock);
4183 if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4184 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4185 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4186 }
4187}
4188
4189static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4190 void **p)
4191{
4192 struct rbd_client_id cid = { 0 };
4193
4194 if (struct_v >= 2) {
4195 cid.gid = ceph_decode_64(p);
4196 cid.handle = ceph_decode_64(p);
4197 }
4198
4199 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4200 cid.handle);
4201 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4202 down_write(&rbd_dev->lock_rwsem);
4203 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4204 dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4205 __func__, rbd_dev, cid.gid, cid.handle);
4206 } else {
4207 rbd_set_owner_cid(rbd_dev, &cid);
4208 }
4209 downgrade_write(&rbd_dev->lock_rwsem);
4210 } else {
4211 down_read(&rbd_dev->lock_rwsem);
4212 }
4213
4214 maybe_kick_acquire(rbd_dev);
4215 up_read(&rbd_dev->lock_rwsem);
4216}
4217
4218static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4219 void **p)
4220{
4221 struct rbd_client_id cid = { 0 };
4222
4223 if (struct_v >= 2) {
4224 cid.gid = ceph_decode_64(p);
4225 cid.handle = ceph_decode_64(p);
4226 }
4227
4228 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4229 cid.handle);
4230 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4231 down_write(&rbd_dev->lock_rwsem);
4232 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4233 dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4234 __func__, rbd_dev, cid.gid, cid.handle,
4235 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4236 } else {
4237 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4238 }
4239 downgrade_write(&rbd_dev->lock_rwsem);
4240 } else {
4241 down_read(&rbd_dev->lock_rwsem);
4242 }
4243
4244 maybe_kick_acquire(rbd_dev);
4245 up_read(&rbd_dev->lock_rwsem);
4246}
4247
4248
4249
4250
4251
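/*
 * Returns the result to encode in a ResponseMessage (<= 0), or 1 if no
 * ResponseMessage is needed.
 */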
4252static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4253 void **p)
4254{
4255 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4256 struct rbd_client_id cid = { 0 };
4257 int result = 1;
4258
4259 if (struct_v >= 2) {
4260 cid.gid = ceph_decode_64(p);
4261 cid.handle = ceph_decode_64(p);
4262 }
4263
4264 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4265 cid.handle);
4266 if (rbd_cid_equal(&cid, &my_cid))
4267 return result;
4268
4269 down_read(&rbd_dev->lock_rwsem);
4270 if (__rbd_is_lock_owner(rbd_dev)) {
4271 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4272 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4273 goto out_unlock;
4274
4275
4276
4277
4278
4279 result = 0;
4280
4281 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4282 if (!rbd_dev->opts->exclusive) {
4283 dout("%s rbd_dev %p queueing unlock_work\n",
4284 __func__, rbd_dev);
4285 queue_work(rbd_dev->task_wq,
4286 &rbd_dev->unlock_work);
4287 } else {
4288
4289 result = -EROFS;
4290 }
4291 }
4292 }
4293
4294out_unlock:
4295 up_read(&rbd_dev->lock_rwsem);
4296 return result;
4297}
4298
4299static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4300 u64 notify_id, u64 cookie, s32 *result)
4301{
4302 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4303 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4304 int buf_size = sizeof(buf);
4305 int ret;
4306
4307 if (result) {
4308 void *p = buf;
4309
4310
4311 ceph_start_encoding(&p, 1, 1,
4312 buf_size - CEPH_ENCODING_START_BLK_LEN);
4313 ceph_encode_32(&p, *result);
4314 } else {
4315 buf_size = 0;
4316 }
4317
4318 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4319 &rbd_dev->header_oloc, notify_id, cookie,
4320 buf, buf_size);
4321 if (ret)
4322 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4323}
4324
4325static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4326 u64 cookie)
4327{
4328 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4329 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4330}
4331
4332static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4333 u64 notify_id, u64 cookie, s32 result)
4334{
4335 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4336 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4337}
4338
4339static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4340 u64 notifier_id, void *data, size_t data_len)
4341{
4342 struct rbd_device *rbd_dev = arg;
4343 void *p = data;
4344 void *const end = p + data_len;
4345 u8 struct_v = 0;
4346 u32 len;
4347 u32 notify_op;
4348 int ret;
4349
4350 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4351 __func__, rbd_dev, cookie, notify_id, data_len);
4352 if (data_len) {
4353 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4354 &struct_v, &len);
4355 if (ret) {
4356 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4357 ret);
4358 return;
4359 }
4360
4361 notify_op = ceph_decode_32(&p);
4362 } else {
4363
4364 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4365 len = 0;
4366 }
4367
4368 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4369 switch (notify_op) {
4370 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4371 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4372 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4373 break;
4374 case RBD_NOTIFY_OP_RELEASED_LOCK:
4375 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4376 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4377 break;
4378 case RBD_NOTIFY_OP_REQUEST_LOCK:
4379 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4380 if (ret <= 0)
4381 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4382 cookie, ret);
4383 else
4384 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4385 break;
4386 case RBD_NOTIFY_OP_HEADER_UPDATE:
4387 ret = rbd_dev_refresh(rbd_dev);
4388 if (ret)
4389 rbd_warn(rbd_dev, "refresh failed: %d", ret);
4390
4391 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4392 break;
4393 default:
4394 if (rbd_is_lock_owner(rbd_dev))
4395 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4396 cookie, -EOPNOTSUPP);
4397 else
4398 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4399 break;
4400 }
4401}
4402
4403static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4404
4405static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4406{
4407 struct rbd_device *rbd_dev = arg;
4408
4409 rbd_warn(rbd_dev, "encountered watch error: %d", err);
4410
4411 down_write(&rbd_dev->lock_rwsem);
4412 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4413 up_write(&rbd_dev->lock_rwsem);
4414
4415 mutex_lock(&rbd_dev->watch_mutex);
4416 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4417 __rbd_unregister_watch(rbd_dev);
4418 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4419
4420 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4421 }
4422 mutex_unlock(&rbd_dev->watch_mutex);
4423}
4424
4425
4426
4427
4428static int __rbd_register_watch(struct rbd_device *rbd_dev)
4429{
4430 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4431 struct ceph_osd_linger_request *handle;
4432
4433 rbd_assert(!rbd_dev->watch_handle);
4434 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4435
4436 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4437 &rbd_dev->header_oloc, rbd_watch_cb,
4438 rbd_watch_errcb, rbd_dev);
4439 if (IS_ERR(handle))
4440 return PTR_ERR(handle);
4441
4442 rbd_dev->watch_handle = handle;
4443 return 0;
4444}
4445
4446
4447
4448
4449static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4450{
4451 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4452 int ret;
4453
4454 rbd_assert(rbd_dev->watch_handle);
4455 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4456
4457 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4458 if (ret)
4459 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4460
4461 rbd_dev->watch_handle = NULL;
4462}
4463
4464static int rbd_register_watch(struct rbd_device *rbd_dev)
4465{
4466 int ret;
4467
4468 mutex_lock(&rbd_dev->watch_mutex);
4469 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4470 ret = __rbd_register_watch(rbd_dev);
4471 if (ret)
4472 goto out;
4473
4474 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4475 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4476
4477out:
4478 mutex_unlock(&rbd_dev->watch_mutex);
4479 return ret;
4480}
4481
4482static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4483{
4484 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4485
4486 cancel_work_sync(&rbd_dev->acquired_lock_work);
4487 cancel_work_sync(&rbd_dev->released_lock_work);
4488 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4489 cancel_work_sync(&rbd_dev->unlock_work);
4490}
4491
4492
4493
4494
4495
4496static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4497{
4498 cancel_tasks_sync(rbd_dev);
4499
4500 mutex_lock(&rbd_dev->watch_mutex);
4501 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4502 __rbd_unregister_watch(rbd_dev);
4503 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4504 mutex_unlock(&rbd_dev->watch_mutex);
4505
4506 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4507 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4508}
4509
4510
4511
4512
4513static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4514{
4515 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4516 char cookie[32];
4517 int ret;
4518
4519 if (!rbd_quiesce_lock(rbd_dev))
4520 return;
4521
4522 format_lock_cookie(rbd_dev, cookie);
4523 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4524 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4525 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4526 RBD_LOCK_TAG, cookie);
4527 if (ret) {
4528 if (ret != -EOPNOTSUPP)
4529 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4530 ret);
4531
4532
4533
4534
4535
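 /*
  * The lock cookie can't be updated (e.g. the OSD doesn't support
  * set_cookie), so fall back to a full release followed by a fresh
  * acquire.
  */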
4536 __rbd_release_lock(rbd_dev);
4537 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4538 } else {
4539 __rbd_lock(rbd_dev, cookie);
4540 wake_lock_waiters(rbd_dev, 0);
4541 }
4542}
4543
4544static void rbd_reregister_watch(struct work_struct *work)
4545{
4546 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4547 struct rbd_device, watch_dwork);
4548 int ret;
4549
4550 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4551
4552 mutex_lock(&rbd_dev->watch_mutex);
4553 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4554 mutex_unlock(&rbd_dev->watch_mutex);
4555 return;
4556 }
4557
4558 ret = __rbd_register_watch(rbd_dev);
4559 if (ret) {
4560 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4561 if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4562 queue_delayed_work(rbd_dev->task_wq,
4563 &rbd_dev->watch_dwork,
4564 RBD_RETRY_DELAY);
4565 mutex_unlock(&rbd_dev->watch_mutex);
4566 return;
4567 }
4568
4569 mutex_unlock(&rbd_dev->watch_mutex);
4570 down_write(&rbd_dev->lock_rwsem);
4571 wake_lock_waiters(rbd_dev, ret);
4572 up_write(&rbd_dev->lock_rwsem);
4573 return;
4574 }
4575
4576 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4577 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4578 mutex_unlock(&rbd_dev->watch_mutex);
4579
4580 down_write(&rbd_dev->lock_rwsem);
4581 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4582 rbd_reacquire_lock(rbd_dev);
4583 up_write(&rbd_dev->lock_rwsem);
4584
4585 ret = rbd_dev_refresh(rbd_dev);
4586 if (ret)
4587 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4588}
4589
4590
4591
4592
4593
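/*
 * Synchronous OSD object method ("class") call.  Outbound parameters
 * are copied into a single page; on success the reply is returned via
 * @inbound.
 */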
4594static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4595 struct ceph_object_id *oid,
4596 struct ceph_object_locator *oloc,
4597 const char *method_name,
4598 const void *outbound,
4599 size_t outbound_size,
4600 void *inbound,
4601 size_t inbound_size)
4602{
4603 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4604 struct page *req_page = NULL;
4605 struct page *reply_page;
4606 int ret;
4607
4608
4609
4610
4611
4612
4613
4614
4615 if (outbound) {
4616 if (outbound_size > PAGE_SIZE)
4617 return -E2BIG;
4618
4619 req_page = alloc_page(GFP_KERNEL);
4620 if (!req_page)
4621 return -ENOMEM;
4622
4623 memcpy(page_address(req_page), outbound, outbound_size);
4624 }
4625
4626 reply_page = alloc_page(