linux/fs/dlm/dir.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "lowcomms.h"
  18#include "rcom.h"
  19#include "config.h"
  20#include "memory.h"
  21#include "recover.h"
  22#include "util.h"
  23#include "lock.h"
  24#include "dir.h"
  25
  26
  27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
  28{
  29        spin_lock(&ls->ls_recover_list_lock);
  30        list_add(&de->list, &ls->ls_recover_list);
  31        spin_unlock(&ls->ls_recover_list_lock);
  32}
  33
  34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
  35{
  36        int found = 0;
  37        struct dlm_direntry *de;
  38
  39        spin_lock(&ls->ls_recover_list_lock);
  40        list_for_each_entry(de, &ls->ls_recover_list, list) {
  41                if (de->length == len) {
  42                        list_del(&de->list);
  43                        de->master_nodeid = 0;
  44                        memset(de->name, 0, len);
  45                        found = 1;
  46                        break;
  47                }
  48        }
  49        spin_unlock(&ls->ls_recover_list_lock);
  50
  51        if (!found)
  52                de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
  53        return de;
  54}
  55
  56void dlm_clear_free_entries(struct dlm_ls *ls)
  57{
  58        struct dlm_direntry *de;
  59
  60        spin_lock(&ls->ls_recover_list_lock);
  61        while (!list_empty(&ls->ls_recover_list)) {
  62                de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
  63                                list);
  64                list_del(&de->list);
  65                kfree(de);
  66        }
  67        spin_unlock(&ls->ls_recover_list_lock);
  68}
  69
  70/*
  71 * We use the upper 16 bits of the hash value to select the directory node.
  72 * Low bits are used for distribution of rsb's among hash buckets on each node.
  73 *
  74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
  75 * num_nodes to the hash value.  This value in the desired range is used as an
  76 * offset into the sorted list of nodeid's to give the particular nodeid.
  77 */
  78
  79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
  80{
  81        struct list_head *tmp;
  82        struct dlm_member *memb = NULL;
  83        uint32_t node, n = 0;
  84        int nodeid;
  85
  86        if (ls->ls_num_nodes == 1) {
  87                nodeid = dlm_our_nodeid();
  88                goto out;
  89        }
  90
  91        if (ls->ls_node_array) {
  92                node = (hash >> 16) % ls->ls_total_weight;
  93                nodeid = ls->ls_node_array[node];
  94                goto out;
  95        }
  96
  97        /* make_member_array() failed to kmalloc ls_node_array... */
  98
  99        node = (hash >> 16) % ls->ls_num_nodes;
 100
 101        list_for_each(tmp, &ls->ls_nodes) {
 102                if (n++ != node)
 103                        continue;
 104                memb = list_entry(tmp, struct dlm_member, list);
 105                break;
 106        }
 107
 108        DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
 109                                 ls->ls_num_nodes, n, node););
 110        nodeid = memb->nodeid;
 111 out:
 112        return nodeid;
 113}
 114
 115int dlm_dir_nodeid(struct dlm_rsb *r)
 116{
 117        return dlm_hash2nodeid(r->res_ls, r->res_hash);
 118}
 119
 120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
 121{
 122        uint32_t val;
 123
 124        val = jhash(name, len, 0);
 125        val &= (ls->ls_dirtbl_size - 1);
 126
 127        return val;
 128}
 129
 130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
 131{
 132        uint32_t bucket;
 133
 134        bucket = dir_hash(ls, de->name, de->length);
 135        list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
 136}
 137
 138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
 139                                          int namelen, uint32_t bucket)
 140{
 141        struct dlm_direntry *de;
 142
 143        list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
 144                if (de->length == namelen && !memcmp(name, de->name, namelen))
 145                        goto out;
 146        }
 147        de = NULL;
 148 out:
 149        return de;
 150}
 151
 152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
 153{
 154        struct dlm_direntry *de;
 155        uint32_t bucket;
 156
 157        bucket = dir_hash(ls, name, namelen);
 158
 159        spin_lock(&ls->ls_dirtbl[bucket].lock);
 160
 161        de = search_bucket(ls, name, namelen, bucket);
 162
 163        if (!de) {
 164                log_error(ls, "remove fr %u none", nodeid);
 165                goto out;
 166        }
 167
 168        if (de->master_nodeid != nodeid) {
 169                log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
 170                goto out;
 171        }
 172
 173        list_del(&de->list);
 174        kfree(de);
 175 out:
 176        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 177}
 178
 179void dlm_dir_clear(struct dlm_ls *ls)
 180{
 181        struct list_head *head;
 182        struct dlm_direntry *de;
 183        int i;
 184
 185        DLM_ASSERT(list_empty(&ls->ls_recover_list), );
 186
 187        for (i = 0; i < ls->ls_dirtbl_size; i++) {
 188                spin_lock(&ls->ls_dirtbl[i].lock);
 189                head = &ls->ls_dirtbl[i].list;
 190                while (!list_empty(head)) {
 191                        de = list_entry(head->next, struct dlm_direntry, list);
 192                        list_del(&de->list);
 193                        put_free_de(ls, de);
 194                }
 195                spin_unlock(&ls->ls_dirtbl[i].lock);
 196        }
 197}
 198
 199int dlm_recover_directory(struct dlm_ls *ls)
 200{
 201        struct dlm_member *memb;
 202        struct dlm_direntry *de;
 203        char *b, *last_name = NULL;
 204        int error = -ENOMEM, last_len, count = 0;
 205        uint16_t namelen;
 206
 207        log_debug(ls, "dlm_recover_directory");
 208
 209        if (dlm_no_directory(ls))
 210                goto out_status;
 211
 212        dlm_dir_clear(ls);
 213
 214        last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 215        if (!last_name)
 216                goto out;
 217
 218        list_for_each_entry(memb, &ls->ls_nodes, list) {
 219                memset(last_name, 0, DLM_RESNAME_MAXLEN);
 220                last_len = 0;
 221
 222                for (;;) {
 223                        int left;
 224                        error = dlm_recovery_stopped(ls);
 225                        if (error)
 226                                goto out_free;
 227
 228                        error = dlm_rcom_names(ls, memb->nodeid,
 229                                               last_name, last_len);
 230                        if (error)
 231                                goto out_free;
 232
 233                        schedule();
 234
 235                        /*
 236                         * pick namelen/name pairs out of received buffer
 237                         */
 238
 239                        b = ls->ls_recover_buf->rc_buf;
 240                        left = ls->ls_recover_buf->rc_header.h_length;
 241                        left -= sizeof(struct dlm_rcom);
 242
 243                        for (;;) {
 244                                __be16 v;
 245
 246                                error = -EINVAL;
 247                                if (left < sizeof(__be16))
 248                                        goto out_free;
 249
 250                                memcpy(&v, b, sizeof(__be16));
 251                                namelen = be16_to_cpu(v);
 252                                b += sizeof(__be16);
 253                                left -= sizeof(__be16);
 254
 255                                /* namelen of 0xFFFFF marks end of names for
 256                                   this node; namelen of 0 marks end of the
 257                                   buffer */
 258
 259                                if (namelen == 0xFFFF)
 260                                        goto done;
 261                                if (!namelen)
 262                                        break;
 263
 264                                if (namelen > left)
 265                                        goto out_free;
 266
 267                                if (namelen > DLM_RESNAME_MAXLEN)
 268                                        goto out_free;
 269
 270                                error = -ENOMEM;
 271                                de = get_free_de(ls, namelen);
 272                                if (!de)
 273                                        goto out_free;
 274
 275                                de->master_nodeid = memb->nodeid;
 276                                de->length = namelen;
 277                                last_len = namelen;
 278                                memcpy(de->name, b, namelen);
 279                                memcpy(last_name, b, namelen);
 280                                b += namelen;
 281                                left -= namelen;
 282
 283                                add_entry_to_hash(ls, de);
 284                                count++;
 285                        }
 286                }
 287         done:
 288                ;
 289        }
 290
 291 out_status:
 292        error = 0;
 293        dlm_set_recover_status(ls, DLM_RS_DIR);
 294        log_debug(ls, "dlm_recover_directory %d entries", count);
 295 out_free:
 296        kfree(last_name);
 297 out:
 298        dlm_clear_free_entries(ls);
 299        return error;
 300}
 301
 302static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
 303                     int namelen, int *r_nodeid)
 304{
 305        struct dlm_direntry *de, *tmp;
 306        uint32_t bucket;
 307
 308        bucket = dir_hash(ls, name, namelen);
 309
 310        spin_lock(&ls->ls_dirtbl[bucket].lock);
 311        de = search_bucket(ls, name, namelen, bucket);
 312        if (de) {
 313                *r_nodeid = de->master_nodeid;
 314                spin_unlock(&ls->ls_dirtbl[bucket].lock);
 315                if (*r_nodeid == nodeid)
 316                        return -EEXIST;
 317                return 0;
 318        }
 319
 320        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 321
 322        if (namelen > DLM_RESNAME_MAXLEN)
 323                return -EINVAL;
 324
 325        de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
 326        if (!de)
 327                return -ENOMEM;
 328
 329        de->master_nodeid = nodeid;
 330        de->length = namelen;
 331        memcpy(de->name, name, namelen);
 332
 333        spin_lock(&ls->ls_dirtbl[bucket].lock);
 334        tmp = search_bucket(ls, name, namelen, bucket);
 335        if (tmp) {
 336                kfree(de);
 337                de = tmp;
 338        } else {
 339                list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
 340        }
 341        *r_nodeid = de->master_nodeid;
 342        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 343        return 0;
 344}
 345
 346int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
 347                   int *r_nodeid)
 348{
 349        return get_entry(ls, nodeid, name, namelen, r_nodeid);
 350}
 351
 352static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 353{
 354        struct dlm_rsb *r;
 355
 356        down_read(&ls->ls_root_sem);
 357        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 358                if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 359                        up_read(&ls->ls_root_sem);
 360                        return r;
 361                }
 362        }
 363        up_read(&ls->ls_root_sem);
 364        return NULL;
 365}
 366
 367/* Find the rsb where we left off (or start again), then send rsb names
 368   for rsb's we're master of and whose directory node matches the requesting
 369   node.  inbuf is the rsb name last sent, inlen is the name's length */
 370
 371void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 372                           char *outbuf, int outlen, int nodeid)
 373{
 374        struct list_head *list;
 375        struct dlm_rsb *r;
 376        int offset = 0, dir_nodeid;
 377        __be16 be_namelen;
 378
 379        down_read(&ls->ls_root_sem);
 380
 381        if (inlen > 1) {
 382                r = find_rsb_root(ls, inbuf, inlen);
 383                if (!r) {
 384                        inbuf[inlen - 1] = '\0';
 385                        log_error(ls, "copy_master_names from %d start %d %s",
 386                                  nodeid, inlen, inbuf);
 387                        goto out;
 388                }
 389                list = r->res_root_list.next;
 390        } else {
 391                list = ls->ls_root_list.next;
 392        }
 393
 394        for (offset = 0; list != &ls->ls_root_list; list = list->next) {
 395                r = list_entry(list, struct dlm_rsb, res_root_list);
 396                if (r->res_nodeid)
 397                        continue;
 398
 399                dir_nodeid = dlm_dir_nodeid(r);
 400                if (dir_nodeid != nodeid)
 401                        continue;
 402
 403                /*
 404                 * The block ends when we can't fit the following in the
 405                 * remaining buffer space:
 406                 * namelen (uint16_t) +
 407                 * name (r->res_length) +
 408                 * end-of-block record 0x0000 (uint16_t)
 409                 */
 410
 411                if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
 412                        /* Write end-of-block record */
 413                        be_namelen = cpu_to_be16(0);
 414                        memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 415                        offset += sizeof(__be16);
 416                        goto out;
 417                }
 418
 419                be_namelen = cpu_to_be16(r->res_length);
 420                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 421                offset += sizeof(__be16);
 422                memcpy(outbuf + offset, r->res_name, r->res_length);
 423                offset += r->res_length;
 424        }
 425
 426        /*
 427         * If we've reached the end of the list (and there's room) write a
 428         * terminating record.
 429         */
 430
 431        if ((list == &ls->ls_root_list) &&
 432            (offset + sizeof(uint16_t) <= outlen)) {
 433                be_namelen = cpu_to_be16(0xFFFF);
 434                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 435                offset += sizeof(__be16);
 436        }
 437
 438 out:
 439        up_read(&ls->ls_root_sem);
 440}
 441
 442
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.