linux/fs/ocfs2/dlm/dlmdomain.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * dlmdomain.c
   5 *
   6 * defines domain join / leave apis
   7 *
   8 * Copyright (C) 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  23 * Boston, MA 021110-1307, USA.
  24 *
  25 */
  26
  27#include <linux/module.h>
  28#include <linux/types.h>
  29#include <linux/slab.h>
  30#include <linux/highmem.h>
  31#include <linux/utsname.h>
  32#include <linux/init.h>
  33#include <linux/spinlock.h>
  34#include <linux/delay.h>
  35#include <linux/err.h>
  36#include <linux/debugfs.h>
  37
  38#include "cluster/heartbeat.h"
  39#include "cluster/nodemanager.h"
  40#include "cluster/tcp.h"
  41
  42#include "dlmapi.h"
  43#include "dlmcommon.h"
  44#include "dlmdomain.h"
  45#include "dlmdebug.h"
  46
  47#include "dlmver.h"
  48
  49#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
  50#include "cluster/masklog.h"
  51
/*
 * ocfs2 node maps are arrays of long int, which cannot be sent freely
 * across the wire because of endianness issues. To work around this, we
 * convert the long ints to byte arrays. The following 3 routines are helper
 * functions to set/test/copy bits within those byte arrays.
 */
  58static inline void byte_set_bit(u8 nr, u8 map[])
  59{
  60        map[nr >> 3] |= (1UL << (nr & 7));
  61}
  62
  63static inline int byte_test_bit(u8 nr, u8 map[])
  64{
  65        return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
  66}
  67
  68static inline void byte_copymap(u8 dmap[], unsigned long smap[],
  69                        unsigned int sz)
  70{
  71        unsigned int nn;
  72
  73        if (!sz)
  74                return;
  75
  76        memset(dmap, 0, ((sz + 7) >> 3));
  77        for (nn = 0 ; nn < sz; nn++)
  78                if (test_bit(nn, smap))
  79                        byte_set_bit(nn, dmap);
  80}
  81
  82static void dlm_free_pagevec(void **vec, int pages)
  83{
  84        while (pages--)
  85                free_page((unsigned long)vec[pages]);
  86        kfree(vec);
  87}
  88
  89static void **dlm_alloc_pagevec(int pages)
  90{
  91        void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
  92        int i;
  93
  94        if (!vec)
  95                return NULL;
  96
  97        for (i = 0; i < pages; i++)
  98                if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
  99                        goto out_free;
 100
 101        mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
 102             pages, (unsigned long)DLM_HASH_PAGES,
 103             (unsigned long)DLM_BUCKETS_PER_PAGE);
 104        return vec;
 105out_free:
 106        dlm_free_pagevec(vec, i);
 107        return NULL;
 108}
 109
 110/*
 111 *
 112 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
 113 *    dlm_domain_lock
 114 *    struct dlm_ctxt->spinlock
 115 *    struct dlm_lock_resource->spinlock
 116 *    struct dlm_ctxt->master_lock
 117 *    struct dlm_ctxt->ast_lock
 118 *    dlm_master_list_entry->spinlock
 119 *    dlm_lock->spinlock
 120 *
 121 */
 122
/* Held in this file while walking dlm_domains and while reading or
 * changing a context's dlm_state. */
DEFINE_SPINLOCK(dlm_domain_lock);
/* List of struct dlm_ctxt, linked through dlm_ctxt->list. */
LIST_HEAD(dlm_domains);
/* Woken when a context is freed or taken off dlm_domains. */
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
 126
/*
 * The supported protocol version for DLM communication.  Running domains
 * will have a negotiated version with the same major number and a minor
 * number equal to or smaller than this one.  The
 * dlm_ctxt->dlm_locking_proto field should be used to determine what a
 * running domain is actually using.
 */
static const struct dlm_protocol_version dlm_protocol = {
	.pv_major = 1,
	.pv_minor = 0,
};
 137
/* Milliseconds dlm_leave_domain() sleeps after a failed domain-exit send
 * before trying that node again. */
#define DLM_DOMAIN_BACKOFF_MS 200
 139
/* Forward declarations of static functions so they can be referenced
 * ahead of their definitions. */
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data);
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
				     void **ret_data);
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
				struct dlm_protocol_version *request);

static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 152
 153void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
 154{
 155        if (!hlist_unhashed(&lockres->hash_node)) {
 156                hlist_del_init(&lockres->hash_node);
 157                dlm_lockres_put(lockres);
 158        }
 159}
 160
 161void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 162                       struct dlm_lock_resource *res)
 163{
 164        struct hlist_head *bucket;
 165        struct qstr *q;
 166
 167        assert_spin_locked(&dlm->spinlock);
 168
 169        q = &res->lockname;
 170        bucket = dlm_lockres_hash(dlm, q->hash);
 171
 172        /* get a reference for our hashtable */
 173        dlm_lockres_get(res);
 174
 175        hlist_add_head(&res->hash_node, bucket);
 176}
 177
 178struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
 179                                                     const char *name,
 180                                                     unsigned int len,
 181                                                     unsigned int hash)
 182{
 183        struct hlist_head *bucket;
 184        struct hlist_node *list;
 185
 186        mlog_entry("%.*s\n", len, name);
 187
 188        assert_spin_locked(&dlm->spinlock);
 189
 190        bucket = dlm_lockres_hash(dlm, hash);
 191
 192        hlist_for_each(list, bucket) {
 193                struct dlm_lock_resource *res = hlist_entry(list,
 194                        struct dlm_lock_resource, hash_node);
 195                if (res->lockname.name[0] != name[0])
 196                        continue;
 197                if (unlikely(res->lockname.len != len))
 198                        continue;
 199                if (memcmp(res->lockname.name + 1, name + 1, len - 1))
 200                        continue;
 201                dlm_lockres_get(res);
 202                return res;
 203        }
 204        return NULL;
 205}
 206
 207/* intended to be called by functions which do not care about lock
 208 * resources which are being purged (most net _handler functions).
 209 * this will return NULL for any lock resource which is found but
 210 * currently in the process of dropping its mastery reference.
 211 * use __dlm_lookup_lockres_full when you need the lock resource
 212 * regardless (e.g. dlm_get_lock_resource) */
 213struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 214                                                const char *name,
 215                                                unsigned int len,
 216                                                unsigned int hash)
 217{
 218        struct dlm_lock_resource *res = NULL;
 219
 220        mlog_entry("%.*s\n", len, name);
 221
 222        assert_spin_locked(&dlm->spinlock);
 223
 224        res = __dlm_lookup_lockres_full(dlm, name, len, hash);
 225        if (res) {
 226                spin_lock(&res->spinlock);
 227                if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 228                        spin_unlock(&res->spinlock);
 229                        dlm_lockres_put(res);
 230                        return NULL;
 231                }
 232                spin_unlock(&res->spinlock);
 233        }
 234
 235        return res;
 236}
 237
 238struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 239                                    const char *name,
 240                                    unsigned int len)
 241{
 242        struct dlm_lock_resource *res;
 243        unsigned int hash = dlm_lockid_hash(name, len);
 244
 245        spin_lock(&dlm->spinlock);
 246        res = __dlm_lookup_lockres(dlm, name, len, hash);
 247        spin_unlock(&dlm->spinlock);
 248        return res;
 249}
 250
 251static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
 252{
 253        struct dlm_ctxt *tmp = NULL;
 254        struct list_head *iter;
 255
 256        assert_spin_locked(&dlm_domain_lock);
 257
 258        /* tmp->name here is always NULL terminated,
 259         * but domain may not be! */
 260        list_for_each(iter, &dlm_domains) {
 261                tmp = list_entry (iter, struct dlm_ctxt, list);
 262                if (strlen(tmp->name) == len &&
 263                    memcmp(tmp->name, domain, len)==0)
 264                        break;
 265                tmp = NULL;
 266        }
 267
 268        return tmp;
 269}
 270
 271/* For null terminated domain strings ONLY */
 272static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
 273{
 274        assert_spin_locked(&dlm_domain_lock);
 275
 276        return __dlm_lookup_domain_full(domain, strlen(domain));
 277}
 278
 279
 280/* returns true on one of two conditions:
 281 * 1) the domain does not exist
 282 * 2) the domain exists and it's state is "joined" */
 283static int dlm_wait_on_domain_helper(const char *domain)
 284{
 285        int ret = 0;
 286        struct dlm_ctxt *tmp = NULL;
 287
 288        spin_lock(&dlm_domain_lock);
 289
 290        tmp = __dlm_lookup_domain(domain);
 291        if (!tmp)
 292                ret = 1;
 293        else if (tmp->dlm_state == DLM_CTXT_JOINED)
 294                ret = 1;
 295
 296        spin_unlock(&dlm_domain_lock);
 297        return ret;
 298}
 299
 300static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 301{
 302        dlm_destroy_debugfs_subroot(dlm);
 303
 304        if (dlm->lockres_hash)
 305                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
 306
 307        if (dlm->master_hash)
 308                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
 309
 310        if (dlm->name)
 311                kfree(dlm->name);
 312
 313        kfree(dlm);
 314}
 315
 316/* A little strange - this function will be called while holding
 317 * dlm_domain_lock and is expected to be holding it on the way out. We
 318 * will however drop and reacquire it multiple times */
 319static void dlm_ctxt_release(struct kref *kref)
 320{
 321        struct dlm_ctxt *dlm;
 322
 323        dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
 324
 325        BUG_ON(dlm->num_joins);
 326        BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
 327
 328        /* we may still be in the list if we hit an error during join. */
 329        list_del_init(&dlm->list);
 330
 331        spin_unlock(&dlm_domain_lock);
 332
 333        mlog(0, "freeing memory from domain %s\n", dlm->name);
 334
 335        wake_up(&dlm_domain_events);
 336
 337        dlm_free_ctxt_mem(dlm);
 338
 339        spin_lock(&dlm_domain_lock);
 340}
 341
 342void dlm_put(struct dlm_ctxt *dlm)
 343{
 344        spin_lock(&dlm_domain_lock);
 345        kref_put(&dlm->dlm_refs, dlm_ctxt_release);
 346        spin_unlock(&dlm_domain_lock);
 347}
 348
 349static void __dlm_get(struct dlm_ctxt *dlm)
 350{
 351        kref_get(&dlm->dlm_refs);
 352}
 353
 354/* given a questionable reference to a dlm object, gets a reference if
 355 * it can find it in the list, otherwise returns NULL in which case
 356 * you shouldn't trust your pointer. */
 357struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
 358{
 359        struct list_head *iter;
 360        struct dlm_ctxt *target = NULL;
 361
 362        spin_lock(&dlm_domain_lock);
 363
 364        list_for_each(iter, &dlm_domains) {
 365                target = list_entry (iter, struct dlm_ctxt, list);
 366
 367                if (target == dlm) {
 368                        __dlm_get(target);
 369                        break;
 370                }
 371
 372                target = NULL;
 373        }
 374
 375        spin_unlock(&dlm_domain_lock);
 376
 377        return target;
 378}
 379
 380int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
 381{
 382        int ret;
 383
 384        spin_lock(&dlm_domain_lock);
 385        ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
 386                (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
 387        spin_unlock(&dlm_domain_lock);
 388
 389        return ret;
 390}
 391
 392static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
 393{
 394        if (dlm->dlm_worker) {
 395                flush_workqueue(dlm->dlm_worker);
 396                destroy_workqueue(dlm->dlm_worker);
 397                dlm->dlm_worker = NULL;
 398        }
 399}
 400
 401static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
 402{
 403        dlm_unregister_domain_handlers(dlm);
 404        dlm_debug_shutdown(dlm);
 405        dlm_complete_thread(dlm);
 406        dlm_complete_recovery_thread(dlm);
 407        dlm_destroy_dlm_worker(dlm);
 408
 409        /* We've left the domain. Now we can take ourselves out of the
 410         * list and allow the kref stuff to help us free the
 411         * memory. */
 412        spin_lock(&dlm_domain_lock);
 413        list_del_init(&dlm->list);
 414        spin_unlock(&dlm_domain_lock);
 415
 416        /* Wake up anyone waiting for us to remove this domain */
 417        wake_up(&dlm_domain_events);
 418}
 419
 420static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 421{
 422        int i, num, n, ret = 0;
 423        struct dlm_lock_resource *res;
 424        struct hlist_node *iter;
 425        struct hlist_head *bucket;
 426        int dropped;
 427
 428        mlog(0, "Migrating locks from domain %s\n", dlm->name);
 429
 430        num = 0;
 431        spin_lock(&dlm->spinlock);
 432        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 433redo_bucket:
 434                n = 0;
 435                bucket = dlm_lockres_hash(dlm, i);
 436                iter = bucket->first;
 437                while (iter) {
 438                        n++;
 439                        res = hlist_entry(iter, struct dlm_lock_resource,
 440                                          hash_node);
 441                        dlm_lockres_get(res);
 442                        /* migrate, if necessary.  this will drop the dlm
 443                         * spinlock and retake it if it does migration. */
 444                        dropped = dlm_empty_lockres(dlm, res);
 445
 446                        spin_lock(&res->spinlock);
 447                        __dlm_lockres_calc_usage(dlm, res);
 448                        iter = res->hash_node.next;
 449                        spin_unlock(&res->spinlock);
 450
 451                        dlm_lockres_put(res);
 452
 453                        if (dropped)
 454                                goto redo_bucket;
 455                }
 456                cond_resched_lock(&dlm->spinlock);
 457                num += n;
 458                mlog(0, "%s: touched %d lockreses in bucket %d "
 459                     "(tot=%d)\n", dlm->name, n, i, num);
 460        }
 461        spin_unlock(&dlm->spinlock);
 462        wake_up(&dlm->dlm_thread_wq);
 463
 464        /* let the dlm thread take care of purging, keep scanning until
 465         * nothing remains in the hash */
 466        if (num) {
 467                mlog(0, "%s: %d lock resources in hash last pass\n",
 468                     dlm->name, num);
 469                ret = -EAGAIN;
 470        }
 471        mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
 472        return ret;
 473}
 474
 475static int dlm_no_joining_node(struct dlm_ctxt *dlm)
 476{
 477        int ret;
 478
 479        spin_lock(&dlm->spinlock);
 480        ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
 481        spin_unlock(&dlm->spinlock);
 482
 483        return ret;
 484}
 485
 486static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
 487{
 488        /* Yikes, a double spinlock! I need domain_lock for the dlm
 489         * state and the dlm spinlock for join state... Sorry! */
 490again:
 491        spin_lock(&dlm_domain_lock);
 492        spin_lock(&dlm->spinlock);
 493
 494        if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 495                mlog(0, "Node %d is joining, we wait on it.\n",
 496                          dlm->joining_node);
 497                spin_unlock(&dlm->spinlock);
 498                spin_unlock(&dlm_domain_lock);
 499
 500                wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
 501                goto again;
 502        }
 503
 504        dlm->dlm_state = DLM_CTXT_LEAVING;
 505        spin_unlock(&dlm->spinlock);
 506        spin_unlock(&dlm_domain_lock);
 507}
 508
 509static void __dlm_print_nodes(struct dlm_ctxt *dlm)
 510{
 511        int node = -1;
 512
 513        assert_spin_locked(&dlm->spinlock);
 514
 515        printk(KERN_INFO "ocfs2_dlm: Nodes in domain (\"%s\"): ", dlm->name);
 516
 517        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 518                                     node + 1)) < O2NM_MAX_NODES) {
 519                printk("%d ", node);
 520        }
 521        printk("\n");
 522}
 523
 524static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 525                                   void **ret_data)
 526{
 527        struct dlm_ctxt *dlm = data;
 528        unsigned int node;
 529        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 530
 531        mlog_entry("%p %u %p", msg, len, data);
 532
 533        if (!dlm_grab(dlm))
 534                return 0;
 535
 536        node = exit_msg->node_idx;
 537
 538        printk(KERN_INFO "ocfs2_dlm: Node %u leaves domain %s\n", node, dlm->name);
 539
 540        spin_lock(&dlm->spinlock);
 541        clear_bit(node, dlm->domain_map);
 542        __dlm_print_nodes(dlm);
 543
 544        /* notify anything attached to the heartbeat events */
 545        dlm_hb_event_notify_attached(dlm, node, 0);
 546
 547        spin_unlock(&dlm->spinlock);
 548
 549        dlm_put(dlm);
 550
 551        return 0;
 552}
 553
 554static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
 555                                    unsigned int node)
 556{
 557        int status;
 558        struct dlm_exit_domain leave_msg;
 559
 560        mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
 561                  node, dlm->name, dlm->node_num);
 562
 563        memset(&leave_msg, 0, sizeof(leave_msg));
 564        leave_msg.node_idx = dlm->node_num;
 565
 566        status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
 567                                    &leave_msg, sizeof(leave_msg), node,
 568                                    NULL);
 569
 570        mlog(0, "status return %d from o2net_send_message\n", status);
 571
 572        return status;
 573}
 574
 575
 576static void dlm_leave_domain(struct dlm_ctxt *dlm)
 577{
 578        int node, clear_node, status;
 579
 580        /* At this point we've migrated away all our locks and won't
 581         * accept mastership of new ones. The dlm is responsible for
 582         * almost nothing now. We make sure not to confuse any joining
 583         * nodes and then commence shutdown procedure. */
 584
 585        spin_lock(&dlm->spinlock);
 586        /* Clear ourselves from the domain map */
 587        clear_bit(dlm->node_num, dlm->domain_map);
 588        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 589                                     0)) < O2NM_MAX_NODES) {
 590                /* Drop the dlm spinlock. This is safe wrt the domain_map.
 591                 * -nodes cannot be added now as the
 592                 *   query_join_handlers knows to respond with OK_NO_MAP
 593                 * -we catch the right network errors if a node is
 594                 *   removed from the map while we're sending him the
 595                 *   exit message. */
 596                spin_unlock(&dlm->spinlock);
 597
 598                clear_node = 1;
 599
 600                status = dlm_send_one_domain_exit(dlm, node);
 601                if (status < 0 &&
 602                    status != -ENOPROTOOPT &&
 603                    status != -ENOTCONN) {
 604                        mlog(ML_NOTICE, "Error %d sending domain exit message "
 605                             "to node %d\n", status, node);
 606
 607                        /* Not sure what to do here but lets sleep for
 608                         * a bit in case this was a transient
 609                         * error... */
 610                        msleep(DLM_DOMAIN_BACKOFF_MS);
 611                        clear_node = 0;
 612                }
 613
 614                spin_lock(&dlm->spinlock);
 615                /* If we're not clearing the node bit then we intend
 616                 * to loop back around to try again. */
 617                if (clear_node)
 618                        clear_bit(node, dlm->domain_map);
 619        }
 620        spin_unlock(&dlm->spinlock);
 621}
 622
 623int dlm_joined(struct dlm_ctxt *dlm)
 624{
 625        int ret = 0;
 626
 627        spin_lock(&dlm_domain_lock);
 628
 629        if (dlm->dlm_state == DLM_CTXT_JOINED)
 630                ret = 1;
 631
 632        spin_unlock(&dlm_domain_lock);
 633
 634        return ret;
 635}
 636
 637int dlm_shutting_down(struct dlm_ctxt *dlm)
 638{
 639        int ret = 0;
 640
 641        spin_lock(&dlm_domain_lock);
 642
 643        if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
 644                ret = 1;
 645
 646        spin_unlock(&dlm_domain_lock);
 647
 648        return ret;
 649}
 650
 651void dlm_unregister_domain(struct dlm_ctxt *dlm)
 652{
 653        int leave = 0;
 654        struct dlm_lock_resource *res;
 655
 656        spin_lock(&dlm_domain_lock);
 657        BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
 658        BUG_ON(!dlm->num_joins);
 659
 660        dlm->num_joins--;
 661        if (!dlm->num_joins) {
 662                /* We mark it "in shutdown" now so new register
 663                 * requests wait until we've completely left the
 664                 * domain. Don't use DLM_CTXT_LEAVING yet as we still
 665                 * want new domain joins to communicate with us at
 666                 * least until we've completed migration of our
 667                 * resources. */
 668                dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
 669                leave = 1;
 670        }
 671        spin_unlock(&dlm_domain_lock);
 672
 673        if (leave) {
 674                mlog(0, "shutting down domain %s\n", dlm->name);
 675
 676                /* We changed dlm state, notify the thread */
 677                dlm_kick_thread(dlm, NULL);
 678
 679                while (dlm_migrate_all_locks(dlm)) {
 680                        /* Give dlm_thread time to purge the lockres' */
 681                        msleep(500);
 682                        mlog(0, "%s: more migration to do\n", dlm->name);
 683                }
 684
 685                /* This list should be empty. If not, print remaining lockres */
 686                if (!list_empty(&dlm->tracking_list)) {
 687                        mlog(ML_ERROR, "Following lockres' are still on the "
 688                             "tracking list:\n");
 689                        list_for_each_entry(res, &dlm->tracking_list, tracking)
 690                                dlm_print_one_lock_resource(res);
 691                }
 692
 693                dlm_mark_domain_leaving(dlm);
 694                dlm_leave_domain(dlm);
 695                dlm_complete_dlm_shutdown(dlm);
 696        }
 697        dlm_put(dlm);
 698}
 699EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 700
 701static int dlm_query_join_proto_check(char *proto_type, int node,
 702                                      struct dlm_protocol_version *ours,
 703                                      struct dlm_protocol_version *request)
 704{
 705        int rc;
 706        struct dlm_protocol_version proto = *request;
 707
 708        if (!dlm_protocol_compare(ours, &proto)) {
 709                mlog(0,
 710                     "node %u wanted to join with %s locking protocol "
 711                     "%u.%u, we respond with %u.%u\n",
 712                     node, proto_type,
 713                     request->pv_major,
 714                     request->pv_minor,
 715                     proto.pv_major, proto.pv_minor);
 716                request->pv_minor = proto.pv_minor;
 717                rc = 0;
 718        } else {
 719                mlog(ML_NOTICE,
 720                     "Node %u wanted to join with %s locking "
 721                     "protocol %u.%u, but we have %u.%u, disallowing\n",
 722                     node, proto_type,
 723                     request->pv_major,
 724                     request->pv_minor,
 725                     ours->pv_major,
 726                     ours->pv_minor);
 727                rc = 1;
 728        }
 729
 730        return rc;
 731}
 732
/*
 * struct dlm_query_join_packet is made up of four one-byte fields.  They
 * are effectively in big-endian order already.  However, little-endian
 * machines swap them before putting the packet on the wire (because
 * query_join's response is a status, and that status is treated as a u32
 * on the wire).  Thus, big-endian and little-endian machines would treat
 * this structure differently.
 *
 * The solution is to have little-endian machines swap the structure when
 * converting from the structure to the u32 representation.  This results
 * in the structure having the correct format on the wire no matter what
 * the host's endianness is.
 */
 746static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
 747                                          u32 *wire)
 748{
 749        union dlm_query_join_response response;
 750
 751        response.packet = *packet;
 752        *wire = cpu_to_be32(response.intval);
 753}
 754
 755static void dlm_query_join_wire_to_packet(u32 wire,
 756                                          struct dlm_query_join_packet *packet)
 757{
 758        union dlm_query_join_response response;
 759
 760        response.intval = cpu_to_be32(wire);
 761        *packet = response.packet;
 762}
 763
 764static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 765                                  void **ret_data)
 766{
 767        struct dlm_query_join_request *query;
 768        struct dlm_query_join_packet packet = {
 769                .code = JOIN_DISALLOW,
 770        };
 771        struct dlm_ctxt *dlm = NULL;
 772        u32 response;
 773        u8 nodenum;
 774
 775        query = (struct dlm_query_join_request *) msg->buf;
 776
 777        mlog(0, "node %u wants to join domain %s\n", query->node_idx,
 778                  query->domain);
 779
 780        /*
 781         * If heartbeat doesn't consider the node live, tell it
 782         * to back off and try again.  This gives heartbeat a chance
 783         * to catch up.
 784         */
 785        if (!o2hb_check_node_heartbeating(query->node_idx)) {
 786                mlog(0, "node %u is not in our live map yet\n",
 787                     query->node_idx);
 788
 789                packet.code = JOIN_DISALLOW;
 790                goto respond;
 791        }
 792
 793        packet.code = JOIN_OK_NO_MAP;
 794
 795        spin_lock(&dlm_domain_lock);
 796        dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
 797        if (!dlm)
 798                goto unlock_respond;
 799
 800        /*
 801         * There is a small window where the joining node may not see the
 802         * node(s) that just left but still part of the cluster. DISALLOW
 803         * join request if joining node has different node map.
 804         */
 805        nodenum=0;
 806        while (nodenum < O2NM_MAX_NODES) {
 807                if (test_bit(nodenum, dlm->domain_map)) {
 808                        if (!byte_test_bit(nodenum, query->node_map)) {
 809                                mlog(0, "disallow join as node %u does not "
 810                                     "have node %u in its nodemap\n",
 811                                     query->node_idx, nodenum);
 812                                packet.code = JOIN_DISALLOW;
 813                                goto unlock_respond;
 814                        }
 815                }
 816                nodenum++;
 817        }
 818
 819        /* Once the dlm ctxt is marked as leaving then we don't want
 820         * to be put in someone's domain map. 
 821         * Also, explicitly disallow joining at certain troublesome
 822         * times (ie. during recovery). */
 823        if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
 824                int bit = query->node_idx;
 825                spin_lock(&dlm->spinlock);
 826
 827                if (dlm->dlm_state == DLM_CTXT_NEW &&
 828                    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
 829                        /*If this is a brand new context and we
 830                         * haven't started our join process yet, then
 831                         * the other node won the race. */
 832                        packet.code = JOIN_OK_NO_MAP;
 833                } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 834                        /* Disallow parallel joins. */
 835                        packet.code = JOIN_DISALLOW;
 836                } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 837                        mlog(0, "node %u trying to join, but recovery "
 838                             "is ongoing.\n", bit);
 839                        packet.code = JOIN_DISALLOW;
 840                } else if (test_bit(bit, dlm->recovery_map)) {
 841                        mlog(0, "node %u trying to join, but it "
 842                             "still needs recovery.\n", bit);
 843                        packet.code = JOIN_DISALLOW;
 844                } else if (test_bit(bit, dlm->domain_map)) {
 845                        mlog(0, "node %u trying to join, but it "
 846                             "is still in the domain! needs recovery?\n",
 847                             bit);
 848                        packet.code = JOIN_DISALLOW;
 849                } else {
 850                        /* Alright we're fully a part of this domain
 851                         * so we keep some state as to who's joining
 852                         * and indicate to him that needs to be fixed
 853                         * up. */
 854
 855                        /* Make sure we speak compatible locking protocols.  */
 856                        if (dlm_query_join_proto_check("DLM", bit,
 857                                                       &dlm->dlm_locking_proto,
 858                                                       &query->dlm_proto)) {
 859                                packet.code = JOIN_PROTOCOL_MISMATCH;
 860                        } else if (dlm_query_join_proto_check("fs", bit,
 861                                                              &dlm->fs_locking_proto,
 862                                                              &query->fs_proto)) {
 863                                packet.code = JOIN_PROTOCOL_MISMATCH;
 864                        } else {
 865                                packet.dlm_minor = query->dlm_proto.pv_minor;
 866                                packet.fs_minor = query->fs_proto.pv_minor;
 867                                packet.code = JOIN_OK;
 868                                __dlm_set_joining_node(dlm, query->node_idx);
 869                        }
 870                }
 871
 872                spin_unlock(&dlm->spinlock);
 873        }
 874unlock_respond:
 875        spin_unlock(&dlm_domain_lock);
 876
 877respond:
 878        mlog(0, "We respond with %u\n", packet.code);
 879
 880        dlm_query_join_packet_to_wire(&packet, &response);
 881        return response;
 882}
 883
 884static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 885                                     void **ret_data)
 886{
 887        struct dlm_assert_joined *assert;
 888        struct dlm_ctxt *dlm = NULL;
 889
 890        assert = (struct dlm_assert_joined *) msg->buf;
 891
 892        mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
 893                  assert->domain);
 894
 895        spin_lock(&dlm_domain_lock);
 896        dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
 897        /* XXX should we consider no dlm ctxt an error? */
 898        if (dlm) {
 899                spin_lock(&dlm->spinlock);
 900
 901                /* Alright, this node has officially joined our
 902                 * domain. Set him in the map and clean up our
 903                 * leftover join state. */
 904                BUG_ON(dlm->joining_node != assert->node_idx);
 905                set_bit(assert->node_idx, dlm->domain_map);
 906                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 907
 908                printk(KERN_INFO "ocfs2_dlm: Node %u joins domain %s\n",
 909                       assert->node_idx, dlm->name);
 910                __dlm_print_nodes(dlm);
 911
 912                /* notify anything attached to the heartbeat events */
 913                dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
 914
 915                spin_unlock(&dlm->spinlock);
 916        }
 917        spin_unlock(&dlm_domain_lock);
 918
 919        return 0;
 920}
 921
 922static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 923                                   void **ret_data)
 924{
 925        struct dlm_cancel_join *cancel;
 926        struct dlm_ctxt *dlm = NULL;
 927
 928        cancel = (struct dlm_cancel_join *) msg->buf;
 929
 930        mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
 931                  cancel->domain);
 932
 933        spin_lock(&dlm_domain_lock);
 934        dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
 935
 936        if (dlm) {
 937                spin_lock(&dlm->spinlock);
 938
 939                /* Yikes, this guy wants to cancel his join. No
 940                 * problem, we simply cleanup our join state. */
 941                BUG_ON(dlm->joining_node != cancel->node_idx);
 942                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 943
 944                spin_unlock(&dlm->spinlock);
 945        }
 946        spin_unlock(&dlm_domain_lock);
 947
 948        return 0;
 949}
 950
 951static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
 952                                    unsigned int node)
 953{
 954        int status;
 955        struct dlm_cancel_join cancel_msg;
 956
 957        memset(&cancel_msg, 0, sizeof(cancel_msg));
 958        cancel_msg.node_idx = dlm->node_num;
 959        cancel_msg.name_len = strlen(dlm->name);
 960        memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
 961
 962        status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
 963                                    &cancel_msg, sizeof(cancel_msg), node,
 964                                    NULL);
 965        if (status < 0) {
 966                mlog_errno(status);
 967                goto bail;
 968        }
 969
 970bail:
 971        return status;
 972}
 973
 974/* map_size should be in bytes. */
 975static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
 976                                 unsigned long *node_map,
 977                                 unsigned int map_size)
 978{
 979        int status, tmpstat;
 980        unsigned int node;
 981
 982        if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
 983                         sizeof(unsigned long))) {
 984                mlog(ML_ERROR,
 985                     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
 986                     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
 987                return -EINVAL;
 988        }
 989
 990        status = 0;
 991        node = -1;
 992        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
 993                                     node + 1)) < O2NM_MAX_NODES) {
 994                if (node == dlm->node_num)
 995                        continue;
 996
 997                tmpstat = dlm_send_one_join_cancel(dlm, node);
 998                if (tmpstat) {
 999                        mlog(ML_ERROR, "Error return %d cancelling join on "
1000                             "node %d\n", tmpstat, node);
1001                        if (!status)
1002                                status = tmpstat;
1003                }
1004        }
1005
1006        if (status)
1007                mlog_errno(status);
1008        return status;
1009}
1010
1011static int dlm_request_join(struct dlm_ctxt *dlm,
1012                            int node,
1013                            enum dlm_query_join_response_code *response)
1014{
1015        int status;
1016        struct dlm_query_join_request join_msg;
1017        struct dlm_query_join_packet packet;
1018        u32 join_resp;
1019
1020        mlog(0, "querying node %d\n", node);
1021
1022        memset(&join_msg, 0, sizeof(join_msg));
1023        join_msg.node_idx = dlm->node_num;
1024        join_msg.name_len = strlen(dlm->name);
1025        memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1026        join_msg.dlm_proto = dlm->dlm_locking_proto;
1027        join_msg.fs_proto = dlm->fs_locking_proto;
1028
1029        /* copy live node map to join message */
1030        byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1031
1032        status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1033                                    sizeof(join_msg), node,
1034                                    &join_resp);
1035        if (status < 0 && status != -ENOPROTOOPT) {
1036                mlog_errno(status);
1037                goto bail;
1038        }
1039        dlm_query_join_wire_to_packet(join_resp, &packet);
1040
1041        /* -ENOPROTOOPT from the net code means the other side isn't
1042            listening for our message type -- that's fine, it means
1043            his dlm isn't up, so we can consider him a 'yes' but not
1044            joined into the domain.  */
1045        if (status == -ENOPROTOOPT) {
1046                status = 0;
1047                *response = JOIN_OK_NO_MAP;
1048        } else if (packet.code == JOIN_DISALLOW ||
1049                   packet.code == JOIN_OK_NO_MAP) {
1050                *response = packet.code;
1051        } else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
1052                mlog(ML_NOTICE,
1053                     "This node requested DLM locking protocol %u.%u and "
1054                     "filesystem locking protocol %u.%u.  At least one of "
1055                     "the protocol versions on node %d is not compatible, "
1056                     "disconnecting\n",
1057                     dlm->dlm_locking_proto.pv_major,
1058                     dlm->dlm_locking_proto.pv_minor,
1059                     dlm->fs_locking_proto.pv_major,
1060                     dlm->fs_locking_proto.pv_minor,
1061                     node);
1062                status = -EPROTO;
1063                *response = packet.code;
1064        } else if (packet.code == JOIN_OK) {
1065                *response = packet.code;
1066                /* Use the same locking protocol as the remote node */
1067                dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1068                dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1069                mlog(0,
1070                     "Node %d responds JOIN_OK with DLM locking protocol "
1071                     "%u.%u and fs locking protocol %u.%u\n",
1072                     node,
1073                     dlm->dlm_locking_proto.pv_major,
1074                     dlm->dlm_locking_proto.pv_minor,
1075                     dlm->fs_locking_proto.pv_major,
1076                     dlm->fs_locking_proto.pv_minor);
1077        } else {
1078                status = -EINVAL;
1079                mlog(ML_ERROR, "invalid response %d from node %u\n",
1080                     packet.code, node);
1081        }
1082
1083        mlog(0, "status %d, node %d response is %d\n", status, node,
1084             *response);
1085
1086bail:
1087        return status;
1088}
1089
1090static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1091                                    unsigned int node)
1092{
1093        int status;
1094        struct dlm_assert_joined assert_msg;
1095
1096        mlog(0, "Sending join assert to node %u\n", node);
1097
1098        memset(&assert_msg, 0, sizeof(assert_msg));
1099        assert_msg.node_idx = dlm->node_num;
1100        assert_msg.name_len = strlen(dlm->name);
1101        memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1102
1103        status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1104                                    &assert_msg, sizeof(assert_msg), node,
1105                                    NULL);
1106        if (status < 0)
1107                mlog_errno(status);
1108
1109        return status;
1110}
1111
1112static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1113                                  unsigned long *node_map)
1114{
1115        int status, node, live;
1116
1117        status = 0;
1118        node = -1;
1119        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1120                                     node + 1)) < O2NM_MAX_NODES) {
1121                if (node == dlm->node_num)
1122                        continue;
1123
1124                do {
1125                        /* It is very important that this message be
1126                         * received so we spin until either the node
1127                         * has died or it gets the message. */
1128                        status = dlm_send_one_join_assert(dlm, node);
1129
1130                        spin_lock(&dlm->spinlock);
1131                        live = test_bit(node, dlm->live_nodes_map);
1132                        spin_unlock(&dlm->spinlock);
1133
1134                        if (status) {
1135                                mlog(ML_ERROR, "Error return %d asserting "
1136                                     "join on node %d\n", status, node);
1137
1138                                /* give us some time between errors... */
1139                                if (live)
1140                                        msleep(DLM_DOMAIN_BACKOFF_MS);
1141                        }
1142                } while (status && live);
1143        }
1144}
1145
1146struct domain_join_ctxt {
1147        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1148        unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1149};
1150
1151static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1152                                   struct domain_join_ctxt *ctxt,
1153                                   enum dlm_query_join_response_code response)
1154{
1155        int ret;
1156
1157        if (response == JOIN_DISALLOW) {
1158                mlog(0, "Latest response of disallow -- should restart\n");
1159                return 1;
1160        }
1161
1162        spin_lock(&dlm->spinlock);
1163        /* For now, we restart the process if the node maps have
1164         * changed at all */
1165        ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1166                     sizeof(dlm->live_nodes_map));
1167        spin_unlock(&dlm->spinlock);
1168
1169        if (ret)
1170                mlog(0, "Node maps changed -- should restart\n");
1171
1172        return ret;
1173}
1174
1175static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1176{
1177        int status = 0, tmpstat, node;
1178        struct domain_join_ctxt *ctxt;
1179        enum dlm_query_join_response_code response = JOIN_DISALLOW;
1180
1181        mlog_entry("%p", dlm);
1182
1183        ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1184        if (!ctxt) {
1185                status = -ENOMEM;
1186                mlog_errno(status);
1187                goto bail;
1188        }
1189
1190        /* group sem locking should work for us here -- we're already
1191         * registered for heartbeat events so filling this should be
1192         * atomic wrt getting those handlers called. */
1193        o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1194
1195        spin_lock(&dlm->spinlock);
1196        memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1197
1198        __dlm_set_joining_node(dlm, dlm->node_num);
1199
1200        spin_unlock(&dlm->spinlock);
1201
1202        node = -1;
1203        while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1204                                     node + 1)) < O2NM_MAX_NODES) {
1205                if (node == dlm->node_num)
1206                        continue;
1207
1208                status = dlm_request_join(dlm, node, &response);
1209                if (status < 0) {
1210                        mlog_errno(status);
1211                        goto bail;
1212                }
1213
1214                /* Ok, either we got a response or the node doesn't have a
1215                 * dlm up. */
1216                if (response == JOIN_OK)
1217                        set_bit(node, ctxt->yes_resp_map);
1218
1219                if (dlm_should_restart_join(dlm, ctxt, response)) {
1220                        status = -EAGAIN;
1221                        goto bail;
1222                }
1223        }
1224
1225        mlog(0, "Yay, done querying nodes!\n");
1226
1227        /* Yay, everyone agree's we can join the domain. My domain is
1228         * comprised of all nodes who were put in the
1229         * yes_resp_map. Copy that into our domain map and send a join
1230         * assert message to clean up everyone elses state. */
1231        spin_lock(&dlm->spinlock);
1232        memcpy(dlm->domain_map, ctxt->yes_resp_map,
1233               sizeof(ctxt->yes_resp_map));
1234        set_bit(dlm->node_num, dlm->domain_map);
1235        spin_unlock(&dlm->spinlock);
1236
1237        dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1238
1239        /* Joined state *must* be set before the joining node
1240         * information, otherwise the query_join handler may read no
1241         * current joiner but a state of NEW and tell joining nodes
1242         * we're not in the domain. */
1243        spin_lock(&dlm_domain_lock);
1244        dlm->dlm_state = DLM_CTXT_JOINED;
1245        dlm->num_joins++;
1246        spin_unlock(&dlm_domain_lock);
1247
1248bail:
1249        spin_lock(&dlm->spinlock);
1250        __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1251        if (!status)
1252                __dlm_print_nodes(dlm);
1253        spin_unlock(&dlm->spinlock);
1254
1255        if (ctxt) {
1256                /* Do we need to send a cancel message to any nodes? */
1257                if (status < 0) {
1258                        tmpstat = dlm_send_join_cancels(dlm,
1259                                                        ctxt->yes_resp_map,
1260                                                        sizeof(ctxt->yes_resp_map));
1261                        if (tmpstat < 0)
1262                                mlog_errno(tmpstat);
1263                }
1264                kfree(ctxt);
1265        }
1266
1267        mlog(0, "returning %d\n", status);
1268        return status;
1269}
1270
1271static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1272{
1273        o2hb_unregister_callback(NULL, &dlm->dlm_hb_up);
1274        o2hb_unregister_callback(NULL, &dlm->dlm_hb_down);
1275        o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1276}
1277
1278static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1279{
1280        int status;
1281
1282        mlog(0, "registering handlers.\n");
1283
1284        o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1285                            dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1286        status = o2hb_register_callback(NULL, &dlm->dlm_hb_down);
1287        if (status)
1288                goto bail;
1289
1290        o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1291                            dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1292        status = o2hb_register_callback(NULL, &dlm->dlm_hb_up);
1293        if (status)
1294                goto bail;
1295
1296        status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1297                                        sizeof(struct dlm_master_request),
1298                                        dlm_master_request_handler,
1299                                        dlm, NULL, &dlm->dlm_domain_handlers);
1300        if (status)
1301                goto bail;
1302
1303        status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1304                                        sizeof(struct dlm_assert_master),
1305                                        dlm_assert_master_handler,
1306                                        dlm, dlm_assert_master_post_handler,
1307                                        &dlm->dlm_domain_handlers);
1308        if (status)
1309                goto bail;
1310
1311        status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1312                                        sizeof(struct dlm_create_lock),
1313                                        dlm_create_lock_handler,
1314                                        dlm, NULL, &dlm->dlm_domain_handlers);
1315        if (status)
1316                goto bail;
1317
1318        status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1319                                        DLM_CONVERT_LOCK_MAX_LEN,
1320                                        dlm_convert_lock_handler,
1321                                        dlm, NULL, &dlm->dlm_domain_handlers);
1322        if (status)
1323                goto bail;
1324
1325        status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1326                                        DLM_UNLOCK_LOCK_MAX_LEN,
1327                                        dlm_unlock_lock_handler,
1328                                        dlm, NULL, &dlm->dlm_domain_handlers);
1329        if (status)
1330                goto bail;
1331
1332        status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1333                                        DLM_PROXY_AST_MAX_LEN,
1334                                        dlm_proxy_ast_handler,
1335                                        dlm, NULL, &dlm->dlm_domain_handlers);
1336        if (status)
1337                goto bail;
1338
1339        status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1340                                        sizeof(struct dlm_exit_domain),
1341                                        dlm_exit_domain_handler,
1342                                        dlm, NULL, &dlm->dlm_domain_handlers);
1343        if (status)
1344                goto bail;
1345
1346        status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1347                                        sizeof(struct dlm_deref_lockres),
1348                                        dlm_deref_lockres_handler,
1349                                        dlm, NULL, &dlm->dlm_domain_handlers);
1350        if (status)
1351                goto bail;
1352
1353        status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1354                                        sizeof(struct dlm_migrate_request),
1355                                        dlm_migrate_request_handler,
1356                                        dlm, NULL, &dlm->dlm_domain_handlers);
1357        if (status)
1358                goto bail;
1359
1360        status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1361                                        DLM_MIG_LOCKRES_MAX_LEN,
1362                                        dlm_mig_lockres_handler,
1363                                        dlm, NULL, &dlm->dlm_domain_handlers);
1364        if (status)
1365                goto bail;
1366
1367        status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1368                                        sizeof(struct dlm_master_requery),
1369                                        dlm_master_requery_handler,
1370                                        dlm, NULL, &dlm->dlm_domain_handlers);
1371        if (status)
1372                goto bail;
1373
1374        status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1375                                        sizeof(struct dlm_lock_request),
1376                                        dlm_request_all_locks_handler,
1377                                        dlm, NULL, &dlm->dlm_domain_handlers);
1378        if (status)
1379                goto bail;
1380
1381        status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1382                                        sizeof(struct dlm_reco_data_done),
1383                                        dlm_reco_data_done_handler,
1384                                        dlm, NULL, &dlm->dlm_domain_handlers);
1385        if (status)
1386                goto bail;
1387
1388        status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1389                                        sizeof(struct dlm_begin_reco),
1390                                        dlm_begin_reco_handler,
1391                                        dlm, NULL, &dlm->dlm_domain_handlers);
1392        if (status)
1393                goto bail;
1394
1395        status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1396                                        sizeof(struct dlm_finalize_reco),
1397                                        dlm_finalize_reco_handler,
1398                                        dlm, NULL, &dlm->dlm_domain_handlers);
1399        if (status)
1400                goto bail;
1401
1402bail:
1403        if (status)
1404                dlm_unregister_domain_handlers(dlm);
1405
1406        return status;
1407}
1408
1409static int dlm_join_domain(struct dlm_ctxt *dlm)
1410{
1411        int status;
1412        unsigned int backoff;
1413        unsigned int total_backoff = 0;
1414
1415        BUG_ON(!dlm);
1416
1417        mlog(0, "Join domain %s\n", dlm->name);
1418
1419        status = dlm_register_domain_handlers(dlm);
1420        if (status) {
1421                mlog_errno(status);
1422                goto bail;
1423        }
1424
1425        status = dlm_debug_init(dlm);
1426        if (status < 0) {
1427                mlog_errno(status);
1428                goto bail;
1429        }
1430
1431        status = dlm_launch_thread(dlm);
1432        if (status < 0) {
1433                mlog_errno(status);
1434                goto bail;
1435        }
1436
1437        status = dlm_launch_recovery_thread(dlm);
1438        if (status < 0) {
1439                mlog_errno(status);
1440                goto bail;
1441        }
1442
1443        dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1444        if (!dlm->dlm_worker) {
1445                status = -ENOMEM;
1446                mlog_errno(status);
1447                goto bail;
1448        }
1449
1450        do {
1451                status = dlm_try_to_join_domain(dlm);
1452
1453                /* If we're racing another node to the join, then we
1454                 * need to back off temporarily and let them
1455                 * complete. */
1456#define DLM_JOIN_TIMEOUT_MSECS  90000
1457                if (status == -EAGAIN) {
1458                        if (signal_pending(current)) {
1459                                status = -ERESTARTSYS;
1460                                goto bail;
1461                        }
1462
1463                        if (total_backoff >
1464                            msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1465                                status = -ERESTARTSYS;
1466                                mlog(ML_NOTICE, "Timed out joining dlm domain "
1467                                     "%s after %u msecs\n", dlm->name,
1468                                     jiffies_to_msecs(total_backoff));
1469                                goto bail;
1470                        }
1471
1472                        /*
1473                         * <chip> After you!
1474                         * <dale> No, after you!
1475                         * <chip> I insist!
1476                         * <dale> But you first!
1477                         * ...
1478                         */
1479                        backoff = (unsigned int)(jiffies & 0x3);
1480                        backoff *= DLM_DOMAIN_BACKOFF_MS;
1481                        total_backoff += backoff;
1482                        mlog(0, "backoff %d\n", backoff);
1483                        msleep(backoff);
1484                }
1485        } while (status == -EAGAIN);
1486
1487        if (status < 0) {
1488                mlog_errno(status);
1489                goto bail;
1490        }
1491
1492        status = 0;
1493bail:
1494        wake_up(&dlm_domain_events);
1495
1496        if (status) {
1497                dlm_unregister_domain_handlers(dlm);
1498                dlm_debug_shutdown(dlm);
1499                dlm_complete_thread(dlm);
1500                dlm_complete_recovery_thread(dlm);
1501                dlm_destroy_dlm_worker(dlm);
1502        }
1503
1504        return status;
1505}
1506
1507static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1508                                u32 key)
1509{
1510        int i;
1511        int ret;
1512        struct dlm_ctxt *dlm = NULL;
1513
1514        dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1515        if (!dlm) {
1516                mlog_errno(-ENOMEM);
1517                goto leave;
1518        }
1519
1520        dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1521        if (dlm->name == NULL) {
1522                mlog_errno(-ENOMEM);
1523                kfree(dlm);
1524                dlm = NULL;
1525                goto leave;
1526        }
1527
1528        dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1529        if (!dlm->lockres_hash) {
1530                mlog_errno(-ENOMEM);
1531                kfree(dlm->name);
1532                kfree(dlm);
1533                dlm = NULL;
1534                goto leave;
1535        }
1536
1537        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1538                INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1539
1540        dlm->master_hash = (struct hlist_head **)
1541                                dlm_alloc_pagevec(DLM_HASH_PAGES);
1542        if (!dlm->master_hash) {
1543                mlog_errno(-ENOMEM);
1544                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1545                kfree(dlm->name);
1546                kfree(dlm);
1547                dlm = NULL;
1548                goto leave;
1549        }
1550
1551        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1552                INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1553
1554        strcpy(dlm->name, domain);
1555        dlm->key = key;
1556        dlm->node_num = o2nm_this_node();
1557
1558        ret = dlm_create_debugfs_subroot(dlm);
1559        if (ret < 0) {
1560                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
1561                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1562                kfree(dlm->name);
1563                kfree(dlm);
1564                dlm = NULL;
1565                goto leave;
1566        }
1567
1568        spin_lock_init(&dlm->spinlock);
1569        spin_lock_init(&dlm->master_lock);
1570        spin_lock_init(&dlm->ast_lock);
1571        spin_lock_init(&dlm->track_lock);
1572        INIT_LIST_HEAD(&dlm->list);
1573        INIT_LIST_HEAD(&dlm->dirty_list);
1574        INIT_LIST_HEAD(&dlm->reco.resources);
1575        INIT_LIST_HEAD(&dlm->reco.received);
1576        INIT_LIST_HEAD(&dlm->reco.node_data);
1577        INIT_LIST_HEAD(&dlm->purge_list);
1578        INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1579        INIT_LIST_HEAD(&dlm->tracking_list);
1580        dlm->reco.state = 0;
1581
1582        INIT_LIST_HEAD(&dlm->pending_asts);
1583        INIT_LIST_HEAD(&dlm->pending_basts);
1584
1585        mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1586                  dlm->recovery_map, &(dlm->recovery_map[0]));
1587
1588        memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1589        memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1590        memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1591
1592        dlm->dlm_thread_task = NULL;
1593        dlm->dlm_reco_thread_task = NULL;
1594        dlm->dlm_worker = NULL;
1595        init_waitqueue_head(&dlm->dlm_thread_wq);
1596        init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1597        init_waitqueue_head(&dlm->reco.event);
1598        init_waitqueue_head(&dlm->ast_wq);
1599        init_waitqueue_head(&dlm->migration_wq);
1600        INIT_LIST_HEAD(&dlm->mle_hb_events);
1601
1602        dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1603        init_waitqueue_head(&dlm->dlm_join_events);
1604
1605        dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1606        dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1607
1608        atomic_set(&dlm->res_tot_count, 0);
1609        atomic_set(&dlm->res_cur_count, 0);
1610        for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
1611                atomic_set(&dlm->mle_tot_count[i], 0);
1612                atomic_set(&dlm->mle_cur_count[i], 0);
1613        }
1614
1615        spin_lock_init(&dlm->work_lock);
1616        INIT_LIST_HEAD(&dlm->work_list);
1617        INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
1618
1619        kref_init(&dlm->dlm_refs);
1620        dlm->dlm_state = DLM_CTXT_NEW;
1621
1622        INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1623
1624        mlog(0, "context init: refcount %u\n",
1625                  atomic_read(&dlm->dlm_refs.refcount));
1626
1627leave:
1628        return dlm;
1629}
1630
1631/*
1632 * Compare a requested locking protocol version against the current one.
1633 *
1634 * If the major numbers are different, they are incompatible.
1635 * If the current minor is greater than the request, they are incompatible.
1636 * If the current minor is less than or equal to the request, they are
1637 * compatible, and the requester should run at the current minor version.
1638 */
1639static int dlm_protocol_compare(struct dlm_protocol_version *existing,
1640                                struct dlm_protocol_version *request)
1641{
1642        if (existing->pv_major != request->pv_major)
1643                return 1;
1644
1645        if (existing->pv_minor > request->pv_minor)
1646                return 1;
1647
1648        if (existing->pv_minor < request->pv_minor)
1649                request->pv_minor = existing->pv_minor;
1650
1651        return 0;
1652}
1653
1654/*
1655 * dlm_register_domain: one-time setup per "domain".
1656 *
1657 * The filesystem passes in the requested locking version via proto.
1658 * If registration was successful, proto will contain the negotiated
1659 * locking protocol.
1660 */
1661struct dlm_ctxt * dlm_register_domain(const char *domain,
1662                               u32 key,
1663                               struct dlm_protocol_version *fs_proto)
1664{
1665        int ret;
1666        struct dlm_ctxt *dlm = NULL;
1667        struct dlm_ctxt *new_ctxt = NULL;
1668
1669        if (strlen(domain) > O2NM_MAX_NAME_LEN) {
1670                ret = -ENAMETOOLONG;
1671                mlog(ML_ERROR, "domain name length too long\n");
1672                goto leave;
1673        }
1674
1675        if (!o2hb_check_local_node_heartbeating()) {
1676                mlog(ML_ERROR, "the local node has not been configured, or is "
1677                     "not heartbeating\n");
1678                ret = -EPROTO;
1679                goto leave;
1680        }
1681
1682        mlog(0, "register called for domain \"%s\"\n", domain);
1683
1684retry:
1685        dlm = NULL;
1686        if (signal_pending(current)) {
1687                ret = -ERESTARTSYS;
1688                mlog_errno(ret);
1689                goto leave;
1690        }
1691
1692        spin_lock(&dlm_domain_lock);
1693
1694        dlm = __dlm_lookup_domain(domain);
1695        if (dlm) {
1696                if (dlm->dlm_state != DLM_CTXT_JOINED) {
1697                        spin_unlock(&dlm_domain_lock);
1698
1699                        mlog(0, "This ctxt is not joined yet!\n");
1700                        wait_event_interruptible(dlm_domain_events,
1701                                                 dlm_wait_on_domain_helper(
1702                                                         domain));
1703                        goto retry;
1704                }
1705
1706                if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
1707                        mlog(ML_ERROR,
1708                             "Requested locking protocol version is not "
1709                             "compatible with already registered domain "
1710                             "\"%s\"\n", domain);
1711                        ret = -EPROTO;
1712                        goto leave;
1713                }
1714
1715                __dlm_get(dlm);
1716                dlm->num_joins++;
1717
1718                spin_unlock(&dlm_domain_lock);
1719
1720                ret = 0;
1721                goto leave;
1722        }
1723
1724        /* doesn't exist */
1725        if (!new_ctxt) {
1726                spin_unlock(&dlm_domain_lock);
1727
1728                new_ctxt = dlm_alloc_ctxt(domain, key);
1729                if (new_ctxt)
1730                        goto retry;
1731
1732                ret = -ENOMEM;
1733                mlog_errno(ret);
1734                goto leave;
1735        }
1736
1737        /* a little variable switch-a-roo here... */
1738        dlm = new_ctxt;
1739        new_ctxt = NULL;
1740
1741        /* add the new domain */
1742        list_add_tail(&dlm->list, &dlm_domains);
1743        spin_unlock(&dlm_domain_lock);
1744
1745        /*
1746         * Pass the locking protocol version into the join.  If the join
1747         * succeeds, it will have the negotiated protocol set.
1748         */
1749        dlm->dlm_locking_proto = dlm_protocol;
1750        dlm->fs_locking_proto = *fs_proto;
1751
1752        ret = dlm_join_domain(dlm);
1753        if (ret) {
1754                mlog_errno(ret);
1755                dlm_put(dlm);
1756                goto leave;
1757        }
1758
1759        /* Tell the caller what locking protocol we negotiated */
1760        *fs_proto = dlm->fs_locking_proto;
1761
1762        ret = 0;
1763leave:
1764        if (new_ctxt)
1765                dlm_free_ctxt_mem(new_ctxt);
1766
1767        if (ret < 0)
1768                dlm = ERR_PTR(ret);
1769
1770        return dlm;
1771}
1772EXPORT_SYMBOL_GPL(dlm_register_domain);
1773
1774static LIST_HEAD(dlm_join_handlers);
1775
1776static void dlm_unregister_net_handlers(void)
1777{
1778        o2net_unregister_handler_list(&dlm_join_handlers);
1779}
1780
1781static int dlm_register_net_handlers(void)
1782{
1783        int status = 0;
1784
1785        status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1786                                        sizeof(struct dlm_query_join_request),
1787                                        dlm_query_join_handler,
1788                                        NULL, NULL, &dlm_join_handlers);
1789        if (status)
1790                goto bail;
1791
1792        status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1793                                        sizeof(struct dlm_assert_joined),
1794                                        dlm_assert_joined_handler,
1795                                        NULL, NULL, &dlm_join_handlers);
1796        if (status)
1797                goto bail;
1798
1799        status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1800                                        sizeof(struct dlm_cancel_join),
1801                                        dlm_cancel_join_handler,
1802                                        NULL, NULL, &dlm_join_handlers);
1803
1804bail:
1805        if (status < 0)
1806                dlm_unregister_net_handlers();
1807
1808        return status;
1809}
1810
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1818
1819/* Eviction is not expected to happen often, so a per-domain lock is
1820 * not necessary. Eviction callbacks are allowed to sleep for short
1821 * periods of time. */
1822static DECLARE_RWSEM(dlm_callback_sem);
1823
1824void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1825                                        int node_num)
1826{
1827        struct list_head *iter;
1828        struct dlm_eviction_cb *cb;
1829
1830        down_read(&dlm_callback_sem);
1831        list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1832                cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1833
1834                cb->ec_func(node_num, cb->ec_data);
1835        }
1836        up_read(&dlm_callback_sem);
1837}
1838
1839void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1840                           dlm_eviction_func *f,
1841                           void *data)
1842{
1843        INIT_LIST_HEAD(&cb->ec_item);
1844        cb->ec_func = f;
1845        cb->ec_data = data;
1846}
1847EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1848
1849void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1850                              struct dlm_eviction_cb *cb)
1851{
1852        down_write(&dlm_callback_sem);
1853        list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1854        up_write(&dlm_callback_sem);
1855}
1856EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1857
1858void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1859{
1860        down_write(&dlm_callback_sem);
1861        list_del_init(&cb->ec_item);
1862        up_write(&dlm_callback_sem);
1863}
1864EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1865
1866static int __init dlm_init(void)
1867{
1868        int status;
1869
1870        dlm_print_version();
1871
1872        status = dlm_init_mle_cache();
1873        if (status) {
1874                mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
1875                goto error;
1876        }
1877
1878        status = dlm_init_master_caches();
1879        if (status) {
1880                mlog(ML_ERROR, "Could not create o2dlm_lockres and "
1881                     "o2dlm_lockname slabcaches\n");
1882                goto error;
1883        }
1884
1885        status = dlm_init_lock_cache();
1886        if (status) {
1887                mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
1888                goto error;
1889        }
1890
1891        status = dlm_register_net_handlers();
1892        if (status) {
1893                mlog(ML_ERROR, "Unable to register network handlers\n");
1894                goto error;
1895        }
1896
1897        status = dlm_create_debugfs_root();
1898        if (status)
1899                goto error;
1900
1901        return 0;
1902error:
1903        dlm_unregister_net_handlers();
1904        dlm_destroy_lock_cache();
1905        dlm_destroy_master_caches();
1906        dlm_destroy_mle_cache();
1907        return -1;
1908}
1909
1910static void __exit dlm_exit (void)
1911{
1912        dlm_destroy_debugfs_root();
1913        dlm_unregister_net_handlers();
1914        dlm_destroy_lock_cache();
1915        dlm_destroy_master_caches();
1916        dlm_destroy_mle_cache();
1917}
1918
1919MODULE_AUTHOR("Oracle");
1920MODULE_LICENSE("GPL");
1921
1922module_init(dlm_init);
1923module_exit(dlm_exit);
1924
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.