// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"

int dlm_slots_version(const struct dlm_header *h)
{
	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}

void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}

void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}

#define SLOT_DEBUG_LINE 128

static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
		      struct rcom_slot *ro0, struct dlm_slot *array,
		      int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}
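/* Background on slots (summary derived from the code below): slots are
 * small positive integers assigned to members; once assigned, a node's
 * slot does not change across recoveries.  The generation number
 * increases each time membership changes.  Both are reported to
 * lockspace users through the recover_done callback (see
 * dlm_lsop_recover_done below).  In struct dlm_member, slot == -1 means
 * the node does not support slots, and slot == 0 means no slot has been
 * assigned yet.
 */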
int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
				continue;
			memb->slot = le16_to_cpu(ro->ro_slot);
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls,
					  "changed our slot %d to %d",
					  memb->slot, ls->ls_slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				  memb->nodeid);
			return -1;
		}
	}

	return 0;
}

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check: once a slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot &&
		    memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}
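/* Worked example for dlm_slots_assign() (illustrative): members 1, 2
 * and 3 hold slots 1, 2 and 3.  If node 2 leaves and node 4 joins,
 * max = 3 and need = 1, so array_size = 4; nodes 1 and 3 keep slots 1
 * and 3, node 4 takes the first unused offset and becomes slot 2, and
 * the generation is bumped so users can tell stale slot data from
 * current data.
 */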
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb = NULL;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	if (!memb)
		list_add_tail(newlist, head);
	else
		/* insert before the first member with a larger nodeid,
		   or at the tail if there is none */
		list_add_tail(newlist, tmp);
}

static int add_remote_member(int nodeid)
{
	int error;

	if (nodeid == dlm_our_nodeid())
		return 0;

	error = dlm_lowcomms_connect_node(nodeid);
	if (error < 0)
		return error;

	dlm_midcomms_add_member(nodeid);
	return 0;
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(*memb), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;

	error = add_remote_member(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	WARN_ON_ONCE(!nodeid || nodeid == -1);

	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

static void clear_memb_list(struct list_head *head,
			    void (*after_del)(int nodeid))
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		if (after_del)
			after_del(memb->nodeid);
		kfree(memb);
	}
}

static void remove_remote_member(int nodeid)
{
	if (nodeid == dlm_our_nodeid())
		return;

	dlm_midcomms_remove_member(nodeid);
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes, remove_remote_member);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone, NULL);
}
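/* ls_node_array, built below, maps a resource-name hash to the node
 * holding the resource directory entry: each nodeid appears in the
 * array once per unit of weight (or exactly once per node when every
 * weight is zero), so directory ownership is spread across members in
 * proportion to their configured weight.
 */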
static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			break;
		}
		error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}
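/* Reconcile the current member list against the node list passed down
 * from userspace: departed nodes are moved to ls_nodes_gone (each one
 * counted as a negative change so the "neg" recovery steps run), new
 * nodes are added, and comms connections are verified by pinging every
 * member.
 */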
"dlm_recover_members %d 614 return error; 615 } 616 617 /* Userspace guarantees that dlm_ls_stop() has 618 dlm_ls_start() is called on any of them to 619 620 int dlm_ls_stop(struct dlm_ls *ls) 621 { 622 int new; 623 624 /* 625 * Prevent dlm_recv from being in the 626 * the stop. This includes ensuring d 627 * recovery message (rcom), while dlm_ 628 * resetting things from an in-progres 629 * dlm_recoverd to abort its recovery 630 * processing an rcom at the same time 631 * it easy for dlm_receive_message() t 632 * message to the requestqueue without 633 */ 634 635 write_lock_bh(&ls->ls_recv_active); 636 637 /* 638 * Abort any recovery that's in progre 639 * dlm_recovery_stopped()) and tell an 640 * dlm to quit any processing (see RUN 641 */ 642 643 spin_lock_bh(&ls->ls_recover_lock); 644 set_bit(LSFL_RECOVER_STOP, &ls->ls_fla 645 new = test_and_clear_bit(LSFL_RUNNING, 646 if (new) 647 timer_delete_sync(&ls->ls_scan 648 ls->ls_recover_seq++; 649 650 /* activate requestqueue and stop proc 651 write_lock_bh(&ls->ls_requestqueue_loc 652 set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls 653 write_unlock_bh(&ls->ls_requestqueue_l 654 spin_unlock_bh(&ls->ls_recover_lock); 655 656 /* 657 * Let dlm_recv run again, now any nor 658 * requestqueue for later. 659 */ 660 661 write_unlock_bh(&ls->ls_recv_active); 662 663 /* 664 * This in_recovery lock does two thin 665 * 1) Keeps this function from returni 666 * of locking routines and locking 667 * 2) Keeps any new requests from bein 668 * when recovery is complete. 669 */ 670 671 if (new) { 672 set_bit(LSFL_RECOVER_DOWN, &ls 673 wake_up_process(ls->ls_recover 674 wait_event(ls->ls_recover_lock 675 test_bit(LSFL_RECOV 676 } 677 678 /* 679 * The recoverd suspend/resume makes s 680 * running) has noticed RECOVER_STOP a 681 * previous recovery. 682 */ 683 684 dlm_recoverd_suspend(ls); 685 686 spin_lock_bh(&ls->ls_recover_lock); 687 kfree(ls->ls_slots); 688 ls->ls_slots = NULL; 689 ls->ls_num_slots = 0; 690 ls->ls_slots_size = 0; 691 ls->ls_recover_status = 0; 692 spin_unlock_bh(&ls->ls_recover_lock); 693 694 dlm_recoverd_resume(ls); 695 696 if (!ls->ls_recover_begin) 697 ls->ls_recover_begin = jiffies 698 699 /* call recover_prep ops only once and 700 * for each possible dlm_ls_stop() whe 701 * stopped. 702 * 703 * If we successful was able to clear 704 * it was set we know it is the first 705 */ 706 if (new) 707 dlm_lsop_recover_prep(ls); 708 709 return 0; 710 } 711 712 int dlm_ls_start(struct dlm_ls *ls) 713 { 714 struct dlm_recover *rv, *rv_old; 715 struct dlm_config_node *nodes = NULL; 716 int error, count; 717 718 rv = kzalloc(sizeof(*rv), GFP_NOFS); 719 if (!rv) 720 return -ENOMEM; 721 722 error = dlm_config_nodes(ls->ls_name, 723 if (error < 0) 724 goto fail_rv; 725 726 spin_lock_bh(&ls->ls_recover_lock); 727 728 /* the lockspace needs to be stopped b 729 730 if (!dlm_locking_stopped(ls)) { 731 spin_unlock_bh(&ls->ls_recover 732 log_error(ls, "start ignored: 733 error = -EINVAL; 734 goto fail; 735 } 736 737 rv->nodes = nodes; 738 rv->nodes_count = count; 739 rv->seq = ++ls->ls_recover_seq; 740 rv_old = ls->ls_recover_args; 741 ls->ls_recover_args = rv; 742 spin_unlock_bh(&ls->ls_recover_lock); 743 744 if (rv_old) { 745 log_error(ls, "unused recovery 746 (unsigned long long) 747 kfree(rv_old->nodes); 748 kfree(rv_old); 749 } 750 751 set_bit(LSFL_RECOVER_WORK, &ls->ls_fla 752 wake_up_process(ls->ls_recoverd_task); 753 return 0; 754 755 fail: 756 kfree(nodes); 757 fail_rv: 758 kfree(rv); 759 return error; 760 } 761 762