linux/fs/dlm/dir.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "lowcomms.h"
  18#include "rcom.h"
  19#include "config.h"
  20#include "memory.h"
  21#include "recover.h"
  22#include "util.h"
  23#include "lock.h"
  24#include "dir.h"
  25
  26
  27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
  28{
  29        spin_lock(&ls->ls_recover_list_lock);
  30        list_add(&de->list, &ls->ls_recover_list);
  31        spin_unlock(&ls->ls_recover_list_lock);
  32}
  33
  34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
  35{
  36        int found = 0;
  37        struct dlm_direntry *de;
  38
  39        spin_lock(&ls->ls_recover_list_lock);
  40        list_for_each_entry(de, &ls->ls_recover_list, list) {
  41                if (de->length == len) {
  42                        list_del(&de->list);
  43                        de->master_nodeid = 0;
  44                        memset(de->name, 0, len);
  45                        found = 1;
  46                        break;
  47                }
  48        }
  49        spin_unlock(&ls->ls_recover_list_lock);
  50
  51        if (!found)
  52                de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
  53        return de;
  54}
  55
  56void dlm_clear_free_entries(struct dlm_ls *ls)
  57{
  58        struct dlm_direntry *de;
  59
  60        spin_lock(&ls->ls_recover_list_lock);
  61        while (!list_empty(&ls->ls_recover_list)) {
  62                de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
  63                                list);
  64                list_del(&de->list);
  65                kfree(de);
  66        }
  67        spin_unlock(&ls->ls_recover_list_lock);
  68}
  69
  70/*
  71 * We use the upper 16 bits of the hash value to select the directory node.
  72 * Low bits are used for distribution of rsb's among hash buckets on each node.
  73 *
  74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
  75 * num_nodes to the hash value.  This value in the desired range is used as an
  76 * offset into the sorted list of nodeid's to give the particular nodeid.
  77 */
  78
  79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
  80{
  81        struct list_head *tmp;
  82        struct dlm_member *memb = NULL;
  83        uint32_t node, n = 0;
  84        int nodeid;
  85
  86        if (ls->ls_num_nodes == 1) {
  87                nodeid = dlm_our_nodeid();
  88                goto out;
  89        }
  90
  91        if (ls->ls_node_array) {
  92                node = (hash >> 16) % ls->ls_total_weight;
  93                nodeid = ls->ls_node_array[node];
  94                goto out;
  95        }
  96
  97        /* make_member_array() failed to kmalloc ls_node_array... */
  98
  99        node = (hash >> 16) % ls->ls_num_nodes;
 100
 101        list_for_each(tmp, &ls->ls_nodes) {
 102                if (n++ != node)
 103                        continue;
 104                memb = list_entry(tmp, struct dlm_member, list);
 105                break;
 106        }
 107
 108        DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
 109                                 ls->ls_num_nodes, n, node););
 110        nodeid = memb->nodeid;
 111 out:
 112        return nodeid;
 113}
 114
 115int dlm_dir_nodeid(struct dlm_rsb *r)
 116{
 117        return dlm_hash2nodeid(r->res_ls, r->res_hash);
 118}
 119
 120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
 121{
 122        uint32_t val;
 123
 124        val = jhash(name, len, 0);
 125        val &= (ls->ls_dirtbl_size - 1);
 126
 127        return val;
 128}
 129
 130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
 131{
 132        uint32_t bucket;
 133
 134        bucket = dir_hash(ls, de->name, de->length);
 135        list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
 136}
 137
 138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
 139                                          int namelen, uint32_t bucket)
 140{
 141        struct dlm_direntry *de;
 142
 143        list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
 144                if (de->length == namelen && !memcmp(name, de->name, namelen))
 145                        goto out;
 146        }
 147        de = NULL;
 148 out:
 149        return de;
 150}
 151
 152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
 153{
 154        struct dlm_direntry *de;
 155        uint32_t bucket;
 156
 157        bucket = dir_hash(ls, name, namelen);
 158
 159        spin_lock(&ls->ls_dirtbl[bucket].lock);
 160
 161        de = search_bucket(ls, name, namelen, bucket);
 162
 163        if (!de) {
 164                log_error(ls, "remove fr %u none", nodeid);
 165                goto out;
 166        }
 167
 168        if (de->master_nodeid != nodeid) {
 169                log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
 170                goto out;
 171        }
 172
 173        list_del(&de->list);
 174        kfree(de);
 175 out:
 176        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 177}
 178
 179void dlm_dir_clear(struct dlm_ls *ls)
 180{
 181        struct list_head *head;
 182        struct dlm_direntry *de;
 183        int i;
 184
 185        DLM_ASSERT(list_empty(&ls->ls_recover_list), );
 186
 187        for (i = 0; i < ls->ls_dirtbl_size; i++) {
 188                spin_lock(&ls->ls_dirtbl[i].lock);
 189                head = &ls->ls_dirtbl[i].list;
 190                while (!list_empty(head)) {
 191                        de = list_entry(head->next, struct dlm_direntry, list);
 192                        list_del(&de->list);
 193                        put_free_de(ls, de);
 194                }
 195                spin_unlock(&ls->ls_dirtbl[i].lock);
 196        }
 197}
 198
 199int dlm_recover_directory(struct dlm_ls *ls)
 200{
 201        struct dlm_member *memb;
 202        struct dlm_direntry *de;
 203        char *b, *last_name = NULL;
 204        int error = -ENOMEM, last_len, count = 0;
 205        uint16_t namelen;
 206
 207        log_debug(ls, "dlm_recover_directory");
 208
 209        if (dlm_no_directory(ls))
 210                goto out_status;
 211
 212        dlm_dir_clear(ls);
 213
 214        last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 215        if (!last_name)
 216                goto out;
 217
 218        list_for_each_entry(memb, &ls->ls_nodes, list) {
 219                memset(last_name, 0, DLM_RESNAME_MAXLEN);
 220                last_len = 0;
 221
 222                for (;;) {
 223                        int left;
 224                        error = dlm_recovery_stopped(ls);
 225                        if (error)
 226                                goto out_free;
 227
 228                        error = dlm_rcom_names(ls, memb->nodeid,
 229                                               last_name, last_len);
 230                        if (error)
 231                                goto out_free;
 232
 233                        schedule();
 234
 235                        /*
 236                         * pick namelen/name pairs out of received buffer
 237                         */
 238
 239                        b = ls->ls_recover_buf->rc_buf;
 240                        left = ls->ls_recover_buf->rc_header.h_length;
 241                        left -= sizeof(struct dlm_rcom);
 242
 243                        for (;;) {
 244                                __be16 v;
 245
 246                                error = -EINVAL;
 247                                if (left < sizeof(__be16))
 248                                        goto out_free;
 249
 250                                memcpy(&v, b, sizeof(__be16));
 251                                namelen = be16_to_cpu(v);
 252                                b += sizeof(__be16);
 253                                left -= sizeof(__be16);
 254
 255                                /* namelen of 0xFFFFF marks end of names for
 256                                   this node; namelen of 0 marks end of the
 257                                   buffer */
 258
 259                                if (namelen == 0xFFFF)
 260                                        goto done;
 261                                if (!namelen)
 262                                        break;
 263
 264                                if (namelen > left)
 265                                        goto out_free;
 266
 267                                if (namelen > DLM_RESNAME_MAXLEN)
 268                                        goto out_free;
 269
 270                                error = -ENOMEM;
 271                                de = get_free_de(ls, namelen);
 272                                if (!de)
 273                                        goto out_free;
 274
 275                                de->master_nodeid = memb->nodeid;
 276                                de->length = namelen;
 277                                last_len = namelen;
 278                                memcpy(de->name, b, namelen);
 279                                memcpy(last_name, b, namelen);
 280                                b += namelen;
 281                                left -= namelen;
 282
 283                                add_entry_to_hash(ls, de);
 284                                count++;
 285                        }
 286                }
 287         done:
 288                ;
 289        }
 290
 291 out_status:
 292        error = 0;
 293        log_debug(ls, "dlm_recover_directory %d entries", count);
 294 out_free:
 295        kfree(last_name);
 296 out:
 297        dlm_clear_free_entries(ls);
 298        return error;
 299}
 300
 301static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
 302                     int namelen, int *r_nodeid)
 303{
 304        struct dlm_direntry *de, *tmp;
 305        uint32_t bucket;
 306
 307        bucket = dir_hash(ls, name, namelen);
 308
 309        spin_lock(&ls->ls_dirtbl[bucket].lock);
 310        de = search_bucket(ls, name, namelen, bucket);
 311        if (de) {
 312                *r_nodeid = de->master_nodeid;
 313                spin_unlock(&ls->ls_dirtbl[bucket].lock);
 314                if (*r_nodeid == nodeid)
 315                        return -EEXIST;
 316                return 0;
 317        }
 318
 319        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 320
 321        if (namelen > DLM_RESNAME_MAXLEN)
 322                return -EINVAL;
 323
 324        de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
 325        if (!de)
 326                return -ENOMEM;
 327
 328        de->master_nodeid = nodeid;
 329        de->length = namelen;
 330        memcpy(de->name, name, namelen);
 331
 332        spin_lock(&ls->ls_dirtbl[bucket].lock);
 333        tmp = search_bucket(ls, name, namelen, bucket);
 334        if (tmp) {
 335                kfree(de);
 336                de = tmp;
 337        } else {
 338                list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
 339        }
 340        *r_nodeid = de->master_nodeid;
 341        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 342        return 0;
 343}
 344
 345int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
 346                   int *r_nodeid)
 347{
 348        return get_entry(ls, nodeid, name, namelen, r_nodeid);
 349}
 350
 351static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 352{
 353        struct dlm_rsb *r;
 354
 355        down_read(&ls->ls_root_sem);
 356        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 357                if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 358                        up_read(&ls->ls_root_sem);
 359                        return r;
 360                }
 361        }
 362        up_read(&ls->ls_root_sem);
 363        return NULL;
 364}
 365
 366/* Find the rsb where we left off (or start again), then send rsb names
 367   for rsb's we're master of and whose directory node matches the requesting
 368   node.  inbuf is the rsb name last sent, inlen is the name's length */
 369
 370void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 371                           char *outbuf, int outlen, int nodeid)
 372{
 373        struct list_head *list;
 374        struct dlm_rsb *r;
 375        int offset = 0, dir_nodeid;
 376        __be16 be_namelen;
 377
 378        down_read(&ls->ls_root_sem);
 379
 380        if (inlen > 1) {
 381                r = find_rsb_root(ls, inbuf, inlen);
 382                if (!r) {
 383                        inbuf[inlen - 1] = '\0';
 384                        log_error(ls, "copy_master_names from %d start %d %s",
 385                                  nodeid, inlen, inbuf);
 386                        goto out;
 387                }
 388                list = r->res_root_list.next;
 389        } else {
 390                list = ls->ls_root_list.next;
 391        }
 392
 393        for (offset = 0; list != &ls->ls_root_list; list = list->next) {
 394                r = list_entry(list, struct dlm_rsb, res_root_list);
 395                if (r->res_nodeid)
 396                        continue;
 397
 398                dir_nodeid = dlm_dir_nodeid(r);
 399                if (dir_nodeid != nodeid)
 400                        continue;
 401
 402                /*
 403                 * The block ends when we can't fit the following in the
 404                 * remaining buffer space:
 405                 * namelen (uint16_t) +
 406                 * name (r->res_length) +
 407                 * end-of-block record 0x0000 (uint16_t)
 408                 */
 409
 410                if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
 411                        /* Write end-of-block record */
 412                        be_namelen = cpu_to_be16(0);
 413                        memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 414                        offset += sizeof(__be16);
 415                        goto out;
 416                }
 417
 418                be_namelen = cpu_to_be16(r->res_length);
 419                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 420                offset += sizeof(__be16);
 421                memcpy(outbuf + offset, r->res_name, r->res_length);
 422                offset += r->res_length;
 423        }
 424
 425        /*
 426         * If we've reached the end of the list (and there's room) write a
 427         * terminating record.
 428         */
 429
 430        if ((list == &ls->ls_root_list) &&
 431            (offset + sizeof(uint16_t) <= outlen)) {
 432                be_namelen = cpu_to_be16(0xFFFF);
 433                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 434                offset += sizeof(__be16);
 435        }
 436
 437 out:
 438        up_read(&ls->ls_root_sem);
 439}
 440
 441
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.