1 // SPDX-License-Identifier: GPL-2.0-only 1 // SPDX-License-Identifier: GPL-2.0-only 2 /********************************************* 2 /****************************************************************************** 3 ********************************************** 3 ******************************************************************************* 4 ** 4 ** 5 ** Copyright (C) 2005-2011 Red Hat, Inc. All 5 ** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved. 6 ** 6 ** 7 ** 7 ** 8 ********************************************** 8 ******************************************************************************* 9 ********************************************** 9 ******************************************************************************/ 10 10 11 #include "dlm_internal.h" 11 #include "dlm_internal.h" 12 #include "lockspace.h" 12 #include "lockspace.h" 13 #include "member.h" 13 #include "member.h" 14 #include "recoverd.h" 14 #include "recoverd.h" 15 #include "recover.h" 15 #include "recover.h" 16 #include "rcom.h" 16 #include "rcom.h" 17 #include "config.h" 17 #include "config.h" 18 #include "midcomms.h" << 19 #include "lowcomms.h" 18 #include "lowcomms.h" 20 19 21 int dlm_slots_version(const struct dlm_header !! 20 int dlm_slots_version(struct dlm_header *h) 22 { 21 { 23 if ((le32_to_cpu(h->h_version) & 0x000 !! 22 if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS) 24 return 0; 23 return 0; 25 return 1; 24 return 1; 26 } 25 } 27 26 28 void dlm_slot_save(struct dlm_ls *ls, struct d 27 void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, 29 struct dlm_member *memb) 28 struct dlm_member *memb) 30 { 29 { 31 struct rcom_config *rf = (struct rcom_ 30 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; 32 31 33 if (!dlm_slots_version(&rc->rc_header) 32 if (!dlm_slots_version(&rc->rc_header)) 34 return; 33 return; 35 34 36 memb->slot = le16_to_cpu(rf->rf_our_sl 35 memb->slot = le16_to_cpu(rf->rf_our_slot); 37 memb->generation = le32_to_cpu(rf->rf_ 36 memb->generation = le32_to_cpu(rf->rf_generation); 38 } 37 } 39 38 40 void dlm_slots_copy_out(struct dlm_ls *ls, str 39 void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc) 41 { 40 { 42 struct dlm_slot *slot; 41 struct dlm_slot *slot; 43 struct rcom_slot *ro; 42 struct rcom_slot *ro; 44 int i; 43 int i; 45 44 46 ro = (struct rcom_slot *)(rc->rc_buf + 45 ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); 47 46 48 /* ls_slots array is sparse, but not r 47 /* ls_slots array is sparse, but not rcom_slots */ 49 48 50 for (i = 0; i < ls->ls_slots_size; i++ 49 for (i = 0; i < ls->ls_slots_size; i++) { 51 slot = &ls->ls_slots[i]; 50 slot = &ls->ls_slots[i]; 52 if (!slot->nodeid) 51 if (!slot->nodeid) 53 continue; 52 continue; 54 ro->ro_nodeid = cpu_to_le32(sl 53 ro->ro_nodeid = cpu_to_le32(slot->nodeid); 55 ro->ro_slot = cpu_to_le16(slot 54 ro->ro_slot = cpu_to_le16(slot->slot); 56 ro++; 55 ro++; 57 } 56 } 58 } 57 } 59 58 60 #define SLOT_DEBUG_LINE 128 59 #define SLOT_DEBUG_LINE 128 61 60 62 static void log_slots(struct dlm_ls *ls, uint3 61 static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, 63 struct rcom_slot *ro0, s 62 struct rcom_slot *ro0, struct dlm_slot *array, 64 int array_size) 63 int array_size) 65 { 64 { 66 char line[SLOT_DEBUG_LINE]; 65 char line[SLOT_DEBUG_LINE]; 67 int len = SLOT_DEBUG_LINE - 1; 66 int len = SLOT_DEBUG_LINE - 1; 68 int pos = 0; 67 int pos = 0; 69 int ret, i; 68 int ret, i; 70 69 71 memset(line, 0, sizeof(line)); 70 memset(line, 0, sizeof(line)); 72 71 73 if (array) { 72 if (array) { 74 for (i = 0; i < array_size; i+ 73 for (i = 0; i < array_size; i++) { 75 if (!array[i].nodeid) 74 if (!array[i].nodeid) 76 continue; 75 continue; 77 76 78 ret = snprintf(line + 77 ret = snprintf(line + pos, len - pos, " %d:%d", 79 array[i 78 array[i].slot, array[i].nodeid); 80 if (ret >= len - pos) 79 if (ret >= len - pos) 81 break; 80 break; 82 pos += ret; 81 pos += ret; 83 } 82 } 84 } else if (ro0) { 83 } else if (ro0) { 85 for (i = 0; i < num_slots; i++ 84 for (i = 0; i < num_slots; i++) { 86 ret = snprintf(line + 85 ret = snprintf(line + pos, len - pos, " %d:%d", 87 ro0[i]. 86 ro0[i].ro_slot, ro0[i].ro_nodeid); 88 if (ret >= len - pos) 87 if (ret >= len - pos) 89 break; 88 break; 90 pos += ret; 89 pos += ret; 91 } 90 } 92 } 91 } 93 92 94 log_rinfo(ls, "generation %u slots %d% 93 log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line); 95 } 94 } 96 95 97 int dlm_slots_copy_in(struct dlm_ls *ls) 96 int dlm_slots_copy_in(struct dlm_ls *ls) 98 { 97 { 99 struct dlm_member *memb; 98 struct dlm_member *memb; 100 struct dlm_rcom *rc = ls->ls_recover_b 99 struct dlm_rcom *rc = ls->ls_recover_buf; 101 struct rcom_config *rf = (struct rcom_ 100 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; 102 struct rcom_slot *ro0, *ro; 101 struct rcom_slot *ro0, *ro; 103 int our_nodeid = dlm_our_nodeid(); 102 int our_nodeid = dlm_our_nodeid(); 104 int i, num_slots; 103 int i, num_slots; 105 uint32_t gen; 104 uint32_t gen; 106 105 107 if (!dlm_slots_version(&rc->rc_header) 106 if (!dlm_slots_version(&rc->rc_header)) 108 return -1; 107 return -1; 109 108 110 gen = le32_to_cpu(rf->rf_generation); 109 gen = le32_to_cpu(rf->rf_generation); 111 if (gen <= ls->ls_generation) { 110 if (gen <= ls->ls_generation) { 112 log_error(ls, "dlm_slots_copy_ 111 log_error(ls, "dlm_slots_copy_in gen %u old %u", 113 gen, ls->ls_generati 112 gen, ls->ls_generation); 114 } 113 } 115 ls->ls_generation = gen; 114 ls->ls_generation = gen; 116 115 117 num_slots = le16_to_cpu(rf->rf_num_slo 116 num_slots = le16_to_cpu(rf->rf_num_slots); 118 if (!num_slots) 117 if (!num_slots) 119 return -1; 118 return -1; 120 119 121 ro0 = (struct rcom_slot *)(rc->rc_buf 120 ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); 122 121 >> 122 for (i = 0, ro = ro0; i < num_slots; i++, ro++) { >> 123 ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid); >> 124 ro->ro_slot = le16_to_cpu(ro->ro_slot); >> 125 } >> 126 123 log_slots(ls, gen, num_slots, ro0, NUL 127 log_slots(ls, gen, num_slots, ro0, NULL, 0); 124 128 125 list_for_each_entry(memb, &ls->ls_node 129 list_for_each_entry(memb, &ls->ls_nodes, list) { 126 for (i = 0, ro = ro0; i < num_ 130 for (i = 0, ro = ro0; i < num_slots; i++, ro++) { 127 if (le32_to_cpu(ro->ro !! 131 if (ro->ro_nodeid != memb->nodeid) 128 continue; 132 continue; 129 memb->slot = le16_to_c !! 133 memb->slot = ro->ro_slot; 130 memb->slot_prev = memb 134 memb->slot_prev = memb->slot; 131 break; 135 break; 132 } 136 } 133 137 134 if (memb->nodeid == our_nodeid 138 if (memb->nodeid == our_nodeid) { 135 if (ls->ls_slot && ls- 139 if (ls->ls_slot && ls->ls_slot != memb->slot) { 136 log_error(ls, 140 log_error(ls, "dlm_slots_copy_in our slot " 137 "cha 141 "changed %d %d", ls->ls_slot, 138 memb 142 memb->slot); 139 return -1; 143 return -1; 140 } 144 } 141 145 142 if (!ls->ls_slot) 146 if (!ls->ls_slot) 143 ls->ls_slot = 147 ls->ls_slot = memb->slot; 144 } 148 } 145 149 146 if (!memb->slot) { 150 if (!memb->slot) { 147 log_error(ls, "dlm_slo 151 log_error(ls, "dlm_slots_copy_in nodeid %d no slot", 148 memb->nodei 152 memb->nodeid); 149 return -1; 153 return -1; 150 } 154 } 151 } 155 } 152 156 153 return 0; 157 return 0; 154 } 158 } 155 159 156 /* for any nodes that do not support slots, we 160 /* for any nodes that do not support slots, we will not have set memb->slot 157 in wait_status_all(), so memb->slot will re 161 in wait_status_all(), so memb->slot will remain -1, and we will not 158 assign slots or set ls_num_slots here */ 162 assign slots or set ls_num_slots here */ 159 163 160 int dlm_slots_assign(struct dlm_ls *ls, int *n 164 int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size, 161 struct dlm_slot **slots_o 165 struct dlm_slot **slots_out, uint32_t *gen_out) 162 { 166 { 163 struct dlm_member *memb; 167 struct dlm_member *memb; 164 struct dlm_slot *array; 168 struct dlm_slot *array; 165 int our_nodeid = dlm_our_nodeid(); 169 int our_nodeid = dlm_our_nodeid(); 166 int array_size, max_slots, i; 170 int array_size, max_slots, i; 167 int need = 0; 171 int need = 0; 168 int max = 0; 172 int max = 0; 169 int num = 0; 173 int num = 0; 170 uint32_t gen = 0; 174 uint32_t gen = 0; 171 175 172 /* our own memb struct will have slot 176 /* our own memb struct will have slot -1 gen 0 */ 173 177 174 list_for_each_entry(memb, &ls->ls_node 178 list_for_each_entry(memb, &ls->ls_nodes, list) { 175 if (memb->nodeid == our_nodeid 179 if (memb->nodeid == our_nodeid) { 176 memb->slot = ls->ls_sl 180 memb->slot = ls->ls_slot; 177 memb->generation = ls- 181 memb->generation = ls->ls_generation; 178 break; 182 break; 179 } 183 } 180 } 184 } 181 185 182 list_for_each_entry(memb, &ls->ls_node 186 list_for_each_entry(memb, &ls->ls_nodes, list) { 183 if (memb->generation > gen) 187 if (memb->generation > gen) 184 gen = memb->generation 188 gen = memb->generation; 185 189 186 /* node doesn't support slots 190 /* node doesn't support slots */ 187 191 188 if (memb->slot == -1) 192 if (memb->slot == -1) 189 return -1; 193 return -1; 190 194 191 /* node needs a slot assigned 195 /* node needs a slot assigned */ 192 196 193 if (!memb->slot) 197 if (!memb->slot) 194 need++; 198 need++; 195 199 196 /* node has a slot assigned */ 200 /* node has a slot assigned */ 197 201 198 num++; 202 num++; 199 203 200 if (!max || max < memb->slot) 204 if (!max || max < memb->slot) 201 max = memb->slot; 205 max = memb->slot; 202 206 203 /* sanity check, once slot is 207 /* sanity check, once slot is assigned it shouldn't change */ 204 208 205 if (memb->slot_prev && memb->s 209 if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) { 206 log_error(ls, "nodeid 210 log_error(ls, "nodeid %d slot changed %d %d", 207 memb->nodeid 211 memb->nodeid, memb->slot_prev, memb->slot); 208 return -1; 212 return -1; 209 } 213 } 210 memb->slot_prev = memb->slot; 214 memb->slot_prev = memb->slot; 211 } 215 } 212 216 213 array_size = max + need; 217 array_size = max + need; 214 array = kcalloc(array_size, sizeof(*ar 218 array = kcalloc(array_size, sizeof(*array), GFP_NOFS); 215 if (!array) 219 if (!array) 216 return -ENOMEM; 220 return -ENOMEM; 217 221 218 num = 0; 222 num = 0; 219 223 220 /* fill in slots (offsets) that are us 224 /* fill in slots (offsets) that are used */ 221 225 222 list_for_each_entry(memb, &ls->ls_node 226 list_for_each_entry(memb, &ls->ls_nodes, list) { 223 if (!memb->slot) 227 if (!memb->slot) 224 continue; 228 continue; 225 229 226 if (memb->slot > array_size) { 230 if (memb->slot > array_size) { 227 log_error(ls, "invalid 231 log_error(ls, "invalid slot number %d", memb->slot); 228 kfree(array); 232 kfree(array); 229 return -1; 233 return -1; 230 } 234 } 231 235 232 array[memb->slot - 1].nodeid = 236 array[memb->slot - 1].nodeid = memb->nodeid; 233 array[memb->slot - 1].slot = m 237 array[memb->slot - 1].slot = memb->slot; 234 num++; 238 num++; 235 } 239 } 236 240 237 /* assign new slots from unused offset 241 /* assign new slots from unused offsets */ 238 242 239 list_for_each_entry(memb, &ls->ls_node 243 list_for_each_entry(memb, &ls->ls_nodes, list) { 240 if (memb->slot) 244 if (memb->slot) 241 continue; 245 continue; 242 246 243 for (i = 0; i < array_size; i+ 247 for (i = 0; i < array_size; i++) { 244 if (array[i].nodeid) 248 if (array[i].nodeid) 245 continue; 249 continue; 246 250 247 memb->slot = i + 1; 251 memb->slot = i + 1; 248 memb->slot_prev = memb 252 memb->slot_prev = memb->slot; 249 array[i].nodeid = memb 253 array[i].nodeid = memb->nodeid; 250 array[i].slot = memb-> 254 array[i].slot = memb->slot; 251 num++; 255 num++; 252 256 253 if (!ls->ls_slot && me 257 if (!ls->ls_slot && memb->nodeid == our_nodeid) 254 ls->ls_slot = 258 ls->ls_slot = memb->slot; 255 break; 259 break; 256 } 260 } 257 261 258 if (!memb->slot) { 262 if (!memb->slot) { 259 log_error(ls, "no free 263 log_error(ls, "no free slot found"); 260 kfree(array); 264 kfree(array); 261 return -1; 265 return -1; 262 } 266 } 263 } 267 } 264 268 265 gen++; 269 gen++; 266 270 267 log_slots(ls, gen, num, NULL, array, a 271 log_slots(ls, gen, num, NULL, array, array_size); 268 272 269 max_slots = (DLM_MAX_APP_BUFSIZE - siz !! 273 max_slots = (LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom) - 270 sizeof(struct rcom_config 274 sizeof(struct rcom_config)) / sizeof(struct rcom_slot); 271 275 272 if (num > max_slots) { 276 if (num > max_slots) { 273 log_error(ls, "num_slots %d ex 277 log_error(ls, "num_slots %d exceeds max_slots %d", 274 num, max_slots); 278 num, max_slots); 275 kfree(array); 279 kfree(array); 276 return -1; 280 return -1; 277 } 281 } 278 282 279 *gen_out = gen; 283 *gen_out = gen; 280 *slots_out = array; 284 *slots_out = array; 281 *slots_size = array_size; 285 *slots_size = array_size; 282 *num_slots = num; 286 *num_slots = num; 283 return 0; 287 return 0; 284 } 288 } 285 289 286 static void add_ordered_member(struct dlm_ls * 290 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 287 { 291 { 288 struct dlm_member *memb = NULL; 292 struct dlm_member *memb = NULL; 289 struct list_head *tmp; 293 struct list_head *tmp; 290 struct list_head *newlist = &new->list 294 struct list_head *newlist = &new->list; 291 struct list_head *head = &ls->ls_nodes 295 struct list_head *head = &ls->ls_nodes; 292 296 293 list_for_each(tmp, head) { 297 list_for_each(tmp, head) { 294 memb = list_entry(tmp, struct 298 memb = list_entry(tmp, struct dlm_member, list); 295 if (new->nodeid < memb->nodeid 299 if (new->nodeid < memb->nodeid) 296 break; 300 break; 297 } 301 } 298 302 299 if (!memb) 303 if (!memb) 300 list_add_tail(newlist, head); 304 list_add_tail(newlist, head); 301 else { 305 else { 302 /* FIXME: can use list macro h 306 /* FIXME: can use list macro here */ 303 newlist->prev = tmp->prev; 307 newlist->prev = tmp->prev; 304 newlist->next = tmp; 308 newlist->next = tmp; 305 tmp->prev->next = newlist; 309 tmp->prev->next = newlist; 306 tmp->prev = newlist; 310 tmp->prev = newlist; 307 } 311 } 308 } 312 } 309 313 310 static int add_remote_member(int nodeid) << 311 { << 312 int error; << 313 << 314 if (nodeid == dlm_our_nodeid()) << 315 return 0; << 316 << 317 error = dlm_lowcomms_connect_node(node << 318 if (error < 0) << 319 return error; << 320 << 321 dlm_midcomms_add_member(nodeid); << 322 return 0; << 323 } << 324 << 325 static int dlm_add_member(struct dlm_ls *ls, s 314 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node) 326 { 315 { 327 struct dlm_member *memb; 316 struct dlm_member *memb; 328 int error; 317 int error; 329 318 330 memb = kzalloc(sizeof(*memb), GFP_NOFS 319 memb = kzalloc(sizeof(*memb), GFP_NOFS); 331 if (!memb) 320 if (!memb) 332 return -ENOMEM; 321 return -ENOMEM; 333 322 334 memb->nodeid = node->nodeid; !! 323 error = dlm_lowcomms_connect_node(node->nodeid); 335 memb->weight = node->weight; << 336 memb->comm_seq = node->comm_seq; << 337 << 338 error = add_remote_member(node->nodeid << 339 if (error < 0) { 324 if (error < 0) { 340 kfree(memb); 325 kfree(memb); 341 return error; 326 return error; 342 } 327 } 343 328 >> 329 memb->nodeid = node->nodeid; >> 330 memb->weight = node->weight; >> 331 memb->comm_seq = node->comm_seq; 344 add_ordered_member(ls, memb); 332 add_ordered_member(ls, memb); 345 ls->ls_num_nodes++; 333 ls->ls_num_nodes++; 346 return 0; 334 return 0; 347 } 335 } 348 336 349 static struct dlm_member *find_memb(struct lis 337 static struct dlm_member *find_memb(struct list_head *head, int nodeid) 350 { 338 { 351 struct dlm_member *memb; 339 struct dlm_member *memb; 352 340 353 list_for_each_entry(memb, head, list) 341 list_for_each_entry(memb, head, list) { 354 if (memb->nodeid == nodeid) 342 if (memb->nodeid == nodeid) 355 return memb; 343 return memb; 356 } 344 } 357 return NULL; 345 return NULL; 358 } 346 } 359 347 360 int dlm_is_member(struct dlm_ls *ls, int nodei 348 int dlm_is_member(struct dlm_ls *ls, int nodeid) 361 { 349 { 362 if (find_memb(&ls->ls_nodes, nodeid)) 350 if (find_memb(&ls->ls_nodes, nodeid)) 363 return 1; 351 return 1; 364 return 0; 352 return 0; 365 } 353 } 366 354 367 int dlm_is_removed(struct dlm_ls *ls, int node 355 int dlm_is_removed(struct dlm_ls *ls, int nodeid) 368 { 356 { 369 WARN_ON_ONCE(!nodeid || nodeid == -1); << 370 << 371 if (find_memb(&ls->ls_nodes_gone, node 357 if (find_memb(&ls->ls_nodes_gone, nodeid)) 372 return 1; 358 return 1; 373 return 0; 359 return 0; 374 } 360 } 375 361 376 static void clear_memb_list(struct list_head * !! 362 static void clear_memb_list(struct list_head *head) 377 void (*after_del)( << 378 { 363 { 379 struct dlm_member *memb; 364 struct dlm_member *memb; 380 365 381 while (!list_empty(head)) { 366 while (!list_empty(head)) { 382 memb = list_entry(head->next, 367 memb = list_entry(head->next, struct dlm_member, list); 383 list_del(&memb->list); 368 list_del(&memb->list); 384 if (after_del) << 385 after_del(memb->nodeid << 386 kfree(memb); 369 kfree(memb); 387 } 370 } 388 } 371 } 389 372 390 static void remove_remote_member(int nodeid) << 391 { << 392 if (nodeid == dlm_our_nodeid()) << 393 return; << 394 << 395 dlm_midcomms_remove_member(nodeid); << 396 } << 397 << 398 void dlm_clear_members(struct dlm_ls *ls) 373 void dlm_clear_members(struct dlm_ls *ls) 399 { 374 { 400 clear_memb_list(&ls->ls_nodes, remove_ !! 375 clear_memb_list(&ls->ls_nodes); 401 ls->ls_num_nodes = 0; 376 ls->ls_num_nodes = 0; 402 } 377 } 403 378 404 void dlm_clear_members_gone(struct dlm_ls *ls) 379 void dlm_clear_members_gone(struct dlm_ls *ls) 405 { 380 { 406 clear_memb_list(&ls->ls_nodes_gone, NU !! 381 clear_memb_list(&ls->ls_nodes_gone); 407 } 382 } 408 383 409 static void make_member_array(struct dlm_ls *l 384 static void make_member_array(struct dlm_ls *ls) 410 { 385 { 411 struct dlm_member *memb; 386 struct dlm_member *memb; 412 int i, w, x = 0, total = 0, all_zero = 387 int i, w, x = 0, total = 0, all_zero = 0, *array; 413 388 414 kfree(ls->ls_node_array); 389 kfree(ls->ls_node_array); 415 ls->ls_node_array = NULL; 390 ls->ls_node_array = NULL; 416 391 417 list_for_each_entry(memb, &ls->ls_node 392 list_for_each_entry(memb, &ls->ls_nodes, list) { 418 if (memb->weight) 393 if (memb->weight) 419 total += memb->weight; 394 total += memb->weight; 420 } 395 } 421 396 422 /* all nodes revert to weight of 1 if 397 /* all nodes revert to weight of 1 if all have weight 0 */ 423 398 424 if (!total) { 399 if (!total) { 425 total = ls->ls_num_nodes; 400 total = ls->ls_num_nodes; 426 all_zero = 1; 401 all_zero = 1; 427 } 402 } 428 403 429 ls->ls_total_weight = total; 404 ls->ls_total_weight = total; 430 array = kmalloc_array(total, sizeof(*a 405 array = kmalloc_array(total, sizeof(*array), GFP_NOFS); 431 if (!array) 406 if (!array) 432 return; 407 return; 433 408 434 list_for_each_entry(memb, &ls->ls_node 409 list_for_each_entry(memb, &ls->ls_nodes, list) { 435 if (!all_zero && !memb->weight 410 if (!all_zero && !memb->weight) 436 continue; 411 continue; 437 412 438 if (all_zero) 413 if (all_zero) 439 w = 1; 414 w = 1; 440 else 415 else 441 w = memb->weight; 416 w = memb->weight; 442 417 443 DLM_ASSERT(x < total, printk(" 418 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x);); 444 419 445 for (i = 0; i < w; i++) 420 for (i = 0; i < w; i++) 446 array[x++] = memb->nod 421 array[x++] = memb->nodeid; 447 } 422 } 448 423 449 ls->ls_node_array = array; 424 ls->ls_node_array = array; 450 } 425 } 451 426 452 /* send a status request to all members just t 427 /* send a status request to all members just to establish comms connections */ 453 428 454 static int ping_members(struct dlm_ls *ls, uin !! 429 static int ping_members(struct dlm_ls *ls) 455 { 430 { 456 struct dlm_member *memb; 431 struct dlm_member *memb; 457 int error = 0; 432 int error = 0; 458 433 459 list_for_each_entry(memb, &ls->ls_node 434 list_for_each_entry(memb, &ls->ls_nodes, list) { 460 if (dlm_recovery_stopped(ls)) !! 435 error = dlm_recovery_stopped(ls); 461 error = -EINTR; !! 436 if (error) 462 break; 437 break; 463 } !! 438 error = dlm_rcom_status(ls, memb->nodeid, 0); 464 error = dlm_rcom_status(ls, me << 465 if (error) 439 if (error) 466 break; 440 break; 467 } 441 } 468 if (error) 442 if (error) 469 log_rinfo(ls, "ping_members ab 443 log_rinfo(ls, "ping_members aborted %d last nodeid %d", 470 error, ls->ls_recove 444 error, ls->ls_recover_nodeid); 471 return error; 445 return error; 472 } 446 } 473 447 474 static void dlm_lsop_recover_prep(struct dlm_l 448 static void dlm_lsop_recover_prep(struct dlm_ls *ls) 475 { 449 { 476 if (!ls->ls_ops || !ls->ls_ops->recove 450 if (!ls->ls_ops || !ls->ls_ops->recover_prep) 477 return; 451 return; 478 ls->ls_ops->recover_prep(ls->ls_ops_ar 452 ls->ls_ops->recover_prep(ls->ls_ops_arg); 479 } 453 } 480 454 481 static void dlm_lsop_recover_slot(struct dlm_l 455 static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb) 482 { 456 { 483 struct dlm_slot slot; 457 struct dlm_slot slot; 484 uint32_t seq; 458 uint32_t seq; 485 int error; 459 int error; 486 460 487 if (!ls->ls_ops || !ls->ls_ops->recove 461 if (!ls->ls_ops || !ls->ls_ops->recover_slot) 488 return; 462 return; 489 463 490 /* if there is no comms connection wit 464 /* if there is no comms connection with this node 491 or the present comms connection is 465 or the present comms connection is newer 492 than the one when this member was a 466 than the one when this member was added, then 493 we consider the node to have failed 467 we consider the node to have failed (versus 494 being removed due to dlm_release_lo 468 being removed due to dlm_release_lockspace) */ 495 469 496 error = dlm_comm_seq(memb->nodeid, &se 470 error = dlm_comm_seq(memb->nodeid, &seq); 497 471 498 if (!error && seq == memb->comm_seq) 472 if (!error && seq == memb->comm_seq) 499 return; 473 return; 500 474 501 slot.nodeid = memb->nodeid; 475 slot.nodeid = memb->nodeid; 502 slot.slot = memb->slot; 476 slot.slot = memb->slot; 503 477 504 ls->ls_ops->recover_slot(ls->ls_ops_ar 478 ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot); 505 } 479 } 506 480 507 void dlm_lsop_recover_done(struct dlm_ls *ls) 481 void dlm_lsop_recover_done(struct dlm_ls *ls) 508 { 482 { 509 struct dlm_member *memb; 483 struct dlm_member *memb; 510 struct dlm_slot *slots; 484 struct dlm_slot *slots; 511 int i, num; 485 int i, num; 512 486 513 if (!ls->ls_ops || !ls->ls_ops->recove 487 if (!ls->ls_ops || !ls->ls_ops->recover_done) 514 return; 488 return; 515 489 516 num = ls->ls_num_nodes; 490 num = ls->ls_num_nodes; 517 slots = kcalloc(num, sizeof(*slots), G 491 slots = kcalloc(num, sizeof(*slots), GFP_KERNEL); 518 if (!slots) 492 if (!slots) 519 return; 493 return; 520 494 521 i = 0; 495 i = 0; 522 list_for_each_entry(memb, &ls->ls_node 496 list_for_each_entry(memb, &ls->ls_nodes, list) { 523 if (i == num) { 497 if (i == num) { 524 log_error(ls, "dlm_lso 498 log_error(ls, "dlm_lsop_recover_done bad num %d", num); 525 goto out; 499 goto out; 526 } 500 } 527 slots[i].nodeid = memb->nodeid 501 slots[i].nodeid = memb->nodeid; 528 slots[i].slot = memb->slot; 502 slots[i].slot = memb->slot; 529 i++; 503 i++; 530 } 504 } 531 505 532 ls->ls_ops->recover_done(ls->ls_ops_ar 506 ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num, 533 ls->ls_slot, 507 ls->ls_slot, ls->ls_generation); 534 out: 508 out: 535 kfree(slots); 509 kfree(slots); 536 } 510 } 537 511 538 static struct dlm_config_node *find_config_nod 512 static struct dlm_config_node *find_config_node(struct dlm_recover *rv, 539 513 int nodeid) 540 { 514 { 541 int i; 515 int i; 542 516 543 for (i = 0; i < rv->nodes_count; i++) 517 for (i = 0; i < rv->nodes_count; i++) { 544 if (rv->nodes[i].nodeid == nod 518 if (rv->nodes[i].nodeid == nodeid) 545 return &rv->nodes[i]; 519 return &rv->nodes[i]; 546 } 520 } 547 return NULL; 521 return NULL; 548 } 522 } 549 523 550 int dlm_recover_members(struct dlm_ls *ls, str 524 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 551 { 525 { 552 struct dlm_member *memb, *safe; 526 struct dlm_member *memb, *safe; 553 struct dlm_config_node *node; 527 struct dlm_config_node *node; 554 int i, error, neg = 0, low = -1; 528 int i, error, neg = 0, low = -1; 555 529 556 /* previously removed members that we' 530 /* previously removed members that we've not finished removing need to 557 * count as a negative change so the " !! 531 count as a negative change so the "neg" recovery steps will happen */ 558 * << 559 * This functionality must report all << 560 * midcomms layer and must never retur << 561 */ << 562 532 563 list_for_each_entry(memb, &ls->ls_node 533 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 564 log_rinfo(ls, "prev removed me 534 log_rinfo(ls, "prev removed member %d", memb->nodeid); 565 neg++; 535 neg++; 566 } 536 } 567 537 568 /* move departed members from ls_nodes 538 /* move departed members from ls_nodes to ls_nodes_gone */ 569 539 570 list_for_each_entry_safe(memb, safe, & 540 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 571 node = find_config_node(rv, me 541 node = find_config_node(rv, memb->nodeid); 572 if (node && !node->new) 542 if (node && !node->new) 573 continue; 543 continue; 574 544 575 if (!node) { 545 if (!node) { 576 log_rinfo(ls, "remove 546 log_rinfo(ls, "remove member %d", memb->nodeid); 577 } else { 547 } else { 578 /* removed and re-adde 548 /* removed and re-added */ 579 log_rinfo(ls, "remove 549 log_rinfo(ls, "remove member %d comm_seq %u %u", 580 memb->nodeid 550 memb->nodeid, memb->comm_seq, node->comm_seq); 581 } 551 } 582 552 583 neg++; 553 neg++; 584 list_move(&memb->list, &ls->ls 554 list_move(&memb->list, &ls->ls_nodes_gone); 585 remove_remote_member(memb->nod << 586 ls->ls_num_nodes--; 555 ls->ls_num_nodes--; 587 dlm_lsop_recover_slot(ls, memb 556 dlm_lsop_recover_slot(ls, memb); 588 } 557 } 589 558 590 /* add new members to ls_nodes */ 559 /* add new members to ls_nodes */ 591 560 592 for (i = 0; i < rv->nodes_count; i++) 561 for (i = 0; i < rv->nodes_count; i++) { 593 node = &rv->nodes[i]; 562 node = &rv->nodes[i]; 594 if (dlm_is_member(ls, node->no 563 if (dlm_is_member(ls, node->nodeid)) 595 continue; 564 continue; 596 error = dlm_add_member(ls, nod !! 565 dlm_add_member(ls, node); 597 if (error) << 598 return error; << 599 << 600 log_rinfo(ls, "add member %d", 566 log_rinfo(ls, "add member %d", node->nodeid); 601 } 567 } 602 568 603 list_for_each_entry(memb, &ls->ls_node 569 list_for_each_entry(memb, &ls->ls_nodes, list) { 604 if (low == -1 || memb->nodeid 570 if (low == -1 || memb->nodeid < low) 605 low = memb->nodeid; 571 low = memb->nodeid; 606 } 572 } 607 ls->ls_low_nodeid = low; 573 ls->ls_low_nodeid = low; 608 574 609 make_member_array(ls); 575 make_member_array(ls); 610 *neg_out = neg; 576 *neg_out = neg; 611 577 612 error = ping_members(ls, rv->seq); !! 578 error = ping_members(ls); >> 579 if (!error || error == -EPROTO) { >> 580 /* new_lockspace() may be waiting to know if the config >> 581 is good or bad */ >> 582 ls->ls_members_result = error; >> 583 complete(&ls->ls_members_done); >> 584 } >> 585 613 log_rinfo(ls, "dlm_recover_members %d 586 log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); 614 return error; 587 return error; 615 } 588 } 616 589 617 /* Userspace guarantees that dlm_ls_stop() has 590 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before 618 dlm_ls_start() is called on any of them to 591 dlm_ls_start() is called on any of them to start the new recovery. */ 619 592 620 int dlm_ls_stop(struct dlm_ls *ls) 593 int dlm_ls_stop(struct dlm_ls *ls) 621 { 594 { 622 int new; 595 int new; 623 596 624 /* 597 /* 625 * Prevent dlm_recv from being in the 598 * Prevent dlm_recv from being in the middle of something when we do 626 * the stop. This includes ensuring d 599 * the stop. This includes ensuring dlm_recv isn't processing a 627 * recovery message (rcom), while dlm_ 600 * recovery message (rcom), while dlm_recoverd is aborting and 628 * resetting things from an in-progres 601 * resetting things from an in-progress recovery. i.e. we want 629 * dlm_recoverd to abort its recovery 602 * dlm_recoverd to abort its recovery without worrying about dlm_recv 630 * processing an rcom at the same time 603 * processing an rcom at the same time. Stopping dlm_recv also makes 631 * it easy for dlm_receive_message() t 604 * it easy for dlm_receive_message() to check locking stopped and add a 632 * message to the requestqueue without 605 * message to the requestqueue without races. 633 */ 606 */ 634 607 635 write_lock_bh(&ls->ls_recv_active); !! 608 down_write(&ls->ls_recv_active); 636 609 637 /* 610 /* 638 * Abort any recovery that's in progre 611 * Abort any recovery that's in progress (see RECOVER_STOP, 639 * dlm_recovery_stopped()) and tell an 612 * dlm_recovery_stopped()) and tell any other threads running in the 640 * dlm to quit any processing (see RUN 613 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). 641 */ 614 */ 642 615 643 spin_lock_bh(&ls->ls_recover_lock); !! 616 spin_lock(&ls->ls_recover_lock); 644 set_bit(LSFL_RECOVER_STOP, &ls->ls_fla 617 set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); 645 new = test_and_clear_bit(LSFL_RUNNING, 618 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 646 if (new) << 647 timer_delete_sync(&ls->ls_scan << 648 ls->ls_recover_seq++; 619 ls->ls_recover_seq++; 649 !! 620 spin_unlock(&ls->ls_recover_lock); 650 /* activate requestqueue and stop proc << 651 write_lock_bh(&ls->ls_requestqueue_loc << 652 set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls << 653 write_unlock_bh(&ls->ls_requestqueue_l << 654 spin_unlock_bh(&ls->ls_recover_lock); << 655 621 656 /* 622 /* 657 * Let dlm_recv run again, now any nor 623 * Let dlm_recv run again, now any normal messages will be saved on the 658 * requestqueue for later. 624 * requestqueue for later. 659 */ 625 */ 660 626 661 write_unlock_bh(&ls->ls_recv_active); !! 627 up_write(&ls->ls_recv_active); 662 628 663 /* 629 /* 664 * This in_recovery lock does two thin 630 * This in_recovery lock does two things: 665 * 1) Keeps this function from returni 631 * 1) Keeps this function from returning until all threads are out 666 * of locking routines and locking 632 * of locking routines and locking is truly stopped. 667 * 2) Keeps any new requests from bein 633 * 2) Keeps any new requests from being processed until it's unlocked 668 * when recovery is complete. 634 * when recovery is complete. 669 */ 635 */ 670 636 671 if (new) { 637 if (new) { 672 set_bit(LSFL_RECOVER_DOWN, &ls 638 set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags); 673 wake_up_process(ls->ls_recover 639 wake_up_process(ls->ls_recoverd_task); 674 wait_event(ls->ls_recover_lock 640 wait_event(ls->ls_recover_lock_wait, 675 test_bit(LSFL_RECOV 641 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 676 } 642 } 677 643 678 /* 644 /* 679 * The recoverd suspend/resume makes s 645 * The recoverd suspend/resume makes sure that dlm_recoverd (if 680 * running) has noticed RECOVER_STOP a 646 * running) has noticed RECOVER_STOP above and quit processing the 681 * previous recovery. 647 * previous recovery. 682 */ 648 */ 683 649 684 dlm_recoverd_suspend(ls); 650 dlm_recoverd_suspend(ls); 685 651 686 spin_lock_bh(&ls->ls_recover_lock); !! 652 spin_lock(&ls->ls_recover_lock); 687 kfree(ls->ls_slots); 653 kfree(ls->ls_slots); 688 ls->ls_slots = NULL; 654 ls->ls_slots = NULL; 689 ls->ls_num_slots = 0; 655 ls->ls_num_slots = 0; 690 ls->ls_slots_size = 0; 656 ls->ls_slots_size = 0; 691 ls->ls_recover_status = 0; 657 ls->ls_recover_status = 0; 692 spin_unlock_bh(&ls->ls_recover_lock); !! 658 spin_unlock(&ls->ls_recover_lock); 693 659 694 dlm_recoverd_resume(ls); 660 dlm_recoverd_resume(ls); 695 661 696 if (!ls->ls_recover_begin) 662 if (!ls->ls_recover_begin) 697 ls->ls_recover_begin = jiffies 663 ls->ls_recover_begin = jiffies; 698 664 699 /* call recover_prep ops only once and !! 665 dlm_lsop_recover_prep(ls); 700 * for each possible dlm_ls_stop() whe << 701 * stopped. << 702 * << 703 * If we successful was able to clear << 704 * it was set we know it is the first << 705 */ << 706 if (new) << 707 dlm_lsop_recover_prep(ls); << 708 << 709 return 0; 666 return 0; 710 } 667 } 711 668 712 int dlm_ls_start(struct dlm_ls *ls) 669 int dlm_ls_start(struct dlm_ls *ls) 713 { 670 { 714 struct dlm_recover *rv, *rv_old; 671 struct dlm_recover *rv, *rv_old; 715 struct dlm_config_node *nodes = NULL; 672 struct dlm_config_node *nodes = NULL; 716 int error, count; 673 int error, count; 717 674 718 rv = kzalloc(sizeof(*rv), GFP_NOFS); 675 rv = kzalloc(sizeof(*rv), GFP_NOFS); 719 if (!rv) 676 if (!rv) 720 return -ENOMEM; 677 return -ENOMEM; 721 678 722 error = dlm_config_nodes(ls->ls_name, 679 error = dlm_config_nodes(ls->ls_name, &nodes, &count); 723 if (error < 0) 680 if (error < 0) 724 goto fail_rv; 681 goto fail_rv; 725 682 726 spin_lock_bh(&ls->ls_recover_lock); !! 683 spin_lock(&ls->ls_recover_lock); 727 684 728 /* the lockspace needs to be stopped b 685 /* the lockspace needs to be stopped before it can be started */ 729 686 730 if (!dlm_locking_stopped(ls)) { 687 if (!dlm_locking_stopped(ls)) { 731 spin_unlock_bh(&ls->ls_recover !! 688 spin_unlock(&ls->ls_recover_lock); 732 log_error(ls, "start ignored: 689 log_error(ls, "start ignored: lockspace running"); 733 error = -EINVAL; 690 error = -EINVAL; 734 goto fail; 691 goto fail; 735 } 692 } 736 693 737 rv->nodes = nodes; 694 rv->nodes = nodes; 738 rv->nodes_count = count; 695 rv->nodes_count = count; 739 rv->seq = ++ls->ls_recover_seq; 696 rv->seq = ++ls->ls_recover_seq; 740 rv_old = ls->ls_recover_args; 697 rv_old = ls->ls_recover_args; 741 ls->ls_recover_args = rv; 698 ls->ls_recover_args = rv; 742 spin_unlock_bh(&ls->ls_recover_lock); !! 699 spin_unlock(&ls->ls_recover_lock); 743 700 744 if (rv_old) { 701 if (rv_old) { 745 log_error(ls, "unused recovery 702 log_error(ls, "unused recovery %llx %d", 746 (unsigned long long) 703 (unsigned long long)rv_old->seq, rv_old->nodes_count); 747 kfree(rv_old->nodes); 704 kfree(rv_old->nodes); 748 kfree(rv_old); 705 kfree(rv_old); 749 } 706 } 750 707 751 set_bit(LSFL_RECOVER_WORK, &ls->ls_fla 708 set_bit(LSFL_RECOVER_WORK, &ls->ls_flags); 752 wake_up_process(ls->ls_recoverd_task); 709 wake_up_process(ls->ls_recoverd_task); 753 return 0; 710 return 0; 754 711 755 fail: 712 fail: 756 kfree(nodes); 713 kfree(nodes); 757 fail_rv: 714 fail_rv: 758 kfree(rv); 715 kfree(rv); 759 return error; 716 return error; 760 } 717 } 761 718 762 719
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.