linux/drivers/lightnvm/pblk-gc.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (C) 2016 CNEX Labs
   4 * Initial release: Javier Gonzalez <javier@cnexlabs.com>
   5 *                  Matias Bjorling <matias@cnexlabs.com>
   6 *
   7 * This program is free software; you can redistribute it and/or
   8 * modify it under the terms of the GNU General Public License version
   9 * 2 as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * pblk-gc.c - pblk's garbage collector
  17 */
  18
  19#include "pblk.h"
  20#include "pblk-trace.h"
  21#include <linux/delay.h>
  22
  23
  24static void pblk_gc_free_gc_rq(struct pblk_gc_rq *gc_rq)
  25{
  26        vfree(gc_rq->data);
  27        kfree(gc_rq);
  28}
  29
  30static int pblk_gc_write(struct pblk *pblk)
  31{
  32        struct pblk_gc *gc = &pblk->gc;
  33        struct pblk_gc_rq *gc_rq, *tgc_rq;
  34        LIST_HEAD(w_list);
  35
  36        spin_lock(&gc->w_lock);
  37        if (list_empty(&gc->w_list)) {
  38                spin_unlock(&gc->w_lock);
  39                return 1;
  40        }
  41
  42        list_cut_position(&w_list, &gc->w_list, gc->w_list.prev);
  43        gc->w_entries = 0;
  44        spin_unlock(&gc->w_lock);
  45
  46        list_for_each_entry_safe(gc_rq, tgc_rq, &w_list, list) {
  47                pblk_write_gc_to_cache(pblk, gc_rq);
  48                list_del(&gc_rq->list);
  49                kref_put(&gc_rq->line->ref, pblk_line_put);
  50                pblk_gc_free_gc_rq(gc_rq);
  51        }
  52
  53        return 0;
  54}
  55
  56static void pblk_gc_writer_kick(struct pblk_gc *gc)
  57{
  58        wake_up_process(gc->gc_writer_ts);
  59}
  60
  61void pblk_put_line_back(struct pblk *pblk, struct pblk_line *line)
  62{
  63        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
  64        struct list_head *move_list;
  65
  66        spin_lock(&l_mg->gc_lock);
  67        spin_lock(&line->lock);
  68        WARN_ON(line->state != PBLK_LINESTATE_GC);
  69        line->state = PBLK_LINESTATE_CLOSED;
  70        trace_pblk_line_state(pblk_disk_name(pblk), line->id,
  71                                        line->state);
  72
  73        /* We need to reset gc_group in order to ensure that
  74         * pblk_line_gc_list will return proper move_list
  75         * since right now current line is not on any of the
  76         * gc lists.
  77         */
  78        line->gc_group = PBLK_LINEGC_NONE;
  79        move_list = pblk_line_gc_list(pblk, line);
  80        spin_unlock(&line->lock);
  81        list_add_tail(&line->list, move_list);
  82        spin_unlock(&l_mg->gc_lock);
  83}
  84
  85static void pblk_gc_line_ws(struct work_struct *work)
  86{
  87        struct pblk_line_ws *gc_rq_ws = container_of(work,
  88                                                struct pblk_line_ws, ws);
  89        struct pblk *pblk = gc_rq_ws->pblk;
  90        struct pblk_gc *gc = &pblk->gc;
  91        struct pblk_line *line = gc_rq_ws->line;
  92        struct pblk_gc_rq *gc_rq = gc_rq_ws->priv;
  93        int ret;
  94
  95        up(&gc->gc_sem);
  96
  97        /* Read from GC victim block */
  98        ret = pblk_submit_read_gc(pblk, gc_rq);
  99        if (ret) {
 100                line->w_err_gc->has_gc_err = 1;
 101                goto out;
 102        }
 103
 104        if (!gc_rq->secs_to_gc)
 105                goto out;
 106
 107retry:
 108        spin_lock(&gc->w_lock);
 109        if (gc->w_entries >= PBLK_GC_RQ_QD) {
 110                spin_unlock(&gc->w_lock);
 111                pblk_gc_writer_kick(&pblk->gc);
 112                usleep_range(128, 256);
 113                goto retry;
 114        }
 115        gc->w_entries++;
 116        list_add_tail(&gc_rq->list, &gc->w_list);
 117        spin_unlock(&gc->w_lock);
 118
 119        pblk_gc_writer_kick(&pblk->gc);
 120
 121        kfree(gc_rq_ws);
 122        return;
 123
 124out:
 125        pblk_gc_free_gc_rq(gc_rq);
 126        kref_put(&line->ref, pblk_line_put);
 127        kfree(gc_rq_ws);
 128}
 129
 130static __le64 *get_lba_list_from_emeta(struct pblk *pblk,
 131                                       struct pblk_line *line)
 132{
 133        struct line_emeta *emeta_buf;
 134        struct pblk_line_meta *lm = &pblk->lm;
 135        unsigned int lba_list_size = lm->emeta_len[2];
 136        __le64 *lba_list;
 137        int ret;
 138
 139        emeta_buf = kvmalloc(lm->emeta_len[0], GFP_KERNEL);
 140        if (!emeta_buf)
 141                return NULL;
 142
 143        ret = pblk_line_emeta_read(pblk, line, emeta_buf);
 144        if (ret) {
 145                pblk_err(pblk, "line %d read emeta failed (%d)\n",
 146                                line->id, ret);
 147                kvfree(emeta_buf);
 148                return NULL;
 149        }
 150
 151        /* If this read fails, it means that emeta is corrupted.
 152         * For now, leave the line untouched.
 153         * TODO: Implement a recovery routine that scans and moves
 154         * all sectors on the line.
 155         */
 156
 157        ret = pblk_recov_check_emeta(pblk, emeta_buf);
 158        if (ret) {
 159                pblk_err(pblk, "inconsistent emeta (line %d)\n",
 160                                line->id);
 161                kvfree(emeta_buf);
 162                return NULL;
 163        }
 164
 165        lba_list = kvmalloc(lba_list_size, GFP_KERNEL);
 166
 167        if (lba_list)
 168                memcpy(lba_list, emeta_to_lbas(pblk, emeta_buf), lba_list_size);
 169
 170        kvfree(emeta_buf);
 171
 172        return lba_list;
 173}
 174
 175static void pblk_gc_line_prepare_ws(struct work_struct *work)
 176{
 177        struct pblk_line_ws *line_ws = container_of(work, struct pblk_line_ws,
 178                                                                        ws);
 179        struct pblk *pblk = line_ws->pblk;
 180        struct pblk_line *line = line_ws->line;
 181        struct pblk_line_meta *lm = &pblk->lm;
 182        struct nvm_tgt_dev *dev = pblk->dev;
 183        struct nvm_geo *geo = &dev->geo;
 184        struct pblk_gc *gc = &pblk->gc;
 185        struct pblk_line_ws *gc_rq_ws;
 186        struct pblk_gc_rq *gc_rq;
 187        __le64 *lba_list;
 188        unsigned long *invalid_bitmap;
 189        int sec_left, nr_secs, bit;
 190
 191        invalid_bitmap = kmalloc(lm->sec_bitmap_len, GFP_KERNEL);
 192        if (!invalid_bitmap)
 193                goto fail_free_ws;
 194
 195        if (line->w_err_gc->has_write_err) {
 196                lba_list = line->w_err_gc->lba_list;
 197                line->w_err_gc->lba_list = NULL;
 198        } else {
 199                lba_list = get_lba_list_from_emeta(pblk, line);
 200                if (!lba_list) {
 201                        pblk_err(pblk, "could not interpret emeta (line %d)\n",
 202                                        line->id);
 203                        goto fail_free_invalid_bitmap;
 204                }
 205        }
 206
 207        spin_lock(&line->lock);
 208        bitmap_copy(invalid_bitmap, line->invalid_bitmap, lm->sec_per_line);
 209        sec_left = pblk_line_vsc(line);
 210        spin_unlock(&line->lock);
 211
 212        if (sec_left < 0) {
 213                pblk_err(pblk, "corrupted GC line (%d)\n", line->id);
 214                goto fail_free_lba_list;
 215        }
 216
 217        bit = -1;
 218next_rq:
 219        gc_rq = kmalloc(sizeof(struct pblk_gc_rq), GFP_KERNEL);
 220        if (!gc_rq)
 221                goto fail_free_lba_list;
 222
 223        nr_secs = 0;
 224        do {
 225                bit = find_next_zero_bit(invalid_bitmap, lm->sec_per_line,
 226                                                                bit + 1);
 227                if (bit > line->emeta_ssec)
 228                        break;
 229
 230                gc_rq->paddr_list[nr_secs] = bit;
 231                gc_rq->lba_list[nr_secs++] = le64_to_cpu(lba_list[bit]);
 232        } while (nr_secs < pblk->max_write_pgs);
 233
 234        if (unlikely(!nr_secs)) {
 235                kfree(gc_rq);
 236                goto out;
 237        }
 238
 239        gc_rq->nr_secs = nr_secs;
 240        gc_rq->line = line;
 241
 242        gc_rq->data = vmalloc(array_size(gc_rq->nr_secs, geo->csecs));
 243        if (!gc_rq->data)
 244                goto fail_free_gc_rq;
 245
 246        gc_rq_ws = kmalloc(sizeof(struct pblk_line_ws), GFP_KERNEL);
 247        if (!gc_rq_ws)
 248                goto fail_free_gc_data;
 249
 250        gc_rq_ws->pblk = pblk;
 251        gc_rq_ws->line = line;
 252        gc_rq_ws->priv = gc_rq;
 253
 254        /* The write GC path can be much slower than the read GC one due to
 255         * the budget imposed by the rate-limiter. Balance in case that we get
 256         * back pressure from the write GC path.
 257         */
 258        while (down_timeout(&gc->gc_sem, msecs_to_jiffies(30000)))
 259                io_schedule();
 260
 261        kref_get(&line->ref);
 262
 263        INIT_WORK(&gc_rq_ws->ws, pblk_gc_line_ws);
 264        queue_work(gc->gc_line_reader_wq, &gc_rq_ws->ws);
 265
 266        sec_left -= nr_secs;
 267        if (sec_left > 0)
 268                goto next_rq;
 269
 270out:
 271        kvfree(lba_list);
 272        kfree(line_ws);
 273        kfree(invalid_bitmap);
 274
 275        kref_put(&line->ref, pblk_line_put);
 276        atomic_dec(&gc->read_inflight_gc);
 277
 278        return;
 279
 280fail_free_gc_data:
 281        vfree(gc_rq->data);
 282fail_free_gc_rq:
 283        kfree(gc_rq);
 284fail_free_lba_list:
 285        kvfree(lba_list);
 286fail_free_invalid_bitmap:
 287        kfree(invalid_bitmap);
 288fail_free_ws:
 289        kfree(line_ws);
 290
 291        /* Line goes back to closed state, so we cannot release additional
 292         * reference for line, since we do that only when we want to do
 293         * gc to free line state transition.
 294         */
 295        pblk_put_line_back(pblk, line);
 296        atomic_dec(&gc->read_inflight_gc);
 297
 298        pblk_err(pblk, "failed to GC line %d\n", line->id);
 299}
 300
 301static int pblk_gc_line(struct pblk *pblk, struct pblk_line *line)
 302{
 303        struct pblk_gc *gc = &pblk->gc;
 304        struct pblk_line_ws *line_ws;
 305
 306        pblk_debug(pblk, "line '%d' being reclaimed for GC\n", line->id);
 307
 308        line_ws = kmalloc(sizeof(struct pblk_line_ws), GFP_KERNEL);
 309        if (!line_ws)
 310                return -ENOMEM;
 311
 312        line_ws->pblk = pblk;
 313        line_ws->line = line;
 314
 315        atomic_inc(&gc->pipeline_gc);
 316        INIT_WORK(&line_ws->ws, pblk_gc_line_prepare_ws);
 317        queue_work(gc->gc_reader_wq, &line_ws->ws);
 318
 319        return 0;
 320}
 321
 322static void pblk_gc_reader_kick(struct pblk_gc *gc)
 323{
 324        wake_up_process(gc->gc_reader_ts);
 325}
 326
 327static void pblk_gc_kick(struct pblk *pblk)
 328{
 329        struct pblk_gc *gc = &pblk->gc;
 330
 331        pblk_gc_writer_kick(gc);
 332        pblk_gc_reader_kick(gc);
 333
 334        /* If we're shutting down GC, let's not start it up again */
 335        if (gc->gc_enabled) {
 336                wake_up_process(gc->gc_ts);
 337                mod_timer(&gc->gc_timer,
 338                          jiffies + msecs_to_jiffies(GC_TIME_MSECS));
 339        }
 340}
 341
 342static int pblk_gc_read(struct pblk *pblk)
 343{
 344        struct pblk_gc *gc = &pblk->gc;
 345        struct pblk_line *line;
 346
 347        spin_lock(&gc->r_lock);
 348        if (list_empty(&gc->r_list)) {
 349                spin_unlock(&gc->r_lock);
 350                return 1;
 351        }
 352
 353        line = list_first_entry(&gc->r_list, struct pblk_line, list);
 354        list_del(&line->list);
 355        spin_unlock(&gc->r_lock);
 356
 357        pblk_gc_kick(pblk);
 358
 359        if (pblk_gc_line(pblk, line)) {
 360                pblk_err(pblk, "failed to GC line %d\n", line->id);
 361                /* rollback */
 362                spin_lock(&gc->r_lock);
 363                list_add_tail(&line->list, &gc->r_list);
 364                spin_unlock(&gc->r_lock);
 365        }
 366
 367        return 0;
 368}
 369
 370static struct pblk_line *pblk_gc_get_victim_line(struct pblk *pblk,
 371                                                 struct list_head *group_list)
 372{
 373        struct pblk_line *line, *victim;
 374        unsigned int line_vsc = ~0x0L, victim_vsc = ~0x0L;
 375
 376        victim = list_first_entry(group_list, struct pblk_line, list);
 377
 378        list_for_each_entry(line, group_list, list) {
 379                if (!atomic_read(&line->sec_to_update))
 380                        line_vsc = le32_to_cpu(*line->vsc);
 381                if (line_vsc < victim_vsc) {
 382                        victim = line;
 383                        victim_vsc = le32_to_cpu(*victim->vsc);
 384                }
 385        }
 386
 387        if (victim_vsc == ~0x0)
 388                return NULL;
 389
 390        return victim;
 391}
 392
 393static bool pblk_gc_should_run(struct pblk_gc *gc, struct pblk_rl *rl)
 394{
 395        unsigned int nr_blocks_free, nr_blocks_need;
 396        unsigned int werr_lines = atomic_read(&rl->werr_lines);
 397
 398        nr_blocks_need = pblk_rl_high_thrs(rl);
 399        nr_blocks_free = pblk_rl_nr_free_blks(rl);
 400
 401        /* This is not critical, no need to take lock here */
 402        return ((werr_lines > 0) ||
 403                ((gc->gc_active) && (nr_blocks_need > nr_blocks_free)));
 404}
 405
 406void pblk_gc_free_full_lines(struct pblk *pblk)
 407{
 408        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
 409        struct pblk_gc *gc = &pblk->gc;
 410        struct pblk_line *line;
 411
 412        do {
 413                spin_lock(&l_mg->gc_lock);
 414                if (list_empty(&l_mg->gc_full_list)) {
 415                        spin_unlock(&l_mg->gc_lock);
 416                        return;
 417                }
 418
 419                line = list_first_entry(&l_mg->gc_full_list,
 420                                                        struct pblk_line, list);
 421
 422                spin_lock(&line->lock);
 423                WARN_ON(line->state != PBLK_LINESTATE_CLOSED);
 424                line->state = PBLK_LINESTATE_GC;
 425                trace_pblk_line_state(pblk_disk_name(pblk), line->id,
 426                                        line->state);
 427                spin_unlock(&line->lock);
 428
 429                list_del(&line->list);
 430                spin_unlock(&l_mg->gc_lock);
 431
 432                atomic_inc(&gc->pipeline_gc);
 433                kref_put(&line->ref, pblk_line_put);
 434        } while (1);
 435}
 436
 437/*
 438 * Lines with no valid sectors will be returned to the free list immediately. If
 439 * GC is activated - either because the free block count is under the determined
 440 * threshold, or because it is being forced from user space - only lines with a
 441 * high count of invalid sectors will be recycled.
 442 */
 443static void pblk_gc_run(struct pblk *pblk)
 444{
 445        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
 446        struct pblk_gc *gc = &pblk->gc;
 447        struct pblk_line *line;
 448        struct list_head *group_list;
 449        bool run_gc;
 450        int read_inflight_gc, gc_group = 0, prev_group = 0;
 451
 452        pblk_gc_free_full_lines(pblk);
 453
 454        run_gc = pblk_gc_should_run(&pblk->gc, &pblk->rl);
 455        if (!run_gc || (atomic_read(&gc->read_inflight_gc) >= PBLK_GC_L_QD))
 456                return;
 457
 458next_gc_group:
 459        group_list = l_mg->gc_lists[gc_group++];
 460
 461        do {
 462                spin_lock(&l_mg->gc_lock);
 463
 464                line = pblk_gc_get_victim_line(pblk, group_list);
 465                if (!line) {
 466                        spin_unlock(&l_mg->gc_lock);
 467                        break;
 468                }
 469
 470                spin_lock(&line->lock);
 471                WARN_ON(line->state != PBLK_LINESTATE_CLOSED);
 472                line->state = PBLK_LINESTATE_GC;
 473                trace_pblk_line_state(pblk_disk_name(pblk), line->id,
 474                                        line->state);
 475                spin_unlock(&line->lock);
 476
 477                list_del(&line->list);
 478                spin_unlock(&l_mg->gc_lock);
 479
 480                spin_lock(&gc->r_lock);
 481                list_add_tail(&line->list, &gc->r_list);
 482                spin_unlock(&gc->r_lock);
 483
 484                read_inflight_gc = atomic_inc_return(&gc->read_inflight_gc);
 485                pblk_gc_reader_kick(gc);
 486
 487                prev_group = 1;
 488
 489                /* No need to queue up more GC lines than we can handle */
 490                run_gc = pblk_gc_should_run(&pblk->gc, &pblk->rl);
 491                if (!run_gc || read_inflight_gc >= PBLK_GC_L_QD)
 492                        break;
 493        } while (1);
 494
 495        if (!prev_group && pblk->rl.rb_state > gc_group &&
 496                                                gc_group < PBLK_GC_NR_LISTS)
 497                goto next_gc_group;
 498}
 499
 500static void pblk_gc_timer(struct timer_list *t)
 501{
 502        struct pblk *pblk = from_timer(pblk, t, gc.gc_timer);
 503
 504        pblk_gc_kick(pblk);
 505}
 506
 507static int pblk_gc_ts(void *data)
 508{
 509        struct pblk *pblk = data;
 510
 511        while (!kthread_should_stop()) {
 512                pblk_gc_run(pblk);
 513                set_current_state(TASK_INTERRUPTIBLE);
 514                io_schedule();
 515        }
 516
 517        return 0;
 518}
 519
 520static int pblk_gc_writer_ts(void *data)
 521{
 522        struct pblk *pblk = data;
 523
 524        while (!kthread_should_stop()) {
 525                if (!pblk_gc_write(pblk))
 526                        continue;
 527                set_current_state(TASK_INTERRUPTIBLE);
 528                io_schedule();
 529        }
 530
 531        return 0;
 532}
 533
 534static int pblk_gc_reader_ts(void *data)
 535{
 536        struct pblk *pblk = data;
 537        struct pblk_gc *gc = &pblk->gc;
 538
 539        while (!kthread_should_stop()) {
 540                if (!pblk_gc_read(pblk))
 541                        continue;
 542                set_current_state(TASK_INTERRUPTIBLE);
 543                io_schedule();
 544        }
 545
 546#ifdef CONFIG_NVM_PBLK_DEBUG
 547        pblk_info(pblk, "flushing gc pipeline, %d lines left\n",
 548                atomic_read(&gc->pipeline_gc));
 549#endif
 550
 551        do {
 552                if (!atomic_read(&gc->pipeline_gc))
 553                        break;
 554
 555                schedule();
 556        } while (1);
 557
 558        return 0;
 559}
 560
 561static void pblk_gc_start(struct pblk *pblk)
 562{
 563        pblk->gc.gc_active = 1;
 564        pblk_debug(pblk, "gc start\n");
 565}
 566
 567void pblk_gc_should_start(struct pblk *pblk)
 568{
 569        struct pblk_gc *gc = &pblk->gc;
 570
 571        if (gc->gc_enabled && !gc->gc_active) {
 572                pblk_gc_start(pblk);
 573                pblk_gc_kick(pblk);
 574        }
 575}
 576
 577void pblk_gc_should_stop(struct pblk *pblk)
 578{
 579        struct pblk_gc *gc = &pblk->gc;
 580
 581        if (gc->gc_active && !gc->gc_forced)
 582                gc->gc_active = 0;
 583}
 584
 585void pblk_gc_should_kick(struct pblk *pblk)
 586{
 587        pblk_rl_update_rates(&pblk->rl);
 588}
 589
 590void pblk_gc_sysfs_state_show(struct pblk *pblk, int *gc_enabled,
 591                              int *gc_active)
 592{
 593        struct pblk_gc *gc = &pblk->gc;
 594
 595        spin_lock(&gc->lock);
 596        *gc_enabled = gc->gc_enabled;
 597        *gc_active = gc->gc_active;
 598        spin_unlock(&gc->lock);
 599}
 600
 601int pblk_gc_sysfs_force(struct pblk *pblk, int force)
 602{
 603        struct pblk_gc *gc = &pblk->gc;
 604
 605        if (force < 0 || force > 1)
 606                return -EINVAL;
 607
 608        spin_lock(&gc->lock);
 609        gc->gc_forced = force;
 610
 611        if (force)
 612                gc->gc_enabled = 1;
 613        else
 614                gc->gc_enabled = 0;
 615        spin_unlock(&gc->lock);
 616
 617        pblk_gc_should_start(pblk);
 618
 619        return 0;
 620}
 621
 622int pblk_gc_init(struct pblk *pblk)
 623{
 624        struct pblk_gc *gc = &pblk->gc;
 625        int ret;
 626
 627        gc->gc_ts = kthread_create(pblk_gc_ts, pblk, "pblk-gc-ts");
 628        if (IS_ERR(gc->gc_ts)) {
 629                pblk_err(pblk, "could not allocate GC main kthread\n");
 630                return PTR_ERR(gc->gc_ts);
 631        }
 632
 633        gc->gc_writer_ts = kthread_create(pblk_gc_writer_ts, pblk,
 634                                                        "pblk-gc-writer-ts");
 635        if (IS_ERR(gc->gc_writer_ts)) {
 636                pblk_err(pblk, "could not allocate GC writer kthread\n");
 637                ret = PTR_ERR(gc->gc_writer_ts);
 638                goto fail_free_main_kthread;
 639        }
 640
 641        gc->gc_reader_ts = kthread_create(pblk_gc_reader_ts, pblk,
 642                                                        "pblk-gc-reader-ts");
 643        if (IS_ERR(gc->gc_reader_ts)) {
 644                pblk_err(pblk, "could not allocate GC reader kthread\n");
 645                ret = PTR_ERR(gc->gc_reader_ts);
 646                goto fail_free_writer_kthread;
 647        }
 648
 649        timer_setup(&gc->gc_timer, pblk_gc_timer, 0);
 650        mod_timer(&gc->gc_timer, jiffies + msecs_to_jiffies(GC_TIME_MSECS));
 651
 652        gc->gc_active = 0;
 653        gc->gc_forced = 0;
 654        gc->gc_enabled = 1;
 655        gc->w_entries = 0;
 656        atomic_set(&gc->read_inflight_gc, 0);
 657        atomic_set(&gc->pipeline_gc, 0);
 658
 659        /* Workqueue that reads valid sectors from a line and submit them to the
 660         * GC writer to be recycled.
 661         */
 662        gc->gc_line_reader_wq = alloc_workqueue("pblk-gc-line-reader-wq",
 663                        WQ_MEM_RECLAIM | WQ_UNBOUND, PBLK_GC_MAX_READERS);
 664        if (!gc->gc_line_reader_wq) {
 665                pblk_err(pblk, "could not allocate GC line reader workqueue\n");
 666                ret = -ENOMEM;
 667                goto fail_free_reader_kthread;
 668        }
 669
 670        /* Workqueue that prepare lines for GC */
 671        gc->gc_reader_wq = alloc_workqueue("pblk-gc-line_wq",
 672                                        WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
 673        if (!gc->gc_reader_wq) {
 674                pblk_err(pblk, "could not allocate GC reader workqueue\n");
 675                ret = -ENOMEM;
 676                goto fail_free_reader_line_wq;
 677        }
 678
 679        spin_lock_init(&gc->lock);
 680        spin_lock_init(&gc->w_lock);
 681        spin_lock_init(&gc->r_lock);
 682
 683        sema_init(&gc->gc_sem, PBLK_GC_RQ_QD);
 684
 685        INIT_LIST_HEAD(&gc->w_list);
 686        INIT_LIST_HEAD(&gc->r_list);
 687
 688        return 0;
 689
 690fail_free_reader_line_wq:
 691        destroy_workqueue(gc->gc_line_reader_wq);
 692fail_free_reader_kthread:
 693        kthread_stop(gc->gc_reader_ts);
 694fail_free_writer_kthread:
 695        kthread_stop(gc->gc_writer_ts);
 696fail_free_main_kthread:
 697        kthread_stop(gc->gc_ts);
 698
 699        return ret;
 700}
 701
 702void pblk_gc_exit(struct pblk *pblk, bool graceful)
 703{
 704        struct pblk_gc *gc = &pblk->gc;
 705
 706        gc->gc_enabled = 0;
 707        del_timer_sync(&gc->gc_timer);
 708        gc->gc_active = 0;
 709
 710        if (gc->gc_ts)
 711                kthread_stop(gc->gc_ts);
 712
 713        if (gc->gc_reader_ts)
 714                kthread_stop(gc->gc_reader_ts);
 715
 716        if (graceful) {
 717                flush_workqueue(gc->gc_reader_wq);
 718                flush_workqueue(gc->gc_line_reader_wq);
 719        }
 720
 721        destroy_workqueue(gc->gc_reader_wq);
 722        destroy_workqueue(gc->gc_line_reader_wq);
 723
 724        if (gc->gc_writer_ts)
 725                kthread_stop(gc->gc_writer_ts);
 726}
 727