~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/dlm/lock.c

Version: ~ [ linux-6.11.5 ] ~ [ linux-6.10.14 ] ~ [ linux-6.9.12 ] ~ [ linux-6.8.12 ] ~ [ linux-6.7.12 ] ~ [ linux-6.6.58 ] ~ [ linux-6.5.13 ] ~ [ linux-6.4.16 ] ~ [ linux-6.3.13 ] ~ [ linux-6.2.16 ] ~ [ linux-6.1.114 ] ~ [ linux-6.0.19 ] ~ [ linux-5.19.17 ] ~ [ linux-5.18.19 ] ~ [ linux-5.17.15 ] ~ [ linux-5.16.20 ] ~ [ linux-5.15.169 ] ~ [ linux-5.14.21 ] ~ [ linux-5.13.19 ] ~ [ linux-5.12.19 ] ~ [ linux-5.11.22 ] ~ [ linux-5.10.228 ] ~ [ linux-5.9.16 ] ~ [ linux-5.8.18 ] ~ [ linux-5.7.19 ] ~ [ linux-5.6.19 ] ~ [ linux-5.5.19 ] ~ [ linux-5.4.284 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.322 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.336 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.337 ] ~ [ linux-4.4.302 ] ~ [ linux-3.10.108 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.9 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 // SPDX-License-Identifier: GPL-2.0-only
  2 /******************************************************************************
  3 *******************************************************************************
  4 **
  5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
  6 **
  7 **
  8 *******************************************************************************
  9 ******************************************************************************/
 10 
 11 /* Central locking logic has four stages:
 12 
 13    dlm_lock()
 14    dlm_unlock()
 15 
 16    request_lock(ls, lkb)
 17    convert_lock(ls, lkb)
 18    unlock_lock(ls, lkb)
 19    cancel_lock(ls, lkb)
 20 
 21    _request_lock(r, lkb)
 22    _convert_lock(r, lkb)
 23    _unlock_lock(r, lkb)
 24    _cancel_lock(r, lkb)
 25 
 26    do_request(r, lkb)
 27    do_convert(r, lkb)
 28    do_unlock(r, lkb)
 29    do_cancel(r, lkb)
 30 
 31    Stage 1 (lock, unlock) is mainly about checking input args and
 32    splitting into one of the four main operations:
 33 
 34        dlm_lock          = request_lock
 35        dlm_lock+CONVERT  = convert_lock
 36        dlm_unlock        = unlock_lock
 37        dlm_unlock+CANCEL = cancel_lock
 38 
 39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
 40    provided to the next stage.
 41 
 42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
 43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
 44 
 45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
 46    given rsb and lkb and queues callbacks.
 47 
 48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
 49    function being executed on the remote node.  The connecting send/receive
 50    calls on local (L) and remote (R) nodes:
 51 
 52    L: send_xxxx()              ->  R: receive_xxxx()
 53                                    R: do_xxxx()
 54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 55 */
 56 #include <trace/events/dlm.h>
 57 
 58 #include <linux/types.h>
 59 #include <linux/rbtree.h>
 60 #include <linux/slab.h>
 61 #include "dlm_internal.h"
 62 #include <linux/dlm_device.h>
 63 #include "memory.h"
 64 #include "midcomms.h"
 65 #include "requestqueue.h"
 66 #include "util.h"
 67 #include "dir.h"
 68 #include "member.h"
 69 #include "lockspace.h"
 70 #include "ast.h"
 71 #include "lock.h"
 72 #include "rcom.h"
 73 #include "recover.h"
 74 #include "lvb_table.h"
 75 #include "user.h"
 76 #include "config.h"
 77 
 78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
 79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
 80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
 82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
 83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
 84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
 85 static int send_remove(struct dlm_rsb *r);
 86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 89                                     const struct dlm_message *ms, bool local);
 90 static int receive_extralen(const struct dlm_message *ms);
 91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
 92 static void deactivate_rsb(struct kref *kref);
 93 
 94 /*
 95  * Lock compatibilty matrix - thanks Steve
 96  * UN = Unlocked state. Not really a state, used as a flag
 97  * PD = Padding. Used to make the matrix a nice power of two in size
 98  * Other states are the same as the VMS DLM.
 99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134 
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183 
184         dlm_print_rsb(r);
185 
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208 
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213 
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294 
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300 
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325         return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333         /* inactive rsbs are not ref counted */
334         WARN_ON(rsb_flag(r, RSB_INACTIVE));
335         kref_get(&r->res_ref);
336 }
337 
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340         hold_rsb(r);
341 }
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348         if (refcount_dec_not_one(r))
349                 return false;
350 
351         write_lock_bh(lock);
352         if (!refcount_dec_and_test(r)) {
353                 write_unlock_bh(lock);
354                 return false;
355         }
356 
357         return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362                                              void (*release)(struct kref *kref),
363                                              rwlock_t *lock)
364 {
365         if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366                 release(kref);
367                 return 1;
368         }
369 
370         return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375         struct dlm_ls *ls = r->res_ls;
376         int rv;
377 
378         rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379                                         &ls->ls_rsbtbl_lock);
380         if (rv)
381                 write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386         put_rsb(r);
387 }
388 
389 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
390  * new timers when recovery is triggered and don't run them
391  * again until a resume_scan_timer() tries it again.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395         if (!dlm_locking_stopped(ls))
396                 mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if a rsb
400  * is on the scan list and no timer is pending. It might that
401  * the first entry is on currently executed as timer callback
402  * but we don't care if a timer queued up again and does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407         struct dlm_rsb *r;
408 
409         spin_lock_bh(&ls->ls_scan_lock);
410         r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411                                      res_scan_list);
412         if (r && !timer_pending(&ls->ls_scan_timer))
413                 enable_scan_timer(ls, r->res_toss_time);
414         spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421         struct dlm_rsb *first;
422 
423         /* active rsbs should never be on the scan list */
424         WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426         spin_lock_bh(&ls->ls_scan_lock);
427         r->res_toss_time = 0;
428 
429         /* if the rsb is not queued do nothing */
430         if (list_empty(&r->res_scan_list))
431                 goto out;
432 
433         /* get the first element before delete */
434         first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435                                  res_scan_list);
436         list_del_init(&r->res_scan_list);
437         /* check if the first element was the rsb we deleted */
438         if (first == r) {
439                 /* try to get the new first element, if the list
440                  * is empty now try to delete the timer, if we are
441                  * too late we don't care.
442                  *
443                  * if the list isn't empty and a new first element got
444                  * in place, set the new timer expire time.
445                  */
446                 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447                                                  res_scan_list);
448                 if (!first)
449                         timer_delete(&ls->ls_scan_timer);
450                 else
451                         enable_scan_timer(ls, first->res_toss_time);
452         }
453 
454 out:
455         spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460         int our_nodeid = dlm_our_nodeid();
461         struct dlm_rsb *first;
462 
463         /* A dir record for a remote master rsb should never be on the scan list. */
464         WARN_ON(!dlm_no_directory(ls) &&
465                 (r->res_master_nodeid != our_nodeid) &&
466                 (dlm_dir_nodeid(r) == our_nodeid));
467 
468         /* An active rsb should never be on the scan list. */
469         WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471         /* An rsb should not already be on the scan list. */
472         WARN_ON(!list_empty(&r->res_scan_list));
473 
474         spin_lock_bh(&ls->ls_scan_lock);
475         /* set the new rsb absolute expire time in the rsb */
476         r->res_toss_time = rsb_toss_jiffies();
477         if (list_empty(&ls->ls_scan_list)) {
478                 /* if the queue is empty add the element and it's
479                  * our new expire time
480                  */
481                 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482                 enable_scan_timer(ls, r->res_toss_time);
483         } else {
484                 /* try to get the maybe new first element and then add
485                  * to this rsb with the oldest expire time to the end
486                  * of the queue. If the list was empty before this
487                  * rsb expire time is our next expiration if it wasn't
488                  * the now new first elemet is our new expiration time
489                  */
490                 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491                                                  res_scan_list);
492                 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493                 if (!first)
494                         enable_scan_timer(ls, r->res_toss_time);
495                 else
496                         enable_scan_timer(ls, first->res_toss_time);
497         }
498         spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
501 /* if we hit contention we do in 250 ms a retry to trylock.
502  * if there is any other mod_timer in between we don't care
503  * about that it expires earlier again this is only for the
504  * unlikely case nothing happened in this time.
505  */
506 #define DLM_TOSS_TIMER_RETRY    (jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512         struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513         int our_nodeid = dlm_our_nodeid();
514         struct dlm_rsb *r;
515         int rv;
516 
517         while (1) {
518                 /* interrupting point to leave iteration when
519                  * recovery waits for timer_delete_sync(), recovery
520                  * will take care to delete everything in scan list.
521                  */
522                 if (dlm_locking_stopped(ls))
523                         break;
524 
525                 rv = spin_trylock(&ls->ls_scan_lock);
526                 if (!rv) {
527                         /* rearm again try timer */
528                         enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529                         break;
530                 }
531 
532                 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533                                              res_scan_list);
534                 if (!r) {
535                         /* the next add_scan will enable the timer again */
536                         spin_unlock(&ls->ls_scan_lock);
537                         break;
538                 }
539 
540                 /*
541                  * If the first rsb is not yet expired, then stop because the
542                  * list is sorted with nearest expiration first.
543                  */
544                 if (time_before(jiffies, r->res_toss_time)) {
545                         /* rearm with the next rsb to expire in the future */
546                         enable_scan_timer(ls, r->res_toss_time);
547                         spin_unlock(&ls->ls_scan_lock);
548                         break;
549                 }
550 
551                 /* in find_rsb_dir/nodir there is a reverse order of this
552                  * lock, however this is only a trylock if we hit some
553                  * possible contention we try it again.
554                  */
555                 rv = write_trylock(&ls->ls_rsbtbl_lock);
556                 if (!rv) {
557                         spin_unlock(&ls->ls_scan_lock);
558                         /* rearm again try timer */
559                         enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560                         break;
561                 }
562 
563                 list_del(&r->res_slow_list);
564                 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565                                        dlm_rhash_rsb_params);
566                 rsb_clear_flag(r, RSB_HASHED);
567 
568                 /* ls_rsbtbl_lock is not needed when calling send_remove() */
569                 write_unlock(&ls->ls_rsbtbl_lock);
570 
571                 list_del_init(&r->res_scan_list);
572                 spin_unlock(&ls->ls_scan_lock);
573 
574                 /* An rsb that is a dir record for a remote master rsb
575                  * cannot be removed, and should not have a timer enabled.
576                  */
577                 WARN_ON(!dlm_no_directory(ls) &&
578                         (r->res_master_nodeid != our_nodeid) &&
579                         (dlm_dir_nodeid(r) == our_nodeid));
580 
581                 /* We're the master of this rsb but we're not
582                  * the directory record, so we need to tell the
583                  * dir node to remove the dir record
584                  */
585                 if (!dlm_no_directory(ls) &&
586                     (r->res_master_nodeid == our_nodeid) &&
587                     (dlm_dir_nodeid(r) != our_nodeid))
588                         send_remove(r);
589 
590                 free_inactive_rsb(r);
591         }
592 }
593 
594 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
595    unlock any spinlocks, go back and call pre_rsb_struct again.
596    Otherwise, take an rsb off the list and return it. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599                           struct dlm_rsb **r_ret)
600 {
601         struct dlm_rsb *r;
602 
603         r = dlm_allocate_rsb(ls);
604         if (!r)
605                 return -ENOMEM;
606 
607         r->res_ls = ls;
608         r->res_length = len;
609         memcpy(r->res_name, name, len);
610         spin_lock_init(&r->res_lock);
611 
612         INIT_LIST_HEAD(&r->res_lookup);
613         INIT_LIST_HEAD(&r->res_grantqueue);
614         INIT_LIST_HEAD(&r->res_convertqueue);
615         INIT_LIST_HEAD(&r->res_waitqueue);
616         INIT_LIST_HEAD(&r->res_root_list);
617         INIT_LIST_HEAD(&r->res_scan_list);
618         INIT_LIST_HEAD(&r->res_recover_list);
619         INIT_LIST_HEAD(&r->res_masters_list);
620 
621         *r_ret = r;
622         return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626                         struct dlm_rsb **r_ret)
627 {
628         char key[DLM_RESNAME_MAXLEN] = {};
629 
630         memcpy(key, name, len);
631         *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632         if (*r_ret)
633                 return 0;
634 
635         return -EBADR;
636 }
637 
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640         int rv;
641 
642         rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643                                     dlm_rhash_rsb_params);
644         if (!rv)
645                 rsb_set_flag(rsb, RSB_HASHED);
646 
647         return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
686  * rsb's on the toss list who's dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695                         uint32_t hash, int dir_nodeid, int from_nodeid,
696                         unsigned int flags, struct dlm_rsb **r_ret)
697 {
698         struct dlm_rsb *r = NULL;
699         int our_nodeid = dlm_our_nodeid();
700         int from_local = 0;
701         int from_other = 0;
702         int from_dir = 0;
703         int create = 0;
704         int error;
705 
706         if (flags & R_RECEIVE_REQUEST) {
707                 if (from_nodeid == dir_nodeid)
708                         from_dir = 1;
709                 else
710                         from_other = 1;
711         } else if (flags & R_REQUEST) {
712                 from_local = 1;
713         }
714 
715         /*
716          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717          * from_nodeid has sent us a lock in dlm_recover_locks, believing
718          * we're the new master.  Our local recovery may not have set
719          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720          * create the rsb; dlm_recover_process_copy() will handle EBADR
721          * by resending.
722          *
723          * If someone sends us a request, we are the dir node, and we do
724          * not find the rsb anywhere, then recreate it.  This happens if
725          * someone sends us a request after we have removed/freed an rsb.
726          * (They sent a request instead of lookup because they are using
727          * an rsb taken from their scan list.)
728          */
729 
730         if (from_local || from_dir ||
731             (from_other && (dir_nodeid == our_nodeid))) {
732                 create = 1;
733         }
734 
735  retry:
736 
737         /* check if the rsb is active under read lock - likely path */
738         read_lock_bh(&ls->ls_rsbtbl_lock);
739         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
740         if (error) {
741                 read_unlock_bh(&ls->ls_rsbtbl_lock);
742                 goto do_new;
743         }
744         
745         /*
746          * rsb is active, so we can't check master_nodeid without lock_rsb.
747          */
748 
749         if (rsb_flag(r, RSB_INACTIVE)) {
750                 read_unlock_bh(&ls->ls_rsbtbl_lock);
751                 goto do_inactive;
752         }
753 
754         kref_get(&r->res_ref);
755         read_unlock_bh(&ls->ls_rsbtbl_lock);
756         goto out;
757 
758 
759  do_inactive:
760         write_lock_bh(&ls->ls_rsbtbl_lock);
761 
762         /*
763          * The expectation here is that the rsb will have HASHED and
764          * INACTIVE flags set, and that the rsb can be moved from
765          * inactive back to active again.  However, between releasing
766          * the read lock and acquiring the write lock, this rsb could
767          * have been removed from rsbtbl, and had HASHED cleared, to
768          * be freed.  To deal with this case, we would normally need
769          * to repeat dlm_search_rsb_tree while holding the write lock,
770          * but rcu allows us to simply check the HASHED flag, because
771          * the rcu read lock means the rsb will not be freed yet.
772          * If the HASHED flag is not set, then the rsb is being freed,
773          * so we add a new rsb struct.  If the HASHED flag is set,
774          * and INACTIVE is not set, it means another thread has
775          * made the rsb active, as we're expecting to do here, and
776          * we just repeat the lookup (this will be very unlikely.)
777          */
778         if (rsb_flag(r, RSB_HASHED)) {
779                 if (!rsb_flag(r, RSB_INACTIVE)) {
780                         write_unlock_bh(&ls->ls_rsbtbl_lock);
781                         goto retry;
782                 }
783         } else {
784                 write_unlock_bh(&ls->ls_rsbtbl_lock);
785                 goto do_new;
786         }
787 
788         /*
789          * rsb found inactive (master_nodeid may be out of date unless
790          * we are the dir_nodeid or were the master)  No other thread
791          * is using this rsb because it's inactive, so we can
792          * look at or update res_master_nodeid without lock_rsb.
793          */
794 
795         if ((r->res_master_nodeid != our_nodeid) && from_other) {
796                 /* our rsb was not master, and another node (not the dir node)
797                    has sent us a request */
798                 log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
799                           from_nodeid, r->res_master_nodeid, dir_nodeid,
800                           r->res_name);
801                 write_unlock_bh(&ls->ls_rsbtbl_lock);
802                 error = -ENOTBLK;
803                 goto out;
804         }
805 
806         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
807                 /* don't think this should ever happen */
808                 log_error(ls, "find_rsb inactive from_dir %d master %d",
809                           from_nodeid, r->res_master_nodeid);
810                 dlm_print_rsb(r);
811                 /* fix it and go on */
812                 r->res_master_nodeid = our_nodeid;
813                 r->res_nodeid = 0;
814                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
815                 r->res_first_lkid = 0;
816         }
817 
818         if (from_local && (r->res_master_nodeid != our_nodeid)) {
819                 /* Because we have held no locks on this rsb,
820                    res_master_nodeid could have become stale. */
821                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
822                 r->res_first_lkid = 0;
823         }
824 
825         /* A dir record will not be on the scan list. */
826         if (r->res_dir_nodeid != our_nodeid)
827                 del_scan(ls, r);
828         list_move(&r->res_slow_list, &ls->ls_slow_active);
829         rsb_clear_flag(r, RSB_INACTIVE);
830         kref_init(&r->res_ref); /* ref is now used in active state */
831         write_unlock_bh(&ls->ls_rsbtbl_lock);
832 
833         goto out;
834 
835 
836  do_new:
837         /*
838          * rsb not found
839          */
840 
841         if (error == -EBADR && !create)
842                 goto out;
843 
844         error = get_rsb_struct(ls, name, len, &r);
845         if (WARN_ON_ONCE(error))
846                 goto out;
847 
848         r->res_hash = hash;
849         r->res_dir_nodeid = dir_nodeid;
850         kref_init(&r->res_ref);
851 
852         if (from_dir) {
853                 /* want to see how often this happens */
854                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
855                           from_nodeid, r->res_name);
856                 r->res_master_nodeid = our_nodeid;
857                 r->res_nodeid = 0;
858                 goto out_add;
859         }
860 
861         if (from_other && (dir_nodeid != our_nodeid)) {
862                 /* should never happen */
863                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
864                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
865                 dlm_free_rsb(r);
866                 r = NULL;
867                 error = -ENOTBLK;
868                 goto out;
869         }
870 
871         if (from_other) {
872                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
873                           from_nodeid, dir_nodeid, r->res_name);
874         }
875 
876         if (dir_nodeid == our_nodeid) {
877                 /* When we are the dir nodeid, we can set the master
878                    node immediately */
879                 r->res_master_nodeid = our_nodeid;
880                 r->res_nodeid = 0;
881         } else {
882                 /* set_master will send_lookup to dir_nodeid */
883                 r->res_master_nodeid = 0;
884                 r->res_nodeid = -1;
885         }
886 
887  out_add:
888 
889         write_lock_bh(&ls->ls_rsbtbl_lock);
890         error = rsb_insert(r, &ls->ls_rsbtbl);
891         if (error == -EEXIST) {
892                 /* somebody else was faster and it seems the
893                  * rsb exists now, we do a whole relookup
894                  */
895                 write_unlock_bh(&ls->ls_rsbtbl_lock);
896                 dlm_free_rsb(r);
897                 goto retry;
898         } else if (!error) {
899                 list_add(&r->res_slow_list, &ls->ls_slow_active);
900         }
901         write_unlock_bh(&ls->ls_rsbtbl_lock);
902  out:
903         *r_ret = r;
904         return error;
905 }
906 
907 /* During recovery, other nodes can send us new MSTCPY locks (from
908    dlm_recover_locks) before we've made ourself master (in
909    dlm_recover_masters). */
910 
911 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
912                           uint32_t hash, int dir_nodeid, int from_nodeid,
913                           unsigned int flags, struct dlm_rsb **r_ret)
914 {
915         struct dlm_rsb *r = NULL;
916         int our_nodeid = dlm_our_nodeid();
917         int recover = (flags & R_RECEIVE_RECOVER);
918         int error;
919 
920  retry:
921 
922         /* check if the rsb is in active state under read lock - likely path */
923         read_lock_bh(&ls->ls_rsbtbl_lock);
924         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
925         if (error) {
926                 read_unlock_bh(&ls->ls_rsbtbl_lock);
927                 goto do_new;
928         }
929 
930         if (rsb_flag(r, RSB_INACTIVE)) {
931                 read_unlock_bh(&ls->ls_rsbtbl_lock);
932                 goto do_inactive;
933         }
934 
935         /*
936          * rsb is active, so we can't check master_nodeid without lock_rsb.
937          */
938 
939         kref_get(&r->res_ref);
940         read_unlock_bh(&ls->ls_rsbtbl_lock);
941 
942         goto out;
943 
944 
945  do_inactive:
946         write_lock_bh(&ls->ls_rsbtbl_lock);
947 
948         /* See comment in find_rsb_dir. */
949         if (rsb_flag(r, RSB_HASHED)) {
950                 if (!rsb_flag(r, RSB_INACTIVE)) {
951                         write_unlock_bh(&ls->ls_rsbtbl_lock);
952                         goto retry;
953                 }
954         } else {
955                 write_unlock_bh(&ls->ls_rsbtbl_lock);
956                 goto do_new;
957         }
958 
959 
960         /*
961          * rsb found inactive. No other thread is using this rsb because
962          * it's inactive, so we can look at or update res_master_nodeid
963          * without lock_rsb.
964          */
965 
966         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
967                 /* our rsb is not master, and another node has sent us a
968                    request; this should never happen */
969                 log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
970                           from_nodeid, r->res_master_nodeid, dir_nodeid);
971                 dlm_print_rsb(r);
972                 write_unlock_bh(&ls->ls_rsbtbl_lock);
973                 error = -ENOTBLK;
974                 goto out;
975         }
976 
977         if (!recover && (r->res_master_nodeid != our_nodeid) &&
978             (dir_nodeid == our_nodeid)) {
979                 /* our rsb is not master, and we are dir; may as well fix it;
980                    this should never happen */
981                 log_error(ls, "find_rsb inactive our %d master %d dir %d",
982                           our_nodeid, r->res_master_nodeid, dir_nodeid);
983                 dlm_print_rsb(r);
984                 r->res_master_nodeid = our_nodeid;
985                 r->res_nodeid = 0;
986         }
987 
988         list_move(&r->res_slow_list, &ls->ls_slow_active);
989         rsb_clear_flag(r, RSB_INACTIVE);
990         kref_init(&r->res_ref);
991         del_scan(ls, r);
992         write_unlock_bh(&ls->ls_rsbtbl_lock);
993 
994         goto out;
995 
996 
997  do_new:
998         /*
999          * rsb not found
1000          */
1001 
1002         error = get_rsb_struct(ls, name, len, &r);
1003         if (WARN_ON_ONCE(error))
1004                 goto out;
1005 
1006         r->res_hash = hash;
1007         r->res_dir_nodeid = dir_nodeid;
1008         r->res_master_nodeid = dir_nodeid;
1009         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1010         kref_init(&r->res_ref);
1011 
1012         write_lock_bh(&ls->ls_rsbtbl_lock);
1013         error = rsb_insert(r, &ls->ls_rsbtbl);
1014         if (error == -EEXIST) {
1015                 /* somebody else was faster and it seems the
1016                  * rsb exists now, we do a whole relookup
1017                  */
1018                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1019                 dlm_free_rsb(r);
1020                 goto retry;
1021         } else if (!error) {
1022                 list_add(&r->res_slow_list, &ls->ls_slow_active);
1023         }
1024         write_unlock_bh(&ls->ls_rsbtbl_lock);
1025 
1026  out:
1027         *r_ret = r;
1028         return error;
1029 }
1030 
1031 /*
1032  * rsb rcu usage
1033  *
1034  * While rcu read lock is held, the rsb cannot be freed,
1035  * which allows a lookup optimization.
1036  *
1037  * Two threads are accessing the same rsb concurrently,
1038  * the first (A) is trying to use the rsb, the second (B)
1039  * is trying to free the rsb.
1040  *
1041  * thread A                 thread B
1042  * (trying to use rsb)      (trying to free rsb)
1043  *
1044  * A1. rcu read lock
1045  * A2. rsbtbl read lock
1046  * A3. look up rsb in rsbtbl
1047  * A4. rsbtbl read unlock
1048  *                          B1. rsbtbl write lock
1049  *                          B2. look up rsb in rsbtbl
1050  *                          B3. remove rsb from rsbtbl
1051  *                          B4. clear rsb HASHED flag
1052  *                          B5. rsbtbl write unlock
1053  *                          B6. begin freeing rsb using rcu...
1054  *
1055  * (rsb is inactive, so try to make it active again)
1056  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1057  * A6. the rsb HASHED flag is not set, which it means the rsb
1058  *     is being removed from rsbtbl and freed, so don't use it.
1059  * A7. rcu read unlock
1060  *
1061  *                          B7. ...finish freeing rsb using rcu
1062  * A8. create a new rsb
1063  *
1064  * Without the rcu optimization, steps A5-8 would need to do
1065  * an extra rsbtbl lookup:
1066  * A5. rsbtbl write lock
1067  * A6. look up rsb in rsbtbl, not found
1068  * A7. rsbtbl write unlock
1069  * A8. create a new rsb
1070  */
1071 
1072 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1073                     int from_nodeid, unsigned int flags,
1074                     struct dlm_rsb **r_ret)
1075 {
1076         int dir_nodeid;
1077         uint32_t hash;
1078         int rv;
1079 
1080         if (len > DLM_RESNAME_MAXLEN)
1081                 return -EINVAL;
1082 
1083         hash = jhash(name, len, 0);
1084         dir_nodeid = dlm_hash2nodeid(ls, hash);
1085 
1086         rcu_read_lock();
1087         if (dlm_no_directory(ls))
1088                 rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1089                                       from_nodeid, flags, r_ret);
1090         else
1091                 rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1092                                     from_nodeid, flags, r_ret);
1093         rcu_read_unlock();
1094         return rv;
1095 }
1096 
1097 /* we have received a request and found that res_master_nodeid != our_nodeid,
1098    so we need to return an error or make ourself the master */
1099 
1100 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1101                                   int from_nodeid)
1102 {
1103         if (dlm_no_directory(ls)) {
1104                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1105                           from_nodeid, r->res_master_nodeid,
1106                           r->res_dir_nodeid);
1107                 dlm_print_rsb(r);
1108                 return -ENOTBLK;
1109         }
1110 
1111         if (from_nodeid != r->res_dir_nodeid) {
1112                 /* our rsb is not master, and another node (not the dir node)
1113                    has sent us a request.  this is much more common when our
1114                    master_nodeid is zero, so limit debug to non-zero.  */
1115 
1116                 if (r->res_master_nodeid) {
1117                         log_debug(ls, "validate master from_other %d master %d "
1118                                   "dir %d first %x %s", from_nodeid,
1119                                   r->res_master_nodeid, r->res_dir_nodeid,
1120                                   r->res_first_lkid, r->res_name);
1121                 }
1122                 return -ENOTBLK;
1123         } else {
1124                 /* our rsb is not master, but the dir nodeid has sent us a
1125                    request; this could happen with master 0 / res_nodeid -1 */
1126 
1127                 if (r->res_master_nodeid) {
1128                         log_error(ls, "validate master from_dir %d master %d "
1129                                   "first %x %s",
1130                                   from_nodeid, r->res_master_nodeid,
1131                                   r->res_first_lkid, r->res_name);
1132                 }
1133 
1134                 r->res_master_nodeid = dlm_our_nodeid();
1135                 r->res_nodeid = 0;
1136                 return 0;
1137         }
1138 }
1139 
1140 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1141                                 int from_nodeid, bool is_inactive, unsigned int flags,
1142                                 int *r_nodeid, int *result)
1143 {
1144         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1145         int from_master = (flags & DLM_LU_RECOVER_DIR);
1146 
1147         if (r->res_dir_nodeid != our_nodeid) {
1148                 /* should not happen, but may as well fix it and carry on */
1149                 log_error(ls, "%s res_dir %d our %d %s", __func__,
1150                           r->res_dir_nodeid, our_nodeid, r->res_name);
1151                 r->res_dir_nodeid = our_nodeid;
1152         }
1153 
1154         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
1155                 /* Recovery uses this function to set a new master when
1156                  * the previous master failed.  Setting NEW_MASTER will
1157                  * force dlm_recover_masters to call recover_master on this
1158                  * rsb even though the res_nodeid is no longer removed.
1159                  */
1160 
1161                 r->res_master_nodeid = from_nodeid;
1162                 r->res_nodeid = from_nodeid;
1163                 rsb_set_flag(r, RSB_NEW_MASTER);
1164 
1165                 if (is_inactive) {
1166                         /* I don't think we should ever find it inactive. */
1167                         log_error(ls, "%s fix_master inactive", __func__);
1168                         dlm_dump_rsb(r);
1169                 }
1170         }
1171 
1172         if (from_master && (r->res_master_nodeid != from_nodeid)) {
1173                 /* this will happen if from_nodeid became master during
1174                  * a previous recovery cycle, and we aborted the previous
1175                  * cycle before recovering this master value
1176                  */
1177 
1178                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1179                           __func__, from_nodeid, r->res_master_nodeid,
1180                           r->res_nodeid, r->res_first_lkid, r->res_name);
1181 
1182                 if (r->res_master_nodeid == our_nodeid) {
1183                         log_error(ls, "from_master %d our_master", from_nodeid);
1184                         dlm_dump_rsb(r);
1185                         goto ret_assign;
1186                 }
1187 
1188                 r->res_master_nodeid = from_nodeid;
1189                 r->res_nodeid = from_nodeid;
1190                 rsb_set_flag(r, RSB_NEW_MASTER);
1191         }
1192 
1193         if (!r->res_master_nodeid) {
1194                 /* this will happen if recovery happens while we're looking
1195                  * up the master for this rsb
1196                  */
1197 
1198                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1199                           from_nodeid, r->res_first_lkid, r->res_name);
1200                 r->res_master_nodeid = from_nodeid;
1201                 r->res_nodeid = from_nodeid;
1202         }
1203 
1204         if (!from_master && !fix_master &&
1205             (r->res_master_nodeid == from_nodeid)) {
1206                 /* this can happen when the master sends remove, the dir node
1207                  * finds the rsb on the active list and ignores the remove,
1208                  * and the former master sends a lookup
1209                  */
1210 
1211                 log_limit(ls, "%s from master %d flags %x first %x %s",
1212                           __func__, from_nodeid, flags, r->res_first_lkid,
1213                           r->res_name);
1214         }
1215 
1216  ret_assign:
1217         *r_nodeid = r->res_master_nodeid;
1218         if (result)
1219                 *result = DLM_LU_MATCH;
1220 }
1221 
1222 /*
1223  * We're the dir node for this res and another node wants to know the
1224  * master nodeid.  During normal operation (non recovery) this is only
1225  * called from receive_lookup(); master lookups when the local node is
1226  * the dir node are done by find_rsb().
1227  *
1228  * normal operation, we are the dir node for a resource
1229  * . _request_lock
1230  * . set_master
1231  * . send_lookup
1232  * . receive_lookup
1233  * . dlm_master_lookup flags 0
1234  *
1235  * recover directory, we are rebuilding dir for all resources
1236  * . dlm_recover_directory
1237  * . dlm_rcom_names
1238  *   remote node sends back the rsb names it is master of and we are dir of
1239  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1240  *   we either create new rsb setting remote node as master, or find existing
1241  *   rsb and set master to be the remote node.
1242  *
1243  * recover masters, we are finding the new master for resources
1244  * . dlm_recover_masters
1245  * . recover_master
1246  * . dlm_send_rcom_lookup
1247  * . receive_rcom_lookup
1248  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1249  */
1250 
1251 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1252                               int len, unsigned int flags, int *r_nodeid, int *result)
1253 {
1254         struct dlm_rsb *r = NULL;
1255         uint32_t hash;
1256         int our_nodeid = dlm_our_nodeid();
1257         int dir_nodeid, error;
1258 
1259         if (len > DLM_RESNAME_MAXLEN)
1260                 return -EINVAL;
1261 
1262         if (from_nodeid == our_nodeid) {
1263                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1264                           our_nodeid, flags);
1265                 return -EINVAL;
1266         }
1267 
1268         hash = jhash(name, len, 0);
1269         dir_nodeid = dlm_hash2nodeid(ls, hash);
1270         if (dir_nodeid != our_nodeid) {
1271                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1272                           from_nodeid, dir_nodeid, our_nodeid, hash,
1273                           ls->ls_num_nodes);
1274                 *r_nodeid = -1;
1275                 return -EINVAL;
1276         }
1277 
1278  retry:
1279 
1280         /* check if the rsb is active under read lock - likely path */
1281         read_lock_bh(&ls->ls_rsbtbl_lock);
1282         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1283         if (!error) {
1284                 if (rsb_flag(r, RSB_INACTIVE)) {
1285                         read_unlock_bh(&ls->ls_rsbtbl_lock);
1286                         goto do_inactive;
1287                 }
1288 
1289                 /* because the rsb is active, we need to lock_rsb before
1290                  * checking/changing re_master_nodeid
1291                  */
1292 
1293                 hold_rsb(r);
1294                 read_unlock_bh(&ls->ls_rsbtbl_lock);
1295                 lock_rsb(r);
1296 
1297                 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1298                                     flags, r_nodeid, result);
1299 
1300                 /* the rsb was active */
1301                 unlock_rsb(r);
1302                 put_rsb(r);
1303 
1304                 return 0;
1305         } else {
1306                 read_unlock_bh(&ls->ls_rsbtbl_lock);
1307                 goto not_found;
1308         }
1309 
1310  do_inactive:
1311         /* unlikely path - relookup under write */
1312         write_lock_bh(&ls->ls_rsbtbl_lock);
1313 
1314         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1315         if (!error) {
1316                 if (!rsb_flag(r, RSB_INACTIVE)) {
1317                         write_unlock_bh(&ls->ls_rsbtbl_lock);
1318                         /* something as changed, very unlikely but
1319                          * try again
1320                          */
1321                         goto retry;
1322                 }
1323         } else {
1324                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1325                 goto not_found;
1326         }
1327 
1328         /* because the rsb is inactive, it's not refcounted and lock_rsb
1329            is not used, but is protected by the rsbtbl lock */
1330 
1331         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1332                             r_nodeid, result);
1333 
1334         /* A dir record rsb should never be on scan list. */
1335         /* Try to fix this with del_scan? */
1336         WARN_ON(!list_empty(&r->res_scan_list));
1337 
1338         write_unlock_bh(&ls->ls_rsbtbl_lock);
1339 
1340         return 0;
1341 
1342  not_found:
1343         error = get_rsb_struct(ls, name, len, &r);
1344         if (WARN_ON_ONCE(error))
1345                 goto out;
1346 
1347         r->res_hash = hash;
1348         r->res_dir_nodeid = our_nodeid;
1349         r->res_master_nodeid = from_nodeid;
1350         r->res_nodeid = from_nodeid;
1351         rsb_set_flag(r, RSB_INACTIVE);
1352 
1353         write_lock_bh(&ls->ls_rsbtbl_lock);
1354         error = rsb_insert(r, &ls->ls_rsbtbl);
1355         if (error == -EEXIST) {
1356                 /* somebody else was faster and it seems the
1357                  * rsb exists now, we do a whole relookup
1358                  */
1359                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1360                 dlm_free_rsb(r);
1361                 goto retry;
1362         } else if (error) {
1363                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1364                 /* should never happen */
1365                 dlm_free_rsb(r);
1366                 goto retry;
1367         }
1368 
1369         list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1370         write_unlock_bh(&ls->ls_rsbtbl_lock);
1371 
1372         if (result)
1373                 *result = DLM_LU_ADD;
1374         *r_nodeid = from_nodeid;
1375  out:
1376         return error;
1377 }
1378 
1379 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1380                       int len, unsigned int flags, int *r_nodeid, int *result)
1381 {
1382         int rv;
1383         rcu_read_lock();
1384         rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1385         rcu_read_unlock();
1386         return rv;
1387 }
1388 
1389 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1390 {
1391         struct dlm_rsb *r;
1392 
1393         read_lock_bh(&ls->ls_rsbtbl_lock);
1394         list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1395                 if (r->res_hash == hash)
1396                         dlm_dump_rsb(r);
1397         }
1398         read_unlock_bh(&ls->ls_rsbtbl_lock);
1399 }
1400 
1401 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1402 {
1403         struct dlm_rsb *r = NULL;
1404         int error;
1405 
1406         read_lock_bh(&ls->ls_rsbtbl_lock);
1407         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1408         if (!error)
1409                 goto out;
1410 
1411         dlm_dump_rsb(r);
1412  out:
1413         read_unlock_bh(&ls->ls_rsbtbl_lock);
1414 }
1415 
1416 static void deactivate_rsb(struct kref *kref)
1417 {
1418         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1419         struct dlm_ls *ls = r->res_ls;
1420         int our_nodeid = dlm_our_nodeid();
1421 
1422         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1423         rsb_set_flag(r, RSB_INACTIVE);
1424         list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1425 
1426         /*
1427          * When the rsb becomes unused:
1428          * - If it's not a dir record for a remote master rsb,
1429          *   then it is put on the scan list to be freed.
1430          * - If it's a dir record for a remote master rsb,
1431          *   then it is kept in the inactive state until
1432          *   receive_remove() from the master node.
1433          */
1434         if (!dlm_no_directory(ls) &&
1435             (r->res_master_nodeid != our_nodeid) &&
1436             (dlm_dir_nodeid(r) != our_nodeid))
1437                 add_scan(ls, r);
1438 
1439         if (r->res_lvbptr) {
1440                 dlm_free_lvb(r->res_lvbptr);
1441                 r->res_lvbptr = NULL;
1442         }
1443 }
1444 
1445 /* See comment for unhold_lkb */
1446 
1447 static void unhold_rsb(struct dlm_rsb *r)
1448 {
1449         int rv;
1450 
1451         /* inactive rsbs are not ref counted */
1452         WARN_ON(rsb_flag(r, RSB_INACTIVE));
1453         rv = kref_put(&r->res_ref, deactivate_rsb);
1454         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1455 }
1456 
1457 void free_inactive_rsb(struct dlm_rsb *r)
1458 {
1459         WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1460 
1461         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1462         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1463         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1464         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1465         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1466         DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1467         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1468         DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1469 
1470         dlm_free_rsb(r);
1471 }
1472 
1473 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1474    The rsb must exist as long as any lkb's for it do. */
1475 
1476 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1477 {
1478         hold_rsb(r);
1479         lkb->lkb_resource = r;
1480 }
1481 
1482 static void detach_lkb(struct dlm_lkb *lkb)
1483 {
1484         if (lkb->lkb_resource) {
1485                 put_rsb(lkb->lkb_resource);
1486                 lkb->lkb_resource = NULL;
1487         }
1488 }
1489 
1490 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1491                        unsigned long start, unsigned long end)
1492 {
1493         struct xa_limit limit;
1494         struct dlm_lkb *lkb;
1495         int rv;
1496 
1497         limit.max = end;
1498         limit.min = start;
1499 
1500         lkb = dlm_allocate_lkb(ls);
1501         if (!lkb)
1502                 return -ENOMEM;
1503 
1504         lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1505         lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1506         lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1507         lkb->lkb_nodeid = -1;
1508         lkb->lkb_grmode = DLM_LOCK_IV;
1509         kref_init(&lkb->lkb_ref);
1510         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1511         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1512 
1513         write_lock_bh(&ls->ls_lkbxa_lock);
1514         rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1515         write_unlock_bh(&ls->ls_lkbxa_lock);
1516 
1517         if (rv < 0) {
1518                 log_error(ls, "create_lkb xa error %d", rv);
1519                 dlm_free_lkb(lkb);
1520                 return rv;
1521         }
1522 
1523         *lkb_ret = lkb;
1524         return 0;
1525 }
1526 
1527 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1528 {
1529         return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1530 }
1531 
1532 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1533 {
1534         struct dlm_lkb *lkb;
1535 
1536         read_lock_bh(&ls->ls_lkbxa_lock);
1537         lkb = xa_load(&ls->ls_lkbxa, lkid);
1538         if (lkb)
1539                 kref_get(&lkb->lkb_ref);
1540         read_unlock_bh(&ls->ls_lkbxa_lock);
1541 
1542         *lkb_ret = lkb;
1543         return lkb ? 0 : -ENOENT;
1544 }
1545 
1546 static void kill_lkb(struct kref *kref)
1547 {
1548         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1549 
1550         /* All work is done after the return from kref_put() so we
1551            can release the write_lock before the detach_lkb */
1552 
1553         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1554 }
1555 
1556 /* __put_lkb() is used when an lkb may not have an rsb attached to
1557    it so we need to provide the lockspace explicitly */
1558 
1559 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1560 {
1561         uint32_t lkid = lkb->lkb_id;
1562         int rv;
1563 
1564         rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1565                                         &ls->ls_lkbxa_lock);
1566         if (rv) {
1567                 xa_erase(&ls->ls_lkbxa, lkid);
1568                 write_unlock_bh(&ls->ls_lkbxa_lock);
1569 
1570                 detach_lkb(lkb);
1571 
1572                 /* for local/process lkbs, lvbptr points to caller's lksb */
1573                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1574                         dlm_free_lvb(lkb->lkb_lvbptr);
1575                 dlm_free_lkb(lkb);
1576         }
1577 
1578         return rv;
1579 }
1580 
1581 int dlm_put_lkb(struct dlm_lkb *lkb)
1582 {
1583         struct dlm_ls *ls;
1584 
1585         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1586         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1587 
1588         ls = lkb->lkb_resource->res_ls;
1589         return __put_lkb(ls, lkb);
1590 }
1591 
1592 /* This is only called to add a reference when the code already holds
1593    a valid reference to the lkb, so there's no need for locking. */
1594 
1595 static inline void hold_lkb(struct dlm_lkb *lkb)
1596 {
1597         kref_get(&lkb->lkb_ref);
1598 }
1599 
1600 static void unhold_lkb_assert(struct kref *kref)
1601 {
1602         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1603 
1604         DLM_ASSERT(false, dlm_print_lkb(lkb););
1605 }
1606 
1607 /* This is called when we need to remove a reference and are certain
1608    it's not the last ref.  e.g. del_lkb is always called between a
1609    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1610    put_lkb would work fine, but would involve unnecessary locking */
1611 
1612 static inline void unhold_lkb(struct dlm_lkb *lkb)
1613 {
1614         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1615 }
1616 
1617 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1618                             int mode)
1619 {
1620         struct dlm_lkb *lkb = NULL, *iter;
1621 
1622         list_for_each_entry(iter, head, lkb_statequeue)
1623                 if (iter->lkb_rqmode < mode) {
1624                         lkb = iter;
1625                         list_add_tail(new, &iter->lkb_statequeue);
1626                         break;
1627                 }
1628 
1629         if (!lkb)
1630                 list_add_tail(new, head);
1631 }
1632 
1633 /* add/remove lkb to rsb's grant/convert/wait queue */
1634 
1635 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1636 {
1637         kref_get(&lkb->lkb_ref);
1638 
1639         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1640 
1641         lkb->lkb_timestamp = ktime_get();
1642 
1643         lkb->lkb_status = status;
1644 
1645         switch (status) {
1646         case DLM_LKSTS_WAITING:
1647                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1648                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1649                 else
1650                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1651                 break;
1652         case DLM_LKSTS_GRANTED:
1653                 /* convention says granted locks kept in order of grmode */
1654                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1655                                 lkb->lkb_grmode);
1656                 break;
1657         case DLM_LKSTS_CONVERT:
1658                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1659                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1660                 else
1661                         list_add_tail(&lkb->lkb_statequeue,
1662                                       &r->res_convertqueue);
1663                 break;
1664         default:
1665                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1666         }
1667 }
1668 
1669 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1670 {
1671         lkb->lkb_status = 0;
1672         list_del(&lkb->lkb_statequeue);
1673         unhold_lkb(lkb);
1674 }
1675 
1676 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1677 {
1678         hold_lkb(lkb);
1679         del_lkb(r, lkb);
1680         add_lkb(r, lkb, sts);
1681         unhold_lkb(lkb);
1682 }
1683 
1684 static int msg_reply_type(int mstype)
1685 {
1686         switch (mstype) {
1687         case DLM_MSG_REQUEST:
1688                 return DLM_MSG_REQUEST_REPLY;
1689         case DLM_MSG_CONVERT:
1690                 return DLM_MSG_CONVERT_REPLY;
1691         case DLM_MSG_UNLOCK:
1692                 return DLM_MSG_UNLOCK_REPLY;
1693         case DLM_MSG_CANCEL:
1694                 return DLM_MSG_CANCEL_REPLY;
1695         case DLM_MSG_LOOKUP:
1696                 return DLM_MSG_LOOKUP_REPLY;
1697         }
1698         return -1;
1699 }
1700 
1701 /* add/remove lkb from global waiters list of lkb's waiting for
1702    a reply from a remote node */
1703 
1704 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1705 {
1706         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1707         int error = 0;
1708 
1709         spin_lock_bh(&ls->ls_waiters_lock);
1710 
1711         if (is_overlap_unlock(lkb) ||
1712             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1713                 error = -EINVAL;
1714                 goto out;
1715         }
1716 
1717         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1718                 switch (mstype) {
1719                 case DLM_MSG_UNLOCK:
1720                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1721                         break;
1722                 case DLM_MSG_CANCEL:
1723                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1724                         break;
1725                 default:
1726                         error = -EBUSY;
1727                         goto out;
1728                 }
1729                 lkb->lkb_wait_count++;
1730                 hold_lkb(lkb);
1731 
1732                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1733                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1734                           lkb->lkb_wait_count, dlm_iflags_val(lkb));
1735                 goto out;
1736         }
1737 
1738         DLM_ASSERT(!lkb->lkb_wait_count,
1739                    dlm_print_lkb(lkb);
1740                    printk("wait_count %d\n", lkb->lkb_wait_count););
1741 
1742         lkb->lkb_wait_count++;
1743         lkb->lkb_wait_type = mstype;
1744         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1745         hold_lkb(lkb);
1746         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1747  out:
1748         if (error)
1749                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1750                           lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1751                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1752         spin_unlock_bh(&ls->ls_waiters_lock);
1753         return error;
1754 }
1755 
1756 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1757    list as part of process_requestqueue (e.g. a lookup that has an optimized
1758    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1759    set RESEND and dlm_recover_waiters_post() */
1760 
1761 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1762                                 const struct dlm_message *ms)
1763 {
1764         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1765         int overlap_done = 0;
1766 
1767         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1768             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1769                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1770                 overlap_done = 1;
1771                 goto out_del;
1772         }
1773 
1774         if (mstype == DLM_MSG_CANCEL_REPLY &&
1775             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1776                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1777                 overlap_done = 1;
1778                 goto out_del;
1779         }
1780 
1781         /* Cancel state was preemptively cleared by a successful convert,
1782            see next comment, nothing to do. */
1783 
1784         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1785             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1786                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1787                           lkb->lkb_id, lkb->lkb_wait_type);
1788                 return -1;
1789         }
1790 
1791         /* Remove for the convert reply, and premptively remove for the
1792            cancel reply.  A convert has been granted while there's still
1793            an outstanding cancel on it (the cancel is moot and the result
1794            in the cancel reply should be 0).  We preempt the cancel reply
1795            because the app gets the convert result and then can follow up
1796            with another op, like convert.  This subsequent op would see the
1797            lingering state of the cancel and fail with -EBUSY. */
1798 
1799         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1800             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1801             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1802                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1803                           lkb->lkb_id);
1804                 lkb->lkb_wait_type = 0;
1805                 lkb->lkb_wait_count--;
1806                 unhold_lkb(lkb);
1807                 goto out_del;
1808         }
1809 
1810         /* N.B. type of reply may not always correspond to type of original
1811            msg due to lookup->request optimization, verify others? */
1812 
1813         if (lkb->lkb_wait_type) {
1814                 lkb->lkb_wait_type = 0;
1815                 goto out_del;
1816         }
1817 
1818         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1819                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1820                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1821         return -1;
1822 
1823  out_del:
1824         /* the force-unlock/cancel has completed and we haven't recvd a reply
1825            to the op that was in progress prior to the unlock/cancel; we
1826            give up on any reply to the earlier op.  FIXME: not sure when/how
1827            this would happen */
1828 
1829         if (overlap_done && lkb->lkb_wait_type) {
1830                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1831                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1832                 lkb->lkb_wait_count--;
1833                 unhold_lkb(lkb);
1834                 lkb->lkb_wait_type = 0;
1835         }
1836 
1837         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1838 
1839         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1840         lkb->lkb_wait_count--;
1841         if (!lkb->lkb_wait_count)
1842                 list_del_init(&lkb->lkb_wait_reply);
1843         unhold_lkb(lkb);
1844         return 0;
1845 }
1846 
1847 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1848 {
1849         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1850         int error;
1851 
1852         spin_lock_bh(&ls->ls_waiters_lock);
1853         error = _remove_from_waiters(lkb, mstype, NULL);
1854         spin_unlock_bh(&ls->ls_waiters_lock);
1855         return error;
1856 }
1857 
1858 /* Handles situations where we might be processing a "fake" or "local" reply in
1859  * the recovery context which stops any locking activity. Only debugfs might
1860  * change the lockspace waiters but they will held the recovery lock to ensure
1861  * remove_from_waiters_ms() in local case will be the only user manipulating the
1862  * lockspace waiters in recovery context.
1863  */
1864 
1865 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1866                                   const struct dlm_message *ms, bool local)
1867 {
1868         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1869         int error;
1870 
1871         if (!local)
1872                 spin_lock_bh(&ls->ls_waiters_lock);
1873         else
1874                 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1875                              !dlm_locking_stopped(ls));
1876         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1877         if (!local)
1878                 spin_unlock_bh(&ls->ls_waiters_lock);
1879         return error;
1880 }
1881 
1882 /* lkb is master or local copy */
1883 
1884 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885 {
1886         int b, len = r->res_ls->ls_lvblen;
1887 
1888         /* b=1 lvb returned to caller
1889            b=0 lvb written to rsb or invalidated
1890            b=-1 do nothing */
1891 
1892         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1893 
1894         if (b == 1) {
1895                 if (!lkb->lkb_lvbptr)
1896                         return;
1897 
1898                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1899                         return;
1900 
1901                 if (!r->res_lvbptr)
1902                         return;
1903 
1904                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1905                 lkb->lkb_lvbseq = r->res_lvbseq;
1906 
1907         } else if (b == 0) {
1908                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1909                         rsb_set_flag(r, RSB_VALNOTVALID);
1910                         return;
1911                 }
1912 
1913                 if (!lkb->lkb_lvbptr)
1914                         return;
1915 
1916                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1917                         return;
1918 
1919                 if (!r->res_lvbptr)
1920                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1921 
1922                 if (!r->res_lvbptr)
1923                         return;
1924 
1925                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1926                 r->res_lvbseq++;
1927                 lkb->lkb_lvbseq = r->res_lvbseq;
1928                 rsb_clear_flag(r, RSB_VALNOTVALID);
1929         }
1930 
1931         if (rsb_flag(r, RSB_VALNOTVALID))
1932                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1933 }
1934 
1935 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1936 {
1937         if (lkb->lkb_grmode < DLM_LOCK_PW)
1938                 return;
1939 
1940         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1941                 rsb_set_flag(r, RSB_VALNOTVALID);
1942                 return;
1943         }
1944 
1945         if (!lkb->lkb_lvbptr)
1946                 return;
1947 
1948         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1949                 return;
1950 
1951         if (!r->res_lvbptr)
1952                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1953 
1954         if (!r->res_lvbptr)
1955                 return;
1956 
1957         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1958         r->res_lvbseq++;
1959         rsb_clear_flag(r, RSB_VALNOTVALID);
1960 }
1961 
1962 /* lkb is process copy (pc) */
1963 
1964 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1965                             const struct dlm_message *ms)
1966 {
1967         int b;
1968 
1969         if (!lkb->lkb_lvbptr)
1970                 return;
1971 
1972         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1973                 return;
1974 
1975         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1976         if (b == 1) {
1977                 int len = receive_extralen(ms);
1978                 if (len > r->res_ls->ls_lvblen)
1979                         len = r->res_ls->ls_lvblen;
1980                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1981                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1982         }
1983 }
1984 
1985 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1986    remove_lock -- used for unlock, removes lkb from granted
1987    revert_lock -- used for cancel, moves lkb from convert to granted
1988    grant_lock  -- used for request and convert, adds lkb to granted or
1989                   moves lkb from convert or waiting to granted
1990 
1991    Each of these is used for master or local copy lkb's.  There is
1992    also a _pc() variation used to make the corresponding change on
1993    a process copy (pc) lkb. */
1994 
1995 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1996 {
1997         del_lkb(r, lkb);
1998         lkb->lkb_grmode = DLM_LOCK_IV;
1999         /* this unhold undoes the original ref from create_lkb()
2000            so this leads to the lkb being freed */
2001         unhold_lkb(lkb);
2002 }
2003 
2004 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2005 {
2006         set_lvb_unlock(r, lkb);
2007         _remove_lock(r, lkb);
2008 }
2009 
2010 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2011 {
2012         _remove_lock(r, lkb);
2013 }
2014 
2015 /* returns: 0 did nothing
2016             1 moved lock to granted
2017            -1 removed lock */
2018 
2019 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020 {
2021         int rv = 0;
2022 
2023         lkb->lkb_rqmode = DLM_LOCK_IV;
2024 
2025         switch (lkb->lkb_status) {
2026         case DLM_LKSTS_GRANTED:
2027                 break;
2028         case DLM_LKSTS_CONVERT:
2029                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2030                 rv = 1;
2031                 break;
2032         case DLM_LKSTS_WAITING:
2033                 del_lkb(r, lkb);
2034                 lkb->lkb_grmode = DLM_LOCK_IV;
2035                 /* this unhold undoes the original ref from create_lkb()
2036                    so this leads to the lkb being freed */
2037                 unhold_lkb(lkb);
2038                 rv = -1;
2039                 break;
2040         default:
2041                 log_print("invalid status for revert %d", lkb->lkb_status);
2042         }
2043         return rv;
2044 }
2045 
2046 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2047 {
2048         return revert_lock(r, lkb);
2049 }
2050 
2051 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2052 {
2053         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2054                 lkb->lkb_grmode = lkb->lkb_rqmode;
2055                 if (lkb->lkb_status)
2056                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2057                 else
2058                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2059         }
2060 
2061         lkb->lkb_rqmode = DLM_LOCK_IV;
2062         lkb->lkb_highbast = 0;
2063 }
2064 
2065 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066 {
2067         set_lvb_lock(r, lkb);
2068         _grant_lock(r, lkb);
2069 }
2070 
2071 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2072                           const struct dlm_message *ms)
2073 {
2074         set_lvb_lock_pc(r, lkb, ms);
2075         _grant_lock(r, lkb);
2076 }
2077 
2078 /* called by grant_pending_locks() which means an async grant message must
2079    be sent to the requesting node in addition to granting the lock if the
2080    lkb belongs to a remote node. */
2081 
2082 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2083 {
2084         grant_lock(r, lkb);
2085         if (is_master_copy(lkb))
2086                 send_grant(r, lkb);
2087         else
2088                 queue_cast(r, lkb, 0);
2089 }
2090 
2091 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2092    change the granted/requested modes.  We're munging things accordingly in
2093    the process copy.
2094    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2095    conversion deadlock
2096    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2097    compatible with other granted locks */
2098 
2099 static void munge_demoted(struct dlm_lkb *lkb)
2100 {
2101         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2102                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2103                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2104                 return;
2105         }
2106 
2107         lkb->lkb_grmode = DLM_LOCK_NL;
2108 }
2109 
2110 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2111 {
2112         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2113             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2114                 log_print("munge_altmode %x invalid reply type %d",
2115                           lkb->lkb_id, le32_to_cpu(ms->m_type));
2116                 return;
2117         }
2118 
2119         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2120                 lkb->lkb_rqmode = DLM_LOCK_PR;
2121         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2122                 lkb->lkb_rqmode = DLM_LOCK_CW;
2123         else {
2124                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2125                 dlm_print_lkb(lkb);
2126         }
2127 }
2128 
2129 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2130 {
2131         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2132                                            lkb_statequeue);
2133         if (lkb->lkb_id == first->lkb_id)
2134                 return 1;
2135 
2136         return 0;
2137 }
2138 
2139 /* Check if the given lkb conflicts with another lkb on the queue. */
2140 
2141 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2142 {
2143         struct dlm_lkb *this;
2144 
2145         list_for_each_entry(this, head, lkb_statequeue) {
2146                 if (this == lkb)
2147                         continue;
2148                 if (!modes_compat(this, lkb))
2149                         return 1;
2150         }
2151         return 0;
2152 }
2153 
2154 /*
2155  * "A conversion deadlock arises with a pair of lock requests in the converting
2156  * queue for one resource.  The granted mode of each lock blocks the requested
2157  * mode of the other lock."
2158  *
2159  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2160  * convert queue from being granted, then deadlk/demote lkb.
2161  *
2162  * Example:
2163  * Granted Queue: empty
2164  * Convert Queue: NL->EX (first lock)
2165  *                PR->EX (second lock)
2166  *
2167  * The first lock can't be granted because of the granted mode of the second
2168  * lock and the second lock can't be granted because it's not first in the
2169  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2170  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2171  * flag set and return DEMOTED in the lksb flags.
2172  *
2173  * Originally, this function detected conv-deadlk in a more limited scope:
2174  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2175  * - if lkb1 was the first entry in the queue (not just earlier), and was
2176  *   blocked by the granted mode of lkb2, and there was nothing on the
2177  *   granted queue preventing lkb1 from being granted immediately, i.e.
2178  *   lkb2 was the only thing preventing lkb1 from being granted.
2179  *
2180  * That second condition meant we'd only say there was conv-deadlk if
2181  * resolving it (by demotion) would lead to the first lock on the convert
2182  * queue being granted right away.  It allowed conversion deadlocks to exist
2183  * between locks on the convert queue while they couldn't be granted anyway.
2184  *
2185  * Now, we detect and take action on conversion deadlocks immediately when
2186  * they're created, even if they may not be immediately consequential.  If
2187  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2188  * mode that would prevent lkb1's conversion from being granted, we do a
2189  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2190  * I think this means that the lkb_is_ahead condition below should always
2191  * be zero, i.e. there will never be conv-deadlk between two locks that are
2192  * both already on the convert queue.
2193  */
2194 
2195 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2196 {
2197         struct dlm_lkb *lkb1;
2198         int lkb_is_ahead = 0;
2199 
2200         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2201                 if (lkb1 == lkb2) {
2202                         lkb_is_ahead = 1;
2203                         continue;
2204                 }
2205 
2206                 if (!lkb_is_ahead) {
2207                         if (!modes_compat(lkb2, lkb1))
2208                                 return 1;
2209                 } else {
2210                         if (!modes_compat(lkb2, lkb1) &&
2211                             !modes_compat(lkb1, lkb2))
2212                                 return 1;
2213                 }
2214         }
2215         return 0;
2216 }
2217 
2218 /*
2219  * Return 1 if the lock can be granted, 0 otherwise.
2220  * Also detect and resolve conversion deadlocks.
2221  *
2222  * lkb is the lock to be granted
2223  *
2224  * now is 1 if the function is being called in the context of the
2225  * immediate request, it is 0 if called later, after the lock has been
2226  * queued.
2227  *
2228  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2229  * after recovery.
2230  *
2231  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2232  */
2233 
2234 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2235                            int recover)
2236 {
2237         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2238 
2239         /*
2240          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2241          * a new request for a NL mode lock being blocked.
2242          *
2243          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2244          * request, then it would be granted.  In essence, the use of this flag
2245          * tells the Lock Manager to expedite theis request by not considering
2246          * what may be in the CONVERTING or WAITING queues...  As of this
2247          * writing, the EXPEDITE flag can be used only with new requests for NL
2248          * mode locks.  This flag is not valid for conversion requests.
2249          *
2250          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2251          * conversion or used with a non-NL requested mode.  We also know an
2252          * EXPEDITE request is always granted immediately, so now must always
2253          * be 1.  The full condition to grant an expedite request: (now &&
2254          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2255          * therefore be shortened to just checking the flag.
2256          */
2257 
2258         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2259                 return 1;
2260 
2261         /*
2262          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2263          * added to the remaining conditions.
2264          */
2265 
2266         if (queue_conflict(&r->res_grantqueue, lkb))
2267                 return 0;
2268 
2269         /*
2270          * 6-3: By default, a conversion request is immediately granted if the
2271          * requested mode is compatible with the modes of all other granted
2272          * locks
2273          */
2274 
2275         if (queue_conflict(&r->res_convertqueue, lkb))
2276                 return 0;
2277 
2278         /*
2279          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2280          * locks for a recovered rsb, on which lkb's have been rebuilt.
2281          * The lkb's may have been rebuilt on the queues in a different
2282          * order than they were in on the previous master.  So, granting
2283          * queued conversions in order after recovery doesn't make sense
2284          * since the order hasn't been preserved anyway.  The new order
2285          * could also have created a new "in place" conversion deadlock.
2286          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2287          * After recovery, there would be no granted locks, and possibly
2288          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2289          * recovery, grant conversions without considering order.
2290          */
2291 
2292         if (conv && recover)
2293                 return 1;
2294 
2295         /*
2296          * 6-5: But the default algorithm for deciding whether to grant or
2297          * queue conversion requests does not by itself guarantee that such
2298          * requests are serviced on a "first come first serve" basis.  This, in
2299          * turn, can lead to a phenomenon known as "indefinate postponement".
2300          *
2301          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2302          * the system service employed to request a lock conversion.  This flag
2303          * forces certain conversion requests to be queued, even if they are
2304          * compatible with the granted modes of other locks on the same
2305          * resource.  Thus, the use of this flag results in conversion requests
2306          * being ordered on a "first come first servce" basis.
2307          *
2308          * DCT: This condition is all about new conversions being able to occur
2309          * "in place" while the lock remains on the granted queue (assuming
2310          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2311          * doesn't _have_ to go onto the convert queue where it's processed in
2312          * order.  The "now" variable is necessary to distinguish converts
2313          * being received and processed for the first time now, because once a
2314          * convert is moved to the conversion queue the condition below applies
2315          * requiring fifo granting.
2316          */
2317 
2318         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2319                 return 1;
2320 
2321         /*
2322          * Even if the convert is compat with all granted locks,
2323          * QUECVT forces it behind other locks on the convert queue.
2324          */
2325 
2326         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2327                 if (list_empty(&r->res_convertqueue))
2328                         return 1;
2329                 else
2330                         return 0;
2331         }
2332 
2333         /*
2334          * The NOORDER flag is set to avoid the standard vms rules on grant
2335          * order.
2336          */
2337 
2338         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2339                 return 1;
2340 
2341         /*
2342          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2343          * granted until all other conversion requests ahead of it are granted
2344          * and/or canceled.
2345          */
2346 
2347         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2348                 return 1;
2349 
2350         /*
2351          * 6-4: By default, a new request is immediately granted only if all
2352          * three of the following conditions are satisfied when the request is
2353          * issued:
2354          * - The queue of ungranted conversion requests for the resource is
2355          *   empty.
2356          * - The queue of ungranted new requests for the resource is empty.
2357          * - The mode of the new request is compatible with the most
2358          *   restrictive mode of all granted locks on the resource.
2359          */
2360 
2361         if (now && !conv && list_empty(&r->res_convertqueue) &&
2362             list_empty(&r->res_waitqueue))
2363                 return 1;
2364 
2365         /*
2366          * 6-4: Once a lock request is in the queue of ungranted new requests,
2367          * it cannot be granted until the queue of ungranted conversion
2368          * requests is empty, all ungranted new requests ahead of it are
2369          * granted and/or canceled, and it is compatible with the granted mode
2370          * of the most restrictive lock granted on the resource.
2371          */
2372 
2373         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2374             first_in_list(lkb, &r->res_waitqueue))
2375                 return 1;
2376 
2377         return 0;
2378 }
2379 
2380 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2381                           int recover, int *err)
2382 {
2383         int rv;
2384         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2385         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2386 
2387         if (err)
2388                 *err = 0;
2389 
2390         rv = _can_be_granted(r, lkb, now, recover);
2391         if (rv)
2392                 goto out;
2393 
2394         /*
2395          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2396          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2397          * cancels one of the locks.
2398          */
2399 
2400         if (is_convert && can_be_queued(lkb) &&
2401             conversion_deadlock_detect(r, lkb)) {
2402                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2403                         lkb->lkb_grmode = DLM_LOCK_NL;
2404                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2405                 } else if (err) {
2406                         *err = -EDEADLK;
2407                 } else {
2408                         log_print("can_be_granted deadlock %x now %d",
2409                                   lkb->lkb_id, now);
2410                         dlm_dump_rsb(r);
2411                 }
2412                 goto out;
2413         }
2414 
2415         /*
2416          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2417          * to grant a request in a mode other than the normal rqmode.  It's a
2418          * simple way to provide a big optimization to applications that can
2419          * use them.
2420          */
2421 
2422         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2423                 alt = DLM_LOCK_PR;
2424         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2425                 alt = DLM_LOCK_CW;
2426 
2427         if (alt) {
2428                 lkb->lkb_rqmode = alt;
2429                 rv = _can_be_granted(r, lkb, now, 0);
2430                 if (rv)
2431                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2432                 else
2433                         lkb->lkb_rqmode = rqmode;
2434         }
2435  out:
2436         return rv;
2437 }
2438 
2439 /* Returns the highest requested mode of all blocked conversions; sets
2440    cw if there's a blocked conversion to DLM_LOCK_CW. */
2441 
2442 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2443                                  unsigned int *count)
2444 {
2445         struct dlm_lkb *lkb, *s;
2446         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2447         int hi, demoted, quit, grant_restart, demote_restart;
2448         int deadlk;
2449 
2450         quit = 0;
2451  restart:
2452         grant_restart = 0;
2453         demote_restart = 0;
2454         hi = DLM_LOCK_IV;
2455 
2456         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2457                 demoted = is_demoted(lkb);
2458                 deadlk = 0;
2459 
2460                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2461                         grant_lock_pending(r, lkb);
2462                         grant_restart = 1;
2463                         if (count)
2464                                 (*count)++;
2465                         continue;
2466                 }
2467 
2468                 if (!demoted && is_demoted(lkb)) {
2469                         log_print("WARN: pending demoted %x node %d %s",
2470                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2471                         demote_restart = 1;
2472                         continue;
2473                 }
2474 
2475                 if (deadlk) {
2476                         /*
2477                          * If DLM_LKB_NODLKWT flag is set and conversion
2478                          * deadlock is detected, we request blocking AST and
2479                          * down (or cancel) conversion.
2480                          */
2481                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2482                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2483                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2484                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2485                                 }
2486                         } else {
2487                                 log_print("WARN: pending deadlock %x node %d %s",
2488                                           lkb->lkb_id, lkb->lkb_nodeid,
2489                                           r->res_name);
2490                                 dlm_dump_rsb(r);
2491                         }
2492                         continue;
2493                 }
2494 
2495                 hi = max_t(int, lkb->lkb_rqmode, hi);
2496 
2497                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2498                         *cw = 1;
2499         }
2500 
2501         if (grant_restart)
2502                 goto restart;
2503         if (demote_restart && !quit) {
2504                 quit = 1;
2505                 goto restart;
2506         }
2507 
2508         return max_t(int, high, hi);
2509 }
2510 
2511 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2512                               unsigned int *count)
2513 {
2514         struct dlm_lkb *lkb, *s;
2515 
2516         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2517                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2518                         grant_lock_pending(r, lkb);
2519                         if (count)
2520                                 (*count)++;
2521                 } else {
2522                         high = max_t(int, lkb->lkb_rqmode, high);
2523                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2524                                 *cw = 1;
2525                 }
2526         }
2527 
2528         return high;
2529 }
2530 
2531 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2532    on either the convert or waiting queue.
2533    high is the largest rqmode of all locks blocked on the convert or
2534    waiting queue. */
2535 
2536 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2537 {
2538         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2539                 if (gr->lkb_highbast < DLM_LOCK_EX)
2540                         return 1;
2541                 return 0;
2542         }
2543 
2544         if (gr->lkb_highbast < high &&
2545             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2546                 return 1;
2547         return 0;
2548 }
2549 
2550 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2551 {
2552         struct dlm_lkb *lkb, *s;
2553         int high = DLM_LOCK_IV;
2554         int cw = 0;
2555 
2556         if (!is_master(r)) {
2557                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2558                 dlm_dump_rsb(r);
2559                 return;
2560         }
2561 
2562         high = grant_pending_convert(r, high, &cw, count);
2563         high = grant_pending_wait(r, high, &cw, count);
2564 
2565         if (high == DLM_LOCK_IV)
2566                 return;
2567 
2568         /*
2569          * If there are locks left on the wait/convert queue then send blocking
2570          * ASTs to granted locks based on the largest requested mode (high)
2571          * found above.
2572          */
2573 
2574         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2575                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2576                         if (cw && high == DLM_LOCK_PR &&
2577                             lkb->lkb_grmode == DLM_LOCK_PR)
2578                                 queue_bast(r, lkb, DLM_LOCK_CW);
2579                         else
2580                                 queue_bast(r, lkb, high);
2581                         lkb->lkb_highbast = high;
2582                 }
2583         }
2584 }
2585 
2586 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2587 {
2588         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2589             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2590                 if (gr->lkb_highbast < DLM_LOCK_EX)
2591                         return 1;
2592                 return 0;
2593         }
2594 
2595         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2596                 return 1;
2597         return 0;
2598 }
2599 
2600 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2601                             struct dlm_lkb *lkb)
2602 {
2603         struct dlm_lkb *gr;
2604 
2605         list_for_each_entry(gr, head, lkb_statequeue) {
2606                 /* skip self when sending basts to convertqueue */
2607                 if (gr == lkb)
2608                         continue;
2609                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2610                         queue_bast(r, gr, lkb->lkb_rqmode);
2611                         gr->lkb_highbast = lkb->lkb_rqmode;
2612                 }
2613         }
2614 }
2615 
2616 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2617 {
2618         send_bast_queue(r, &r->res_grantqueue, lkb);
2619 }
2620 
2621 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2622 {
2623         send_bast_queue(r, &r->res_grantqueue, lkb);
2624         send_bast_queue(r, &r->res_convertqueue, lkb);
2625 }
2626 
2627 /* set_master(r, lkb) -- set the master nodeid of a resource
2628 
2629    The purpose of this function is to set the nodeid field in the given
2630    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2631    known, it can just be copied to the lkb and the function will return
2632    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2633    before it can be copied to the lkb.
2634 
2635    When the rsb nodeid is being looked up remotely, the initial lkb
2636    causing the lookup is kept on the ls_waiters list waiting for the
2637    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2638    on the rsb's res_lookup list until the master is verified.
2639 
2640    Return values:
2641    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2642    1: the rsb master is not available and the lkb has been placed on
2643       a wait queue
2644 */
2645 
2646 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2647 {
2648         int our_nodeid = dlm_our_nodeid();
2649 
2650         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2651                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2652                 r->res_first_lkid = lkb->lkb_id;
2653                 lkb->lkb_nodeid = r->res_nodeid;
2654                 return 0;
2655         }
2656 
2657         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2658                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2659                 return 1;
2660         }
2661 
2662         if (r->res_master_nodeid == our_nodeid) {
2663                 lkb->lkb_nodeid = 0;
2664                 return 0;
2665         }
2666 
2667         if (r->res_master_nodeid) {
2668                 lkb->lkb_nodeid = r->res_master_nodeid;
2669                 return 0;
2670         }
2671 
2672         if (dlm_dir_nodeid(r) == our_nodeid) {
2673                 /* This is a somewhat unusual case; find_rsb will usually
2674                    have set res_master_nodeid when dir nodeid is local, but
2675                    there are cases where we become the dir node after we've
2676                    past find_rsb and go through _request_lock again.
2677                    confirm_master() or process_lookup_list() needs to be
2678                    called after this. */
2679                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2680                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2681                           r->res_name);
2682                 r->res_master_nodeid = our_nodeid;
2683                 r->res_nodeid = 0;
2684                 lkb->lkb_nodeid = 0;
2685                 return 0;
2686         }
2687 
2688         r->res_first_lkid = lkb->lkb_id;
2689         send_lookup(r, lkb);
2690         return 1;
2691 }
2692 
2693 static void process_lookup_list(struct dlm_rsb *r)
2694 {
2695         struct dlm_lkb *lkb, *safe;
2696 
2697         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2698                 list_del_init(&lkb->lkb_rsb_lookup);
2699                 _request_lock(r, lkb);
2700         }
2701 }
2702 
2703 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2704 
2705 static void confirm_master(struct dlm_rsb *r, int error)
2706 {
2707         struct dlm_lkb *lkb;
2708 
2709         if (!r->res_first_lkid)
2710                 return;
2711 
2712         switch (error) {
2713         case 0:
2714         case -EINPROGRESS:
2715                 r->res_first_lkid = 0;
2716                 process_lookup_list(r);
2717                 break;
2718 
2719         case -EAGAIN:
2720         case -EBADR:
2721         case -ENOTBLK:
2722                 /* the remote request failed and won't be retried (it was
2723                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2724                    lkb the first_lkid */
2725 
2726                 r->res_first_lkid = 0;
2727 
2728                 if (!list_empty(&r->res_lookup)) {
2729                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2730                                          lkb_rsb_lookup);
2731                         list_del_init(&lkb->lkb_rsb_lookup);
2732                         r->res_first_lkid = lkb->lkb_id;
2733                         _request_lock(r, lkb);
2734                 }
2735                 break;
2736 
2737         default:
2738                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2739         }
2740 }
2741 
2742 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2743                          int namelen, void (*ast)(void *astparam),
2744                          void *astparam,
2745                          void (*bast)(void *astparam, int mode),
2746                          struct dlm_args *args)
2747 {
2748         int rv = -EINVAL;
2749 
2750         /* check for invalid arg usage */
2751 
2752         if (mode < 0 || mode > DLM_LOCK_EX)
2753                 goto out;
2754 
2755         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2756                 goto out;
2757 
2758         if (flags & DLM_LKF_CANCEL)
2759                 goto out;
2760 
2761         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2762                 goto out;
2763 
2764         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2765                 goto out;
2766 
2767         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2768                 goto out;
2769 
2770         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2771                 goto out;
2772 
2773         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2774                 goto out;
2775 
2776         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2777                 goto out;
2778 
2779         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2780                 goto out;
2781 
2782         if (!ast || !lksb)
2783                 goto out;
2784 
2785         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2786                 goto out;
2787 
2788         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2789                 goto out;
2790 
2791         /* these args will be copied to the lkb in validate_lock_args,
2792            it cannot be done now because when converting locks, fields in
2793            an active lkb cannot be modified before locking the rsb */
2794 
2795         args->flags = flags;
2796         args->astfn = ast;
2797         args->astparam = astparam;
2798         args->bastfn = bast;
2799         args->mode = mode;
2800         args->lksb = lksb;
2801         rv = 0;
2802  out:
2803         return rv;
2804 }
2805 
2806 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2807 {
2808         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2809                       DLM_LKF_FORCEUNLOCK))
2810                 return -EINVAL;
2811 
2812         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2813                 return -EINVAL;
2814 
2815         args->flags = flags;
2816         args->astparam = astarg;
2817         return 0;
2818 }
2819 
2820 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2821                               struct dlm_args *args)
2822 {
2823         int rv = -EBUSY;
2824 
2825         if (args->flags & DLM_LKF_CONVERT) {
2826                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2827                         goto out;
2828 
2829                 /* lock not allowed if there's any op in progress */
2830                 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2831                         goto out;
2832 
2833                 if (is_overlap(lkb))
2834                         goto out;
2835 
2836                 rv = -EINVAL;
2837                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2838                         goto out;
2839 
2840                 if (args->flags & DLM_LKF_QUECVT &&
2841                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2842                         goto out;
2843         }
2844 
2845         lkb->lkb_exflags = args->flags;
2846         dlm_set_sbflags_val(lkb, 0);
2847         lkb->lkb_astfn = args->astfn;
2848         lkb->lkb_astparam = args->astparam;
2849         lkb->lkb_bastfn = args->bastfn;
2850         lkb->lkb_rqmode = args->mode;
2851         lkb->lkb_lksb = args->lksb;
2852         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2853         lkb->lkb_ownpid = (int) current->pid;
2854         rv = 0;
2855  out:
2856         switch (rv) {
2857         case 0:
2858                 break;
2859         case -EINVAL:
2860                 /* annoy the user because dlm usage is wrong */
2861                 WARN_ON(1);
2862                 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2863                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2864                           lkb->lkb_status, lkb->lkb_wait_type,
2865                           lkb->lkb_resource->res_name);
2866                 break;
2867         default:
2868                 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2869                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2870                           lkb->lkb_status, lkb->lkb_wait_type,
2871                           lkb->lkb_resource->res_name);
2872                 break;
2873         }
2874 
2875         return rv;
2876 }
2877 
2878 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2879    for success */
2880 
2881 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2882    because there may be a lookup in progress and it's valid to do
2883    cancel/unlockf on it */
2884 
2885 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2886 {
2887         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2888         int rv = -EBUSY;
2889 
2890         /* normal unlock not allowed if there's any op in progress */
2891         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2892             (lkb->lkb_wait_type || lkb->lkb_wait_count))
2893                 goto out;
2894 
2895         /* an lkb may be waiting for an rsb lookup to complete where the
2896            lookup was initiated by another lock */
2897 
2898         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2899                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2900                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2901                         list_del_init(&lkb->lkb_rsb_lookup);
2902                         queue_cast(lkb->lkb_resource, lkb,
2903                                    args->flags & DLM_LKF_CANCEL ?
2904                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2905                         unhold_lkb(lkb); /* undoes create_lkb() */
2906                 }
2907                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2908                 goto out;
2909         }
2910 
2911         rv = -EINVAL;
2912         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2913                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2914                 dlm_print_lkb(lkb);
2915                 goto out;
2916         }
2917 
2918         /* an lkb may still exist even though the lock is EOL'ed due to a
2919          * cancel, unlock or failed noqueue request; an app can't use these
2920          * locks; return same error as if the lkid had not been found at all
2921          */
2922 
2923         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2924                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2925                 rv = -ENOENT;
2926                 goto out;
2927         }
2928 
2929         /* cancel not allowed with another cancel/unlock in progress */
2930 
2931         if (args->flags & DLM_LKF_CANCEL) {
2932                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2933                         goto out;
2934 
2935                 if (is_overlap(lkb))
2936                         goto out;
2937 
2938                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2939                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2940                         rv = -EBUSY;
2941                         goto out;
2942                 }
2943 
2944                 /* there's nothing to cancel */
2945                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2946                     !lkb->lkb_wait_type) {
2947                         rv = -EBUSY;
2948                         goto out;
2949                 }
2950 
2951                 switch (lkb->lkb_wait_type) {
2952                 case DLM_MSG_LOOKUP:
2953                 case DLM_MSG_REQUEST:
2954                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2955                         rv = -EBUSY;
2956                         goto out;
2957                 case DLM_MSG_UNLOCK:
2958                 case DLM_MSG_CANCEL:
2959                         goto out;
2960                 }
2961                 /* add_to_waiters() will set OVERLAP_CANCEL */
2962                 goto out_ok;
2963         }
2964 
2965         /* do we need to allow a force-unlock if there's a normal unlock
2966            already in progress?  in what conditions could the normal unlock
2967            fail such that we'd want to send a force-unlock to be sure? */
2968 
2969         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2970                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2971                         goto out;
2972 
2973                 if (is_overlap_unlock(lkb))
2974                         goto out;
2975 
2976                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2977                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2978                         rv = -EBUSY;
2979                         goto out;
2980                 }
2981 
2982                 switch (lkb->lkb_wait_type) {
2983                 case DLM_MSG_LOOKUP:
2984                 case DLM_MSG_REQUEST:
2985                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2986                         rv = -EBUSY;
2987                         goto out;
2988                 case DLM_MSG_UNLOCK:
2989                         goto out;
2990                 }
2991                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2992         }
2993 
2994  out_ok:
2995         /* an overlapping op shouldn't blow away exflags from other op */
2996         lkb->lkb_exflags |= args->flags;
2997         dlm_set_sbflags_val(lkb, 0);
2998         lkb->lkb_astparam = args->astparam;
2999         rv = 0;
3000  out:
3001         switch (rv) {
3002         case 0:
3003                 break;
3004         case -EINVAL:
3005                 /* annoy the user because dlm usage is wrong */
3006                 WARN_ON(1);
3007                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3008                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3009                           args->flags, lkb->lkb_wait_type,
3010                           lkb->lkb_resource->res_name);
3011                 break;
3012         default:
3013                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3014                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3015                           args->flags, lkb->lkb_wait_type,
3016                           lkb->lkb_resource->res_name);
3017                 break;
3018         }
3019 
3020         return rv;
3021 }
3022 
3023 /*
3024  * Four stage 4 varieties:
3025  * do_request(), do_convert(), do_unlock(), do_cancel()
3026  * These are called on the master node for the given lock and
3027  * from the central locking logic.
3028  */
3029 
3030 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3031 {
3032         int error = 0;
3033 
3034         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3035                 grant_lock(r, lkb);
3036                 queue_cast(r, lkb, 0);
3037                 goto out;
3038         }
3039 
3040         if (can_be_queued(lkb)) {
3041                 error = -EINPROGRESS;
3042                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3043                 goto out;
3044         }
3045 
3046         error = -EAGAIN;
3047         queue_cast(r, lkb, -EAGAIN);
3048  out:
3049         return error;
3050 }
3051 
3052 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3053                                int error)
3054 {
3055         switch (error) {
3056         case -EAGAIN:
3057                 if (force_blocking_asts(lkb))
3058                         send_blocking_asts_all(r, lkb);
3059                 break;
3060         case -EINPROGRESS:
3061                 send_blocking_asts(r, lkb);
3062                 break;
3063         }
3064 }
3065 
3066 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3067 {
3068         int error = 0;
3069         int deadlk = 0;
3070 
3071         /* changing an existing lock may allow others to be granted */
3072 
3073         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3074                 grant_lock(r, lkb);
3075                 queue_cast(r, lkb, 0);
3076                 goto out;
3077         }
3078 
3079         /* can_be_granted() detected that this lock would block in a conversion
3080            deadlock, so we leave it on the granted queue and return EDEADLK in
3081            the ast for the convert. */
3082 
3083         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3084                 /* it's left on the granted queue */
3085                 revert_lock(r, lkb);
3086                 queue_cast(r, lkb, -EDEADLK);
3087                 error = -EDEADLK;
3088                 goto out;
3089         }
3090 
3091         /* is_demoted() means the can_be_granted() above set the grmode
3092            to NL, and left us on the granted queue.  This auto-demotion
3093            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3094            now grantable.  We have to try to grant other converting locks
3095            before we try again to grant this one. */
3096 
3097         if (is_demoted(lkb)) {
3098                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3099                 if (_can_be_granted(r, lkb, 1, 0)) {
3100                         grant_lock(r, lkb);
3101                         queue_cast(r, lkb, 0);
3102                         goto out;
3103                 }
3104                 /* else fall through and move to convert queue */
3105         }
3106 
3107         if (can_be_queued(lkb)) {
3108                 error = -EINPROGRESS;
3109                 del_lkb(r, lkb);
3110                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3111                 goto out;
3112         }
3113 
3114         error = -EAGAIN;
3115         queue_cast(r, lkb, -EAGAIN);
3116  out:
3117         return error;
3118 }
3119 
3120 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3121                                int error)
3122 {
3123         switch (error) {
3124         case 0:
3125                 grant_pending_locks(r, NULL);
3126                 /* grant_pending_locks also sends basts */
3127                 break;
3128         case -EAGAIN:
3129                 if (force_blocking_asts(lkb))
3130                         send_blocking_asts_all(r, lkb);
3131                 break;
3132         case -EINPROGRESS:
3133                 send_blocking_asts(r, lkb);
3134                 break;
3135         }
3136 }
3137 
3138 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3139 {
3140         remove_lock(r, lkb);
3141         queue_cast(r, lkb, -DLM_EUNLOCK);
3142         return -DLM_EUNLOCK;
3143 }
3144 
3145 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3146                               int error)
3147 {
3148         grant_pending_locks(r, NULL);
3149 }
3150 
3151 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3152 
3153 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3154 {
3155         int error;
3156 
3157         error = revert_lock(r, lkb);
3158         if (error) {
3159                 queue_cast(r, lkb, -DLM_ECANCEL);
3160                 return -DLM_ECANCEL;
3161         }
3162         return 0;
3163 }
3164 
3165 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166                               int error)
3167 {
3168         if (error)
3169                 grant_pending_locks(r, NULL);
3170 }
3171 
3172 /*
3173  * Four stage 3 varieties:
3174  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3175  */
3176 
3177 /* add a new lkb to a possibly new rsb, called by requesting process */
3178 
3179 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3180 {
3181         int error;
3182 
3183         /* set_master: sets lkb nodeid from r */
3184 
3185         error = set_master(r, lkb);
3186         if (error < 0)
3187                 goto out;
3188         if (error) {
3189                 error = 0;
3190                 goto out;
3191         }
3192 
3193         if (is_remote(r)) {
3194                 /* receive_request() calls do_request() on remote node */
3195                 error = send_request(r, lkb);
3196         } else {
3197                 error = do_request(r, lkb);
3198                 /* for remote locks the request_reply is sent
3199                    between do_request and do_request_effects */
3200                 do_request_effects(r, lkb, error);
3201         }
3202  out:
3203         return error;
3204 }
3205 
3206 /* change some property of an existing lkb, e.g. mode */
3207 
3208 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3209 {
3210         int error;
3211 
3212         if (is_remote(r)) {
3213                 /* receive_convert() calls do_convert() on remote node */
3214                 error = send_convert(r, lkb);
3215         } else {
3216                 error = do_convert(r, lkb);
3217                 /* for remote locks the convert_reply is sent
3218                    between do_convert and do_convert_effects */
3219                 do_convert_effects(r, lkb, error);
3220         }
3221 
3222         return error;
3223 }
3224 
3225 /* remove an existing lkb from the granted queue */
3226 
3227 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3228 {
3229         int error;
3230 
3231         if (is_remote(r)) {
3232                 /* receive_unlock() calls do_unlock() on remote node */
3233                 error = send_unlock(r, lkb);
3234         } else {
3235                 error = do_unlock(r, lkb);
3236                 /* for remote locks the unlock_reply is sent
3237                    between do_unlock and do_unlock_effects */
3238                 do_unlock_effects(r, lkb, error);
3239         }
3240 
3241         return error;
3242 }
3243 
3244 /* remove an existing lkb from the convert or wait queue */
3245 
3246 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3247 {
3248         int error;
3249 
3250         if (is_remote(r)) {
3251                 /* receive_cancel() calls do_cancel() on remote node */
3252                 error = send_cancel(r, lkb);
3253         } else {
3254                 error = do_cancel(r, lkb);
3255                 /* for remote locks the cancel_reply is sent
3256                    between do_cancel and do_cancel_effects */
3257                 do_cancel_effects(r, lkb, error);
3258         }
3259 
3260         return error;
3261 }
3262 
3263 /*
3264  * Four stage 2 varieties:
3265  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3266  */
3267 
3268 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3269                         const void *name, int len,
3270                         struct dlm_args *args)
3271 {
3272         struct dlm_rsb *r;
3273         int error;
3274 
3275         error = validate_lock_args(ls, lkb, args);
3276         if (error)
3277                 return error;
3278 
3279         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3280         if (error)
3281                 return error;
3282 
3283         lock_rsb(r);
3284 
3285         attach_lkb(r, lkb);
3286         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3287 
3288         error = _request_lock(r, lkb);
3289 
3290         unlock_rsb(r);
3291         put_rsb(r);
3292         return error;
3293 }
3294 
3295 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3296                         struct dlm_args *args)
3297 {
3298         struct dlm_rsb *r;
3299         int error;
3300 
3301         r = lkb->lkb_resource;
3302 
3303         hold_rsb(r);
3304         lock_rsb(r);
3305 
3306         error = validate_lock_args(ls, lkb, args);
3307         if (error)
3308                 goto out;
3309 
3310         error = _convert_lock(r, lkb);
3311  out:
3312         unlock_rsb(r);
3313         put_rsb(r);
3314         return error;
3315 }
3316 
3317 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3318                        struct dlm_args *args)
3319 {
3320         struct dlm_rsb *r;
3321         int error;
3322 
3323         r = lkb->lkb_resource;
3324 
3325         hold_rsb(r);
3326         lock_rsb(r);
3327 
3328         error = validate_unlock_args(lkb, args);
3329         if (error)
3330                 goto out;
3331 
3332         error = _unlock_lock(r, lkb);
3333  out:
3334         unlock_rsb(r);
3335         put_rsb(r);
3336         return error;
3337 }
3338 
3339 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3340                        struct dlm_args *args)
3341 {
3342         struct dlm_rsb *r;
3343         int error;
3344 
3345         r = lkb->lkb_resource;
3346 
3347         hold_rsb(r);
3348         lock_rsb(r);
3349 
3350         error = validate_unlock_args(lkb, args);
3351         if (error)
3352                 goto out;
3353 
3354         error = _cancel_lock(r, lkb);
3355  out:
3356         unlock_rsb(r);
3357         put_rsb(r);
3358         return error;
3359 }
3360 
3361 /*
3362  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3363  */
3364 
3365 int dlm_lock(dlm_lockspace_t *lockspace,
3366              int mode,
3367              struct dlm_lksb *lksb,
3368              uint32_t flags,
3369              const void *name,
3370              unsigned int namelen,
3371              uint32_t parent_lkid,
3372              void (*ast) (void *astarg),
3373              void *astarg,
3374              void (*bast) (void *astarg, int mode))
3375 {
3376         struct dlm_ls *ls;
3377         struct dlm_lkb *lkb;
3378         struct dlm_args args;
3379         int error, convert = flags & DLM_LKF_CONVERT;
3380 
3381         ls = dlm_find_lockspace_local(lockspace);
3382         if (!ls)
3383                 return -EINVAL;
3384 
3385         dlm_lock_recovery(ls);
3386 
3387         if (convert)
3388                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3389         else
3390                 error = create_lkb(ls, &lkb);
3391 
3392         if (error)
3393                 goto out;
3394 
3395         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3396 
3397         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3398                               &args);
3399         if (error)
3400                 goto out_put;
3401 
3402         if (convert)
3403                 error = convert_lock(ls, lkb, &args);
3404         else
3405                 error = request_lock(ls, lkb, name, namelen, &args);
3406 
3407         if (error == -EINPROGRESS)
3408                 error = 0;
3409  out_put:
3410         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3411 
3412         if (convert || error)
3413                 __put_lkb(ls, lkb);
3414         if (error == -EAGAIN || error == -EDEADLK)
3415                 error = 0;
3416  out:
3417         dlm_unlock_recovery(ls);
3418         dlm_put_lockspace(ls);
3419         return error;
3420 }
3421 
3422 int dlm_unlock(dlm_lockspace_t *lockspace,
3423                uint32_t lkid,
3424                uint32_t flags,
3425                struct dlm_lksb *lksb,
3426                void *astarg)
3427 {
3428         struct dlm_ls *ls;
3429         struct dlm_lkb *lkb;
3430         struct dlm_args args;
3431         int error;
3432 
3433         ls = dlm_find_lockspace_local(lockspace);
3434         if (!ls)
3435                 return -EINVAL;
3436 
3437         dlm_lock_recovery(ls);
3438 
3439         error = find_lkb(ls, lkid, &lkb);
3440         if (error)
3441                 goto out;
3442 
3443         trace_dlm_unlock_start(ls, lkb, flags);
3444 
3445         error = set_unlock_args(flags, astarg, &args);
3446         if (error)
3447                 goto out_put;
3448 
3449         if (flags & DLM_LKF_CANCEL)
3450                 error = cancel_lock(ls, lkb, &args);
3451         else
3452                 error = unlock_lock(ls, lkb, &args);
3453 
3454         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3455                 error = 0;
3456         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3457                 error = 0;
3458  out_put:
3459         trace_dlm_unlock_end(ls, lkb, flags, error);
3460 
3461         dlm_put_lkb(lkb);
3462  out:
3463         dlm_unlock_recovery(ls);
3464         dlm_put_lockspace(ls);
3465         return error;
3466 }
3467 
3468 /*
3469  * send/receive routines for remote operations and replies
3470  *
3471  * send_args
3472  * send_common
3473  * send_request                 receive_request
3474  * send_convert                 receive_convert
3475  * send_unlock                  receive_unlock
3476  * send_cancel                  receive_cancel
3477  * send_grant                   receive_grant
3478  * send_bast                    receive_bast
3479  * send_lookup                  receive_lookup
3480  * send_remove                  receive_remove
3481  *
3482  *                              send_common_reply
3483  * receive_request_reply        send_request_reply
3484  * receive_convert_reply        send_convert_reply
3485  * receive_unlock_reply         send_unlock_reply
3486  * receive_cancel_reply         send_cancel_reply
3487  * receive_lookup_reply         send_lookup_reply
3488  */
3489 
3490 static int _create_message(struct dlm_ls *ls, int mb_len,
3491                            int to_nodeid, int mstype,
3492                            struct dlm_message **ms_ret,
3493                            struct dlm_mhandle **mh_ret)
3494 {
3495         struct dlm_message *ms;
3496         struct dlm_mhandle *mh;
3497         char *mb;
3498 
3499         /* get_buffer gives us a message handle (mh) that we need to
3500            pass into midcomms_commit and a message buffer (mb) that we
3501            write our data into */
3502 
3503         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3504         if (!mh)
3505                 return -ENOBUFS;
3506 
3507         ms = (struct dlm_message *) mb;
3508 
3509         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3510         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3511         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3512         ms->m_header.h_length = cpu_to_le16(mb_len);
3513         ms->m_header.h_cmd = DLM_MSG;
3514 
3515         ms->m_type = cpu_to_le32(mstype);
3516 
3517         *mh_ret = mh;
3518         *ms_ret = ms;
3519         return 0;
3520 }
3521 
3522 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3523                           int to_nodeid, int mstype,
3524                           struct dlm_message **ms_ret,
3525                           struct dlm_mhandle **mh_ret)
3526 {
3527         int mb_len = sizeof(struct dlm_message);
3528 
3529         switch (mstype) {
3530         case DLM_MSG_REQUEST:
3531         case DLM_MSG_LOOKUP:
3532         case DLM_MSG_REMOVE:
3533                 mb_len += r->res_length;
3534                 break;
3535         case DLM_MSG_CONVERT:
3536         case DLM_MSG_UNLOCK:
3537         case DLM_MSG_REQUEST_REPLY:
3538         case DLM_MSG_CONVERT_REPLY:
3539         case DLM_MSG_GRANT:
3540                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3541                         mb_len += r->res_ls->ls_lvblen;
3542                 break;
3543         }
3544 
3545         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3546                                ms_ret, mh_ret);
3547 }
3548 
3549 /* further lowcomms enhancements or alternate implementations may make
3550    the return value from this function useful at some point */
3551 
3552 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3553                         const void *name, int namelen)
3554 {
3555         dlm_midcomms_commit_mhandle(mh, name, namelen);
3556         return 0;
3557 }
3558 
3559 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3560                       struct dlm_message *ms)
3561 {
3562         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3563         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3564         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3565         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3566         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3567         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3568         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3569         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3570         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3571         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3572         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3573         ms->m_hash     = cpu_to_le32(r->res_hash);
3574 
3575         /* m_result and m_bastmode are set from function args,
3576            not from lkb fields */
3577 
3578         if (lkb->lkb_bastfn)
3579                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3580         if (lkb->lkb_astfn)
3581                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3582 
3583         /* compare with switch in create_message; send_remove() doesn't
3584            use send_args() */
3585 
3586         switch (ms->m_type) {
3587         case cpu_to_le32(DLM_MSG_REQUEST):
3588         case cpu_to_le32(DLM_MSG_LOOKUP):
3589                 memcpy(ms->m_extra, r->res_name, r->res_length);
3590                 break;
3591         case cpu_to_le32(DLM_MSG_CONVERT):
3592         case cpu_to_le32(DLM_MSG_UNLOCK):
3593         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3594         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3595         case cpu_to_le32(DLM_MSG_GRANT):
3596                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3597                         break;
3598                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3599                 break;
3600         }
3601 }
3602 
3603 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3604 {
3605         struct dlm_message *ms;
3606         struct dlm_mhandle *mh;
3607         int to_nodeid, error;
3608 
3609         to_nodeid = r->res_nodeid;
3610 
3611         error = add_to_waiters(lkb, mstype, to_nodeid);
3612         if (error)
3613                 return error;
3614 
3615         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3616         if (error)
3617                 goto fail;
3618 
3619         send_args(r, lkb, ms);
3620 
3621         error = send_message(mh, ms, r->res_name, r->res_length);
3622         if (error)
3623                 goto fail;
3624         return 0;
3625 
3626  fail:
3627         remove_from_waiters(lkb, msg_reply_type(mstype));
3628         return error;
3629 }
3630 
3631 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3632 {
3633         return send_common(r, lkb, DLM_MSG_REQUEST);
3634 }
3635 
3636 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3637 {
3638         int error;
3639 
3640         error = send_common(r, lkb, DLM_MSG_CONVERT);
3641 
3642         /* down conversions go without a reply from the master */
3643         if (!error && down_conversion(lkb)) {
3644                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3645                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3646                 r->res_ls->ls_local_ms.m_result = 0;
3647                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3648         }
3649 
3650         return error;
3651 }
3652 
3653 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3654    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3655    that the master is still correct. */
3656 
3657 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3658 {
3659         return send_common(r, lkb, DLM_MSG_UNLOCK);
3660 }
3661 
3662 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3663 {
3664         return send_common(r, lkb, DLM_MSG_CANCEL);
3665 }
3666 
3667 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3668 {
3669         struct dlm_message *ms;
3670         struct dlm_mhandle *mh;
3671         int to_nodeid, error;
3672 
3673         to_nodeid = lkb->lkb_nodeid;
3674 
3675         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3676         if (error)
3677                 goto out;
3678 
3679         send_args(r, lkb, ms);
3680 
3681         ms->m_result = 0;
3682 
3683         error = send_message(mh, ms, r->res_name, r->res_length);
3684  out:
3685         return error;
3686 }
3687 
3688 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3689 {
3690         struct dlm_message *ms;
3691         struct dlm_mhandle *mh;
3692         int to_nodeid, error;
3693 
3694         to_nodeid = lkb->lkb_nodeid;
3695 
3696         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3697         if (error)
3698                 goto out;
3699 
3700         send_args(r, lkb, ms);
3701 
3702         ms->m_bastmode = cpu_to_le32(mode);
3703 
3704         error = send_message(mh, ms, r->res_name, r->res_length);
3705  out:
3706         return error;
3707 }
3708 
3709 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3710 {
3711         struct dlm_message *ms;
3712         struct dlm_mhandle *mh;
3713         int to_nodeid, error;
3714 
3715         to_nodeid = dlm_dir_nodeid(r);
3716 
3717         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3718         if (error)
3719                 return error;
3720 
3721         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3722         if (error)
3723                 goto fail;
3724 
3725         send_args(r, lkb, ms);
3726 
3727         error = send_message(mh, ms, r->res_name, r->res_length);
3728         if (error)
3729                 goto fail;
3730         return 0;
3731 
3732  fail:
3733         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3734         return error;
3735 }
3736 
3737 static int send_remove(struct dlm_rsb *r)
3738 {
3739         struct dlm_message *ms;
3740         struct dlm_mhandle *mh;
3741         int to_nodeid, error;
3742 
3743         to_nodeid = dlm_dir_nodeid(r);
3744 
3745         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3746         if (error)
3747                 goto out;
3748 
3749         memcpy(ms->m_extra, r->res_name, r->res_length);
3750         ms->m_hash = cpu_to_le32(r->res_hash);
3751 
3752         error = send_message(mh, ms, r->res_name, r->res_length);
3753  out:
3754         return error;
3755 }
3756 
3757 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3758                              int mstype, int rv)
3759 {
3760         struct dlm_message *ms;
3761         struct dlm_mhandle *mh;
3762         int to_nodeid, error;
3763 
3764         to_nodeid = lkb->lkb_nodeid;
3765 
3766         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3767         if (error)
3768                 goto out;
3769 
3770         send_args(r, lkb, ms);
3771 
3772         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3773 
3774         error = send_message(mh, ms, r->res_name, r->res_length);
3775  out:
3776         return error;
3777 }
3778 
3779 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3780 {
3781         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3782 }
3783 
3784 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3785 {
3786         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3787 }
3788 
3789 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3790 {
3791         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3792 }
3793 
3794 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3795 {
3796         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3797 }
3798 
3799 static int send_lookup_reply(struct dlm_ls *ls,
3800                              const struct dlm_message *ms_in, int ret_nodeid,
3801                              int rv)
3802 {
3803         struct dlm_rsb *r = &ls->ls_local_rsb;
3804         struct dlm_message *ms;
3805         struct dlm_mhandle *mh;
3806         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3807 
3808         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3809         if (error)
3810                 goto out;
3811 
3812         ms->m_lkid = ms_in->m_lkid;
3813         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3814         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3815 
3816         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3817  out:
3818         return error;
3819 }
3820 
3821 /* which args we save from a received message depends heavily on the type
3822    of message, unlike the send side where we can safely send everything about
3823    the lkb for any type of message */
3824 
3825 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3826 {
3827         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3828         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3829         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3830 }
3831 
3832 static void receive_flags_reply(struct dlm_lkb *lkb,
3833                                 const struct dlm_message *ms,
3834                                 bool local)
3835 {
3836         if (local)
3837                 return;
3838 
3839         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3840         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3841 }
3842 
3843 static int receive_extralen(const struct dlm_message *ms)
3844 {
3845         return (le16_to_cpu(ms->m_header.h_length) -
3846                 sizeof(struct dlm_message));
3847 }
3848 
3849 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3850                        const struct dlm_message *ms)
3851 {
3852         int len;
3853 
3854         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3855                 if (!lkb->lkb_lvbptr)
3856                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3857                 if (!lkb->lkb_lvbptr)
3858                         return -ENOMEM;
3859                 len = receive_extralen(ms);
3860                 if (len > ls->ls_lvblen)
3861                         len = ls->ls_lvblen;
3862                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3863         }
3864         return 0;
3865 }
3866 
3867 static void fake_bastfn(void *astparam, int mode)
3868 {
3869         log_print("fake_bastfn should not be called");
3870 }
3871 
3872 static void fake_astfn(void *astparam)
3873 {
3874         log_print("fake_astfn should not be called");
3875 }
3876 
3877 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3878                                 const struct dlm_message *ms)
3879 {
3880         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3881         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3882         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3883         lkb->lkb_grmode = DLM_LOCK_IV;
3884         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3885 
3886         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3887         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3888 
3889         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3890                 /* lkb was just created so there won't be an lvb yet */
3891                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3892                 if (!lkb->lkb_lvbptr)
3893                         return -ENOMEM;
3894         }
3895 
3896         return 0;
3897 }
3898 
3899 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3900                                 const struct dlm_message *ms)
3901 {
3902         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3903                 return -EBUSY;
3904 
3905         if (receive_lvb(ls, lkb, ms))
3906                 return -ENOMEM;
3907 
3908         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3909         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3910 
3911         return 0;
3912 }
3913 
3914 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3915                                const struct dlm_message *ms)
3916 {
3917         if (receive_lvb(ls, lkb, ms))
3918                 return -ENOMEM;
3919         return 0;
3920 }
3921 
3922 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3923    uses to send a reply and that the remote end uses to process the reply. */
3924 
3925 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3926 {
3927         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3928         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3929         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3930 }
3931 
3932 /* This is called after the rsb is locked so that we can safely inspect
3933    fields in the lkb. */
3934 
3935 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3936 {
3937         int from = le32_to_cpu(ms->m_header.h_nodeid);
3938         int error = 0;
3939 
3940         /* currently mixing of user/kernel locks are not supported */
3941         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3942             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3943                 log_error(lkb->lkb_resource->res_ls,
3944                           "got user dlm message for a kernel lock");
3945                 error = -EINVAL;
3946                 goto out;
3947         }
3948 
3949         switch (ms->m_type) {
3950         case cpu_to_le32(DLM_MSG_CONVERT):
3951         case cpu_to_le32(DLM_MSG_UNLOCK):
3952         case cpu_to_le32(DLM_MSG_CANCEL):
3953                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3954                         error = -EINVAL;
3955                 break;
3956 
3957         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3958         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3959         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3960         case cpu_to_le32(DLM_MSG_GRANT):
3961         case cpu_to_le32(DLM_MSG_BAST):
3962                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3963                         error = -EINVAL;
3964                 break;
3965 
3966         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3967                 if (!is_process_copy(lkb))
3968                         error = -EINVAL;
3969                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3970                         error = -EINVAL;
3971                 break;
3972 
3973         default:
3974                 error = -EINVAL;
3975         }
3976 
3977 out:
3978         if (error)
3979                 log_error(lkb->lkb_resource->res_ls,
3980                           "ignore invalid message %d from %d %x %x %x %d",
3981                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3982                           lkb->lkb_remid, dlm_iflags_val(lkb),
3983                           lkb->lkb_nodeid);
3984         return error;
3985 }
3986 
3987 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3988 {
3989         struct dlm_lkb *lkb;
3990         struct dlm_rsb *r;
3991         int from_nodeid;
3992         int error, namelen = 0;
3993 
3994         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3995 
3996         error = create_lkb(ls, &lkb);
3997         if (error)
3998                 goto fail;
3999 
4000         receive_flags(lkb, ms);
4001         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4002         error = receive_request_args(ls, lkb, ms);
4003         if (error) {
4004                 __put_lkb(ls, lkb);
4005                 goto fail;
4006         }
4007 
4008         /* The dir node is the authority on whether we are the master
4009            for this rsb or not, so if the master sends us a request, we should
4010            recreate the rsb if we've destroyed it.   This race happens when we
4011            send a remove message to the dir node at the same time that the dir
4012            node sends us a request for the rsb. */
4013 
4014         namelen = receive_extralen(ms);
4015 
4016         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4017                          R_RECEIVE_REQUEST, &r);
4018         if (error) {
4019                 __put_lkb(ls, lkb);
4020                 goto fail;
4021         }
4022 
4023         lock_rsb(r);
4024 
4025         if (r->res_master_nodeid != dlm_our_nodeid()) {
4026                 error = validate_master_nodeid(ls, r, from_nodeid);
4027                 if (error) {
4028                         unlock_rsb(r);
4029                         put_rsb(r);
4030                         __put_lkb(ls, lkb);
4031                         goto fail;
4032                 }
4033         }
4034 
4035         attach_lkb(r, lkb);
4036         error = do_request(r, lkb);
4037         send_request_reply(r, lkb, error);
4038         do_request_effects(r, lkb, error);
4039 
4040         unlock_rsb(r);
4041         put_rsb(r);
4042 
4043         if (error == -EINPROGRESS)
4044                 error = 0;
4045         if (error)
4046                 dlm_put_lkb(lkb);
4047         return 0;
4048 
4049  fail:
4050         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4051            and do this receive_request again from process_lookup_list once
4052            we get the lookup reply.  This would avoid a many repeated
4053            ENOTBLK request failures when the lookup reply designating us
4054            as master is delayed. */
4055 
4056         if (error != -ENOTBLK) {
4057                 log_limit(ls, "receive_request %x from %d %d",
4058                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
4059         }
4060 
4061         setup_local_lkb(ls, ms);
4062         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4063         return error;
4064 }
4065 
4066 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4067 {
4068         struct dlm_lkb *lkb;
4069         struct dlm_rsb *r;
4070         int error, reply = 1;
4071 
4072         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4073         if (error)
4074                 goto fail;
4075 
4076         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4077                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4078                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4079                           (unsigned long long)lkb->lkb_recover_seq,
4080                           le32_to_cpu(ms->m_header.h_nodeid),
4081                           le32_to_cpu(ms->m_lkid));
4082                 error = -ENOENT;
4083                 dlm_put_lkb(lkb);
4084                 goto fail;
4085         }
4086 
4087         r = lkb->lkb_resource;
4088 
4089         hold_rsb(r);
4090         lock_rsb(r);
4091 
4092         error = validate_message(lkb, ms);
4093         if (error)
4094                 goto out;
4095 
4096         receive_flags(lkb, ms);
4097 
4098         error = receive_convert_args(ls, lkb, ms);
4099         if (error) {
4100                 send_convert_reply(r, lkb, error);
4101                 goto out;
4102         }
4103 
4104         reply = !down_conversion(lkb);
4105 
4106         error = do_convert(r, lkb);
4107         if (reply)
4108                 send_convert_reply(r, lkb, error);
4109         do_convert_effects(r, lkb, error);
4110  out:
4111         unlock_rsb(r);
4112         put_rsb(r);
4113         dlm_put_lkb(lkb);
4114         return 0;
4115 
4116  fail:
4117         setup_local_lkb(ls, ms);
4118         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4119         return error;
4120 }
4121 
4122 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4123 {
4124         struct dlm_lkb *lkb;
4125         struct dlm_rsb *r;
4126         int error;
4127 
4128         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4129         if (error)
4130                 goto fail;
4131 
4132         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4133                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4134                           lkb->lkb_id, lkb->lkb_remid,
4135                           le32_to_cpu(ms->m_header.h_nodeid),
4136                           le32_to_cpu(ms->m_lkid));
4137                 error = -ENOENT;
4138                 dlm_put_lkb(lkb);
4139                 goto fail;
4140         }
4141 
4142         r = lkb->lkb_resource;
4143 
4144         hold_rsb(r);
4145         lock_rsb(r);
4146 
4147         error = validate_message(lkb, ms);
4148         if (error)
4149                 goto out;
4150 
4151         receive_flags(lkb, ms);
4152 
4153         error = receive_unlock_args(ls, lkb, ms);
4154         if (error) {
4155                 send_unlock_reply(r, lkb, error);
4156                 goto out;
4157         }
4158 
4159         error = do_unlock(r, lkb);
4160         send_unlock_reply(r, lkb, error);
4161         do_unlock_effects(r, lkb, error);
4162  out:
4163         unlock_rsb(r);
4164         put_rsb(r);
4165         dlm_put_lkb(lkb);
4166         return 0;
4167 
4168  fail:
4169         setup_local_lkb(ls, ms);
4170         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4171         return error;
4172 }
4173 
4174 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4175 {
4176         struct dlm_lkb *lkb;
4177         struct dlm_rsb *r;
4178         int error;
4179 
4180         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4181         if (error)
4182                 goto fail;
4183 
4184         receive_flags(lkb, ms);
4185 
4186         r = lkb->lkb_resource;
4187 
4188         hold_rsb(r);
4189         lock_rsb(r);
4190 
4191         error = validate_message(lkb, ms);
4192         if (error)
4193                 goto out;
4194 
4195         error = do_cancel(r, lkb);
4196         send_cancel_reply(r, lkb, error);
4197         do_cancel_effects(r, lkb, error);
4198  out:
4199         unlock_rsb(r);
4200         put_rsb(r);
4201         dlm_put_lkb(lkb);
4202         return 0;
4203 
4204  fail:
4205         setup_local_lkb(ls, ms);
4206         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4207         return error;
4208 }
4209 
4210 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4211 {
4212         struct dlm_lkb *lkb;
4213         struct dlm_rsb *r;
4214         int error;
4215 
4216         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4217         if (error)
4218                 return error;
4219 
4220         r = lkb->lkb_resource;
4221 
4222         hold_rsb(r);
4223         lock_rsb(r);
4224 
4225         error = validate_message(lkb, ms);
4226         if (error)
4227                 goto out;
4228 
4229         receive_flags_reply(lkb, ms, false);
4230         if (is_altmode(lkb))
4231                 munge_altmode(lkb, ms);
4232         grant_lock_pc(r, lkb, ms);
4233         queue_cast(r, lkb, 0);
4234  out:
4235         unlock_rsb(r);
4236         put_rsb(r);
4237         dlm_put_lkb(lkb);
4238         return 0;
4239 }
4240 
4241 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4242 {
4243         struct dlm_lkb *lkb;
4244         struct dlm_rsb *r;
4245         int error;
4246 
4247         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4248         if (error)
4249                 return error;
4250 
4251         r = lkb->lkb_resource;
4252 
4253         hold_rsb(r);
4254         lock_rsb(r);
4255 
4256         error = validate_message(lkb, ms);
4257         if (error)
4258                 goto out;
4259 
4260         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4261         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4262  out:
4263         unlock_rsb(r);
4264         put_rsb(r);
4265         dlm_put_lkb(lkb);
4266         return 0;
4267 }
4268 
4269 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4270 {
4271         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4272 
4273         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4274         our_nodeid = dlm_our_nodeid();
4275 
4276         len = receive_extralen(ms);
4277 
4278         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4279                                   &ret_nodeid, NULL);
4280 
4281         /* Optimization: we're master so treat lookup as a request */
4282         if (!error && ret_nodeid == our_nodeid) {
4283                 receive_request(ls, ms);
4284                 return;
4285         }
4286         send_lookup_reply(ls, ms, ret_nodeid, error);
4287 }
4288 
4289 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4290 {
4291         char name[DLM_RESNAME_MAXLEN+1];
4292         struct dlm_rsb *r;
4293         int rv, len, dir_nodeid, from_nodeid;
4294 
4295         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4296 
4297         len = receive_extralen(ms);
4298 
4299         if (len > DLM_RESNAME_MAXLEN) {
4300                 log_error(ls, "receive_remove from %d bad len %d",
4301                           from_nodeid, len);
4302                 return;
4303         }
4304 
4305         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4306         if (dir_nodeid != dlm_our_nodeid()) {
4307                 log_error(ls, "receive_remove from %d bad nodeid %d",
4308                           from_nodeid, dir_nodeid);
4309                 return;
4310         }
4311 
4312         /*
4313          * Look for inactive rsb, if it's there, free it.
4314          * If the rsb is active, it's being used, and we should ignore this
4315          * message.  This is an expected race between the dir node sending a
4316          * request to the master node at the same time as the master node sends
4317          * a remove to the dir node.  The resolution to that race is for the
4318          * dir node to ignore the remove message, and the master node to
4319          * recreate the master rsb when it gets a request from the dir node for
4320          * an rsb it doesn't have.
4321          */
4322 
4323         memset(name, 0, sizeof(name));
4324         memcpy(name, ms->m_extra, len);
4325 
4326         write_lock_bh(&ls->ls_rsbtbl_lock);
4327 
4328         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4329         if (rv) {
4330                 /* should not happen */
4331                 log_error(ls, "%s from %d not found %s", __func__,
4332                           from_nodeid, name);
4333                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4334                 return;
4335         }
4336 
4337         if (!rsb_flag(r, RSB_INACTIVE)) {
4338                 if (r->res_master_nodeid != from_nodeid) {
4339                         /* should not happen */
4340                         log_error(ls, "receive_remove on active rsb from %d master %d",
4341                                   from_nodeid, r->res_master_nodeid);
4342                         dlm_print_rsb(r);
4343                         write_unlock_bh(&ls->ls_rsbtbl_lock);
4344                         return;
4345                 }
4346 
4347                 /* Ignore the remove message, see race comment above. */
4348 
4349                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4350                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4351                           name);
4352                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4353                 return;
4354         }
4355 
4356         if (r->res_master_nodeid != from_nodeid) {
4357                 log_error(ls, "receive_remove inactive from %d master %d",
4358                           from_nodeid, r->res_master_nodeid);
4359                 dlm_print_rsb(r);
4360                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4361                 return;
4362         }
4363 
4364         list_del(&r->res_slow_list);
4365         rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4366                                dlm_rhash_rsb_params);
4367         rsb_clear_flag(r, RSB_HASHED);
4368         write_unlock_bh(&ls->ls_rsbtbl_lock);
4369 
4370         free_inactive_rsb(r);
4371 }
4372 
4373 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4374 {
4375         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4376 }
4377 
4378 static int receive_request_reply(struct dlm_ls *ls,
4379                                  const struct dlm_message *ms)
4380 {
4381         struct dlm_lkb *lkb;
4382         struct dlm_rsb *r;
4383         int error, mstype, result;
4384         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4385 
4386         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4387         if (error)
4388                 return error;
4389 
4390         r = lkb->lkb_resource;
4391         hold_rsb(r);
4392         lock_rsb(r);
4393 
4394         error = validate_message(lkb, ms);
4395         if (error)
4396                 goto out;
4397 
4398         mstype = lkb->lkb_wait_type;
4399         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4400         if (error) {
4401                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4402                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4403                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4404                 dlm_dump_rsb(r);
4405                 goto out;
4406         }
4407 
4408         /* Optimization: the dir node was also the master, so it took our
4409            lookup as a request and sent request reply instead of lookup reply */
4410         if (mstype == DLM_MSG_LOOKUP) {
4411                 r->res_master_nodeid = from_nodeid;
4412                 r->res_nodeid = from_nodeid;
4413                 lkb->lkb_nodeid = from_nodeid;
4414         }
4415 
4416         /* this is the value returned from do_request() on the master */
4417         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4418 
4419         switch (result) {
4420         case -EAGAIN:
4421                 /* request would block (be queued) on remote master */
4422                 queue_cast(r, lkb, -EAGAIN);
4423                 confirm_master(r, -EAGAIN);
4424                 unhold_lkb(lkb); /* undoes create_lkb() */
4425                 break;
4426 
4427         case -EINPROGRESS:
4428         case 0:
4429                 /* request was queued or granted on remote master */
4430                 receive_flags_reply(lkb, ms, false);
4431                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4432                 if (is_altmode(lkb))
4433                         munge_altmode(lkb, ms);
4434                 if (result) {
4435                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4436                 } else {
4437                         grant_lock_pc(r, lkb, ms);
4438                         queue_cast(r, lkb, 0);
4439                 }
4440                 confirm_master(r, result);
4441                 break;
4442 
4443         case -EBADR:
4444         case -ENOTBLK:
4445                 /* find_rsb failed to find rsb or rsb wasn't master */
4446                 log_limit(ls, "receive_request_reply %x from %d %d "
4447                           "master %d dir %d first %x %s", lkb->lkb_id,
4448                           from_nodeid, result, r->res_master_nodeid,
4449                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4450 
4451                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4452                     r->res_master_nodeid != dlm_our_nodeid()) {
4453                         /* cause _request_lock->set_master->send_lookup */
4454                         r->res_master_nodeid = 0;
4455                         r->res_nodeid = -1;
4456                         lkb->lkb_nodeid = -1;
4457                 }
4458 
4459                 if (is_overlap(lkb)) {
4460                         /* we'll ignore error in cancel/unlock reply */
4461                         queue_cast_overlap(r, lkb);
4462                         confirm_master(r, result);
4463                         unhold_lkb(lkb); /* undoes create_lkb() */
4464                 } else {
4465                         _request_lock(r, lkb);
4466 
4467                         if (r->res_master_nodeid == dlm_our_nodeid())
4468                                 confirm_master(r, 0);
4469                 }
4470                 break;
4471 
4472         default:
4473                 log_error(ls, "receive_request_reply %x error %d",
4474                           lkb->lkb_id, result);
4475         }
4476 
4477         if ((result == 0 || result == -EINPROGRESS) &&
4478             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4479                 log_debug(ls, "receive_request_reply %x result %d unlock",
4480                           lkb->lkb_id, result);
4481                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4482                 send_unlock(r, lkb);
4483         } else if ((result == -EINPROGRESS) &&
4484                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4485                                       &lkb->lkb_iflags)) {
4486                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4487                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4488                 send_cancel(r, lkb);
4489         } else {
4490                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4491                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4492         }
4493  out:
4494         unlock_rsb(r);
4495         put_rsb(r);
4496         dlm_put_lkb(lkb);
4497         return 0;
4498 }
4499 
4500 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4501                                     const struct dlm_message *ms, bool local)
4502 {
4503         /* this is the value returned from do_convert() on the master */
4504         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4505         case -EAGAIN:
4506                 /* convert would block (be queued) on remote master */
4507                 queue_cast(r, lkb, -EAGAIN);
4508                 break;
4509 
4510         case -EDEADLK:
4511                 receive_flags_reply(lkb, ms, local);
4512                 revert_lock_pc(r, lkb);
4513                 queue_cast(r, lkb, -EDEADLK);
4514                 break;
4515 
4516         case -EINPROGRESS:
4517                 /* convert was queued on remote master */
4518                 receive_flags_reply(lkb, ms, local);
4519                 if (is_demoted(lkb))
4520                         munge_demoted(lkb);
4521                 del_lkb(r, lkb);
4522                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4523                 break;
4524 
4525         case 0:
4526                 /* convert was granted on remote master */
4527                 receive_flags_reply(lkb, ms, local);
4528                 if (is_demoted(lkb))
4529                         munge_demoted(lkb);
4530                 grant_lock_pc(r, lkb, ms);
4531                 queue_cast(r, lkb, 0);
4532                 break;
4533 
4534         default:
4535                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4536                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4537                           le32_to_cpu(ms->m_lkid),
4538                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4539                 dlm_print_rsb(r);
4540                 dlm_print_lkb(lkb);
4541         }
4542 }
4543 
4544 static void _receive_convert_reply(struct dlm_lkb *lkb,
4545                                    const struct dlm_message *ms, bool local)
4546 {
4547         struct dlm_rsb *r = lkb->lkb_resource;
4548         int error;
4549 
4550         hold_rsb(r);
4551         lock_rsb(r);
4552 
4553         error = validate_message(lkb, ms);
4554         if (error)
4555                 goto out;
4556 
4557         error = remove_from_waiters_ms(lkb, ms, local);
4558         if (error)
4559                 goto out;
4560 
4561         __receive_convert_reply(r, lkb, ms, local);
4562  out:
4563         unlock_rsb(r);
4564         put_rsb(r);
4565 }
4566 
4567 static int receive_convert_reply(struct dlm_ls *ls,
4568                                  const struct dlm_message *ms)
4569 {
4570         struct dlm_lkb *lkb;
4571         int error;
4572 
4573         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4574         if (error)
4575                 return error;
4576 
4577         _receive_convert_reply(lkb, ms, false);
4578         dlm_put_lkb(lkb);
4579         return 0;
4580 }
4581 
4582 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4583                                   const struct dlm_message *ms, bool local)
4584 {
4585         struct dlm_rsb *r = lkb->lkb_resource;
4586         int error;
4587 
4588         hold_rsb(r);
4589         lock_rsb(r);
4590 
4591         error = validate_message(lkb, ms);
4592         if (error)
4593                 goto out;
4594 
4595         error = remove_from_waiters_ms(lkb, ms, local);
4596         if (error)
4597                 goto out;
4598 
4599         /* this is the value returned from do_unlock() on the master */
4600 
4601         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4602         case -DLM_EUNLOCK:
4603                 receive_flags_reply(lkb, ms, local);
4604                 remove_lock_pc(r, lkb);
4605                 queue_cast(r, lkb, -DLM_EUNLOCK);
4606                 break;
4607         case -ENOENT:
4608                 break;
4609         default:
4610                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4611                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4612         }
4613  out:
4614         unlock_rsb(r);
4615         put_rsb(r);
4616 }
4617 
4618 static int receive_unlock_reply(struct dlm_ls *ls,
4619                                 const struct dlm_message *ms)
4620 {
4621         struct dlm_lkb *lkb;
4622         int error;
4623 
4624         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4625         if (error)
4626                 return error;
4627 
4628         _receive_unlock_reply(lkb, ms, false);
4629         dlm_put_lkb(lkb);
4630         return 0;
4631 }
4632 
4633 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4634                                   const struct dlm_message *ms, bool local)
4635 {
4636         struct dlm_rsb *r = lkb->lkb_resource;
4637         int error;
4638 
4639         hold_rsb(r);
4640         lock_rsb(r);
4641 
4642         error = validate_message(lkb, ms);
4643         if (error)
4644                 goto out;
4645 
4646         error = remove_from_waiters_ms(lkb, ms, local);
4647         if (error)
4648                 goto out;
4649 
4650         /* this is the value returned from do_cancel() on the master */
4651 
4652         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4653         case -DLM_ECANCEL:
4654                 receive_flags_reply(lkb, ms, local);
4655                 revert_lock_pc(r, lkb);
4656                 queue_cast(r, lkb, -DLM_ECANCEL);
4657                 break;
4658         case 0:
4659                 break;
4660         default:
4661                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4662                           lkb->lkb_id,
4663                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4664         }
4665  out:
4666         unlock_rsb(r);
4667         put_rsb(r);
4668 }
4669 
4670 static int receive_cancel_reply(struct dlm_ls *ls,
4671                                 const struct dlm_message *ms)
4672 {
4673         struct dlm_lkb *lkb;
4674         int error;
4675 
4676         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4677         if (error)
4678                 return error;
4679 
4680         _receive_cancel_reply(lkb, ms, false);
4681         dlm_put_lkb(lkb);
4682         return 0;
4683 }
4684 
4685 static void receive_lookup_reply(struct dlm_ls *ls,
4686                                  const struct dlm_message *ms)
4687 {
4688         struct dlm_lkb *lkb;
4689         struct dlm_rsb *r;
4690         int error, ret_nodeid;
4691         int do_lookup_list = 0;
4692 
4693         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4694         if (error) {
4695                 log_error(ls, "%s no lkid %x", __func__,
4696                           le32_to_cpu(ms->m_lkid));
4697                 return;
4698         }
4699 
4700         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4701            FIXME: will a non-zero error ever be returned? */
4702 
4703         r = lkb->lkb_resource;
4704         hold_rsb(r);
4705         lock_rsb(r);
4706 
4707         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4708         if (error)
4709                 goto out;
4710 
4711         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4712 
4713         /* We sometimes receive a request from the dir node for this
4714            rsb before we've received the dir node's loookup_reply for it.
4715            The request from the dir node implies we're the master, so we set
4716            ourself as master in receive_request_reply, and verify here that
4717            we are indeed the master. */
4718 
4719         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4720                 /* This should never happen */
4721                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4722                           "master %d dir %d our %d first %x %s",
4723                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4724                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4725                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4726         }
4727 
4728         if (ret_nodeid == dlm_our_nodeid()) {
4729                 r->res_master_nodeid = ret_nodeid;
4730                 r->res_nodeid = 0;
4731                 do_lookup_list = 1;
4732                 r->res_first_lkid = 0;
4733         } else if (ret_nodeid == -1) {
4734                 /* the remote node doesn't believe it's the dir node */
4735                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4736                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4737                 r->res_master_nodeid = 0;
4738                 r->res_nodeid = -1;
4739                 lkb->lkb_nodeid = -1;
4740         } else {
4741                 /* set_master() will set lkb_nodeid from r */
4742                 r->res_master_nodeid = ret_nodeid;
4743                 r->res_nodeid = ret_nodeid;
4744         }
4745 
4746         if (is_overlap(lkb)) {
4747                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4748                           lkb->lkb_id, dlm_iflags_val(lkb));
4749                 queue_cast_overlap(r, lkb);
4750                 unhold_lkb(lkb); /* undoes create_lkb() */
4751                 goto out_list;
4752         }
4753 
4754         _request_lock(r, lkb);
4755 
4756  out_list:
4757         if (do_lookup_list)
4758                 process_lookup_list(r);
4759  out:
4760         unlock_rsb(r);
4761         put_rsb(r);
4762         dlm_put_lkb(lkb);
4763 }
4764 
4765 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4766                              uint32_t saved_seq)
4767 {
4768         int error = 0, noent = 0;
4769 
4770         if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4771                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4772                           le32_to_cpu(ms->m_type),
4773                           le32_to_cpu(ms->m_header.h_nodeid),
4774                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4775                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4776                 return;
4777         }
4778 
4779         switch (ms->m_type) {
4780 
4781         /* messages sent to a master node */
4782 
4783         case cpu_to_le32(DLM_MSG_REQUEST):
4784                 error = receive_request(ls, ms);
4785                 break;
4786 
4787         case cpu_to_le32(DLM_MSG_CONVERT):
4788                 error = receive_convert(ls, ms);
4789                 break;
4790 
4791         case cpu_to_le32(DLM_MSG_UNLOCK):
4792                 error = receive_unlock(ls, ms);
4793                 break;
4794 
4795         case cpu_to_le32(DLM_MSG_CANCEL):
4796                 noent = 1;
4797                 error = receive_cancel(ls, ms);
4798                 break;
4799 
4800         /* messages sent from a master node (replies to above) */
4801 
4802         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4803                 error = receive_request_reply(ls, ms);
4804                 break;
4805 
4806         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4807                 error = receive_convert_reply(ls, ms);
4808                 break;
4809 
4810         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4811                 error = receive_unlock_reply(ls, ms);
4812                 break;
4813 
4814         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4815                 error = receive_cancel_reply(ls, ms);
4816                 break;
4817 
4818         /* messages sent from a master node (only two types of async msg) */
4819 
4820         case cpu_to_le32(DLM_MSG_GRANT):
4821                 noent = 1;
4822                 error = receive_grant(ls, ms);
4823                 break;
4824 
4825         case cpu_to_le32(DLM_MSG_BAST):
4826                 noent = 1;
4827                 error = receive_bast(ls, ms);
4828                 break;
4829 
4830         /* messages sent to a dir node */
4831 
4832         case cpu_to_le32(DLM_MSG_LOOKUP):
4833                 receive_lookup(ls, ms);
4834                 break;
4835 
4836         case cpu_to_le32(DLM_MSG_REMOVE):
4837                 receive_remove(ls, ms);
4838                 break;
4839 
4840         /* messages sent from a dir node (remove has no reply) */
4841 
4842         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4843                 receive_lookup_reply(ls, ms);
4844                 break;
4845 
4846         /* other messages */
4847 
4848         case cpu_to_le32(DLM_MSG_PURGE):
4849                 receive_purge(ls, ms);
4850                 break;
4851 
4852         default:
4853                 log_error(ls, "unknown message type %d",
4854                           le32_to_cpu(ms->m_type));
4855         }
4856 
4857         /*
4858          * When checking for ENOENT, we're checking the result of
4859          * find_lkb(m_remid):
4860          *
4861          * The lock id referenced in the message wasn't found.  This may
4862          * happen in normal usage for the async messages and cancel, so
4863          * only use log_debug for them.
4864          *
4865          * Some errors are expected and normal.
4866          */
4867 
4868         if (error == -ENOENT && noent) {
4869                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4870                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4871                           le32_to_cpu(ms->m_header.h_nodeid),
4872                           le32_to_cpu(ms->m_lkid), saved_seq);
4873         } else if (error == -ENOENT) {
4874                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4875                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4876                           le32_to_cpu(ms->m_header.h_nodeid),
4877                           le32_to_cpu(ms->m_lkid), saved_seq);
4878 
4879                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4880                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4881         }
4882 
4883         if (error == -EINVAL) {
4884                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4885                           "saved_seq %u",
4886                           le32_to_cpu(ms->m_type),
4887                           le32_to_cpu(ms->m_header.h_nodeid),
4888                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4889                           saved_seq);
4890         }
4891 }
4892 
4893 /* If the lockspace is in recovery mode (locking stopped), then normal
4894    messages are saved on the requestqueue for processing after recovery is
4895    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4896    messages off the requestqueue before we process new ones. This occurs right
4897    after recovery completes when we transition from saving all messages on
4898    requestqueue, to processing all the saved messages, to processing new
4899    messages as they arrive. */
4900 
4901 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4902                                 int nodeid)
4903 {
4904 try_again:
4905         read_lock_bh(&ls->ls_requestqueue_lock);
4906         if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4907                 /* If we were a member of this lockspace, left, and rejoined,
4908                    other nodes may still be sending us messages from the
4909                    lockspace generation before we left. */
4910                 if (WARN_ON_ONCE(!ls->ls_generation)) {
4911                         read_unlock_bh(&ls->ls_requestqueue_lock);
4912                         log_limit(ls, "receive %d from %d ignore old gen",
4913                                   le32_to_cpu(ms->m_type), nodeid);
4914                         return;
4915                 }
4916 
4917                 read_unlock_bh(&ls->ls_requestqueue_lock);
4918                 write_lock_bh(&ls->ls_requestqueue_lock);
4919                 /* recheck because we hold writelock now */
4920                 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4921                         write_unlock_bh(&ls->ls_requestqueue_lock);
4922                         goto try_again;
4923                 }
4924 
4925                 dlm_add_requestqueue(ls, nodeid, ms);
4926                 write_unlock_bh(&ls->ls_requestqueue_lock);
4927         } else {
4928                 _receive_message(ls, ms, 0);
4929                 read_unlock_bh(&ls->ls_requestqueue_lock);
4930         }
4931 }
4932 
4933 /* This is called by dlm_recoverd to process messages that were saved on
4934    the requestqueue. */
4935 
4936 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4937                                uint32_t saved_seq)
4938 {
4939         _receive_message(ls, ms, saved_seq);
4940 }
4941 
4942 /* This is called by the midcomms layer when something is received for
4943    the lockspace.  It could be either a MSG (normal message sent as part of
4944    standard locking activity) or an RCOM (recovery message sent as part of
4945    lockspace recovery). */
4946 
4947 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4948 {
4949         const struct dlm_header *hd = &p->header;
4950         struct dlm_ls *ls;
4951         int type = 0;
4952 
4953         switch (hd->h_cmd) {
4954         case DLM_MSG:
4955                 type = le32_to_cpu(p->message.m_type);
4956                 break;
4957         case DLM_RCOM:
4958                 type = le32_to_cpu(p->rcom.rc_type);
4959                 break;
4960         default:
4961                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4962                 return;
4963         }
4964 
4965         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4966                 log_print("invalid h_nodeid %d from %d lockspace %x",
4967                           le32_to_cpu(hd->h_nodeid), nodeid,
4968                           le32_to_cpu(hd->u.h_lockspace));
4969                 return;
4970         }
4971 
4972         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4973         if (!ls) {
4974                 if (dlm_config.ci_log_debug) {
4975                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4976                                 "%u from %d cmd %d type %d\n",
4977                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4978                                 hd->h_cmd, type);
4979                 }
4980 
4981                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4982                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4983                 return;
4984         }
4985 
4986         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4987            be inactive (in this ls) before transitioning to recovery mode */
4988 
4989         read_lock_bh(&ls->ls_recv_active);
4990         if (hd->h_cmd == DLM_MSG)
4991                 dlm_receive_message(ls, &p->message, nodeid);
4992         else if (hd->h_cmd == DLM_RCOM)
4993                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4994         else
4995                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4996                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4997         read_unlock_bh(&ls->ls_recv_active);
4998 
4999         dlm_put_lockspace(ls);
5000 }
5001 
5002 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5003                                    struct dlm_message *ms_local)
5004 {
5005         if (middle_conversion(lkb)) {
5006                 hold_lkb(lkb);
5007                 memset(ms_local, 0, sizeof(struct dlm_message));
5008                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5009                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5010                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5011                 _receive_convert_reply(lkb, ms_local, true);
5012 
5013                 /* Same special case as in receive_rcom_lock_args() */
5014                 lkb->lkb_grmode = DLM_LOCK_IV;
5015                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5016                 unhold_lkb(lkb);
5017 
5018         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5019                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5020         }
5021 
5022         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5023            conversions are async; there's no reply from the remote master */
5024 }
5025 
5026 /* A waiting lkb needs recovery if the master node has failed, or
5027    the master node is changing (only when no directory is used) */
5028 
5029 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5030                                  int dir_nodeid)
5031 {
5032         if (dlm_no_directory(ls))
5033                 return 1;
5034 
5035         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5036                 return 1;
5037 
5038         return 0;
5039 }
5040 
5041 /* Recovery for locks that are waiting for replies from nodes that are now
5042    gone.  We can just complete unlocks and cancels by faking a reply from the
5043    dead node.  Requests and up-conversions we flag to be resent after
5044    recovery.  Down-conversions can just be completed with a fake reply like
5045    unlocks.  Conversions between PR and CW need special attention. */
5046 
5047 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5048 {
5049         struct dlm_lkb *lkb, *safe;
5050         struct dlm_message *ms_local;
5051         int wait_type, local_unlock_result, local_cancel_result;
5052         int dir_nodeid;
5053 
5054         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5055         if (!ms_local)
5056                 return;
5057 
5058         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5059 
5060                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5061 
5062                 /* exclude debug messages about unlocks because there can be so
5063                    many and they aren't very interesting */
5064 
5065                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5066                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5067                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5068                                   lkb->lkb_id,
5069                                   lkb->lkb_remid,
5070                                   lkb->lkb_wait_type,
5071                                   lkb->lkb_resource->res_nodeid,
5072                                   lkb->lkb_nodeid,
5073                                   lkb->lkb_wait_nodeid,
5074                                   dir_nodeid);
5075                 }
5076 
5077                 /* all outstanding lookups, regardless of destination  will be
5078                    resent after recovery is done */
5079 
5080                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5081                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5082                         continue;
5083                 }
5084 
5085                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5086                         continue;
5087 
5088                 wait_type = lkb->lkb_wait_type;
5089                 local_unlock_result = -DLM_EUNLOCK;
5090                 local_cancel_result = -DLM_ECANCEL;
5091 
5092                 /* Main reply may have been received leaving a zero wait_type,
5093                    but a reply for the overlapping op may not have been
5094                    received.  In that case we need to fake the appropriate
5095                    reply for the overlap op. */
5096 
5097                 if (!wait_type) {
5098                         if (is_overlap_cancel(lkb)) {
5099                                 wait_type = DLM_MSG_CANCEL;
5100                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5101                                         local_cancel_result = 0;
5102                         }
5103                         if (is_overlap_unlock(lkb)) {
5104                                 wait_type = DLM_MSG_UNLOCK;
5105                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5106                                         local_unlock_result = -ENOENT;
5107                         }
5108 
5109                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5110                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5111                                   local_cancel_result, local_unlock_result);
5112                 }
5113 
5114                 switch (wait_type) {
5115 
5116                 case DLM_MSG_REQUEST:
5117                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5118                         break;
5119 
5120                 case DLM_MSG_CONVERT:
5121                         recover_convert_waiter(ls, lkb, ms_local);
5122                         break;
5123 
5124                 case DLM_MSG_UNLOCK:
5125                         hold_lkb(lkb);
5126                         memset(ms_local, 0, sizeof(struct dlm_message));
5127                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5128                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5129                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5130                         _receive_unlock_reply(lkb, ms_local, true);
5131                         dlm_put_lkb(lkb);
5132                         break;
5133 
5134                 case DLM_MSG_CANCEL:
5135                         hold_lkb(lkb);
5136                         memset(ms_local, 0, sizeof(struct dlm_message));
5137                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5138                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5139                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5140                         _receive_cancel_reply(lkb, ms_local, true);
5141                         dlm_put_lkb(lkb);
5142                         break;
5143 
5144                 default:
5145                         log_error(ls, "invalid lkb wait_type %d %d",
5146                                   lkb->lkb_wait_type, wait_type);
5147                 }
5148                 schedule();
5149         }
5150         kfree(ms_local);
5151 }
5152 
5153 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5154 {
5155         struct dlm_lkb *lkb = NULL, *iter;
5156 
5157         spin_lock_bh(&ls->ls_waiters_lock);
5158         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5159                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5160                         hold_lkb(iter);
5161                         lkb = iter;
5162                         break;
5163                 }
5164         }
5165         spin_unlock_bh(&ls->ls_waiters_lock);
5166 
5167         return lkb;
5168 }
5169 
5170 /*
5171  * Forced state reset for locks that were in the middle of remote operations
5172  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5173  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5174  * list need to be reevaluated; some may need resending to a different node
5175  * than previously, and some may now need local handling rather than remote.
5176  *
5177  * First, the lkb state for the voided remote operation is forcibly reset,
5178  * equivalent to what remove_from_waiters() would normally do:
5179  * . lkb removed from ls_waiters list
5180  * . lkb wait_type cleared
5181  * . lkb waiters_count cleared
5182  * . lkb ref count decremented for each waiters_count (almost always 1,
5183  *   but possibly 2 in case of cancel/unlock overlapping, which means
5184  *   two remote replies were being expected for the lkb.)
5185  *
5186  * Second, the lkb is reprocessed like an original operation would be,
5187  * by passing it to _request_lock or _convert_lock, which will either
5188  * process the lkb operation locally, or send it to a remote node again
5189  * and put the lkb back onto the waiters list.
5190  *
5191  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5192  * force-unlock or cancel, either from before recovery began, or after recovery
5193  * finished.  If this is the case, the unlock/cancel is done directly, and the
5194  * original operation is not initiated again (no _request_lock/_convert_lock.)
5195  */
5196 
5197 int dlm_recover_waiters_post(struct dlm_ls *ls)
5198 {
5199         struct dlm_lkb *lkb;
5200         struct dlm_rsb *r;
5201         int error = 0, mstype, err, oc, ou;
5202 
5203         while (1) {
5204                 if (dlm_locking_stopped(ls)) {
5205                         log_debug(ls, "recover_waiters_post aborted");
5206                         error = -EINTR;
5207                         break;
5208                 }
5209 
5210                 /* 
5211                  * Find an lkb from the waiters list that's been affected by
5212                  * recovery node changes, and needs to be reprocessed.  Does
5213                  * hold_lkb(), adding a refcount.
5214                  */
5215                 lkb = find_resend_waiter(ls);
5216                 if (!lkb)
5217                         break;
5218 
5219                 r = lkb->lkb_resource;
5220                 hold_rsb(r);
5221                 lock_rsb(r);
5222 
5223                 /*
5224                  * If the lkb has been flagged for a force unlock or cancel,
5225                  * then the reprocessing below will be replaced by just doing
5226                  * the unlock/cancel directly.
5227                  */
5228                 mstype = lkb->lkb_wait_type;
5229                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5230                                         &lkb->lkb_iflags);
5231                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5232                                         &lkb->lkb_iflags);
5233                 err = 0;
5234 
5235                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5236                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5237                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5238                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5239                           dlm_dir_nodeid(r), oc, ou);
5240 
5241                 /*
5242                  * No reply to the pre-recovery operation will now be received,
5243                  * so a forced equivalent of remove_from_waiters() is needed to
5244                  * reset the waiters state that was in place before recovery.
5245                  */
5246 
5247                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5248 
5249                 /* Forcibly clear wait_type */
5250                 lkb->lkb_wait_type = 0;
5251 
5252                 /*
5253                  * Forcibly reset wait_count and associated refcount.  The
5254                  * wait_count will almost always be 1, but in case of an
5255                  * overlapping unlock/cancel it could be 2: see where
5256                  * add_to_waiters() finds the lkb is already on the waiters
5257                  * list and does lkb_wait_count++; hold_lkb().
5258                  */
5259                 while (lkb->lkb_wait_count) {
5260                         lkb->lkb_wait_count--;
5261                         unhold_lkb(lkb);
5262                 }
5263 
5264                 /* Forcibly remove from waiters list */
5265                 spin_lock_bh(&ls->ls_waiters_lock);
5266                 list_del_init(&lkb->lkb_wait_reply);
5267                 spin_unlock_bh(&ls->ls_waiters_lock);
5268 
5269                 /*
5270                  * The lkb is now clear of all prior waiters state and can be
5271                  * processed locally, or sent to remote node again, or directly
5272                  * cancelled/unlocked.
5273                  */
5274 
5275                 if (oc || ou) {
5276                         /* do an unlock or cancel instead of resending */
5277                         switch (mstype) {
5278                         case DLM_MSG_LOOKUP:
5279                         case DLM_MSG_REQUEST:
5280                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5281                                                         -DLM_ECANCEL);
5282                                 unhold_lkb(lkb); /* undoes create_lkb() */
5283                                 break;
5284                         case DLM_MSG_CONVERT:
5285                                 if (oc) {
5286                                         queue_cast(r, lkb, -DLM_ECANCEL);
5287                                 } else {
5288                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5289                                         _unlock_lock(r, lkb);
5290                                 }
5291                                 break;
5292                         default:
5293                                 err = 1;
5294                         }
5295                 } else {
5296                         switch (mstype) {
5297                         case DLM_MSG_LOOKUP:
5298                         case DLM_MSG_REQUEST:
5299                                 _request_lock(r, lkb);
5300                                 if (is_master(r))
5301                                         confirm_master(r, 0);
5302                                 break;
5303                         case DLM_MSG_CONVERT:
5304                                 _convert_lock(r, lkb);
5305                                 break;
5306                         default:
5307                                 err = 1;
5308                         }
5309                 }
5310 
5311                 if (err) {
5312                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5313                                   "dir_nodeid %d overlap %d %d",
5314                                   lkb->lkb_id, mstype, r->res_nodeid,
5315                                   dlm_dir_nodeid(r), oc, ou);
5316                 }
5317                 unlock_rsb(r);
5318                 put_rsb(r);
5319                 dlm_put_lkb(lkb);
5320         }
5321 
5322         return error;
5323 }
5324 
5325 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5326                               struct list_head *list)
5327 {
5328         struct dlm_lkb *lkb, *safe;
5329 
5330         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5331                 if (!is_master_copy(lkb))
5332                         continue;
5333 
5334                 /* don't purge lkbs we've added in recover_master_copy for
5335                    the current recovery seq */
5336 
5337                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5338                         continue;
5339 
5340                 del_lkb(r, lkb);
5341 
5342                 /* this put should free the lkb */
5343                 if (!dlm_put_lkb(lkb))
5344                         log_error(ls, "purged mstcpy lkb not released");
5345         }
5346 }
5347 
5348 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5349 {
5350         struct dlm_ls *ls = r->res_ls;
5351 
5352         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5353         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5354         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5355 }
5356 
5357 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5358                             struct list_head *list,
5359                             int nodeid_gone, unsigned int *count)
5360 {
5361         struct dlm_lkb *lkb, *safe;
5362 
5363         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5364                 if (!is_master_copy(lkb))
5365                         continue;
5366 
5367                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5368                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5369 
5370                         /* tell recover_lvb to invalidate the lvb
5371                            because a node holding EX/PW failed */
5372                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5373                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5374                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5375                         }
5376 
5377                         del_lkb(r, lkb);
5378 
5379                         /* this put should free the lkb */
5380                         if (!dlm_put_lkb(lkb))
5381                                 log_error(ls, "purged dead lkb not released");
5382 
5383                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5384 
5385                         (*count)++;
5386                 }
5387         }
5388 }
5389 
5390 /* Get rid of locks held by nodes that are gone. */
5391 
5392 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5393 {
5394         struct dlm_rsb *r;
5395         struct dlm_member *memb;
5396         int nodes_count = 0;
5397         int nodeid_gone = 0;
5398         unsigned int lkb_count = 0;
5399 
5400         /* cache one removed nodeid to optimize the common
5401            case of a single node removed */
5402 
5403         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5404                 nodes_count++;
5405                 nodeid_gone = memb->nodeid;
5406         }
5407 
5408         if (!nodes_count)
5409                 return;
5410 
5411         list_for_each_entry(r, root_list, res_root_list) {
5412                 hold_rsb(r);
5413                 lock_rsb(r);
5414                 if (is_master(r)) {
5415                         purge_dead_list(ls, r, &r->res_grantqueue,
5416                                         nodeid_gone, &lkb_count);
5417                         purge_dead_list(ls, r, &r->res_convertqueue,
5418                                         nodeid_gone, &lkb_count);
5419                         purge_dead_list(ls, r, &r->res_waitqueue,
5420                                         nodeid_gone, &lkb_count);
5421                 }
5422                 unlock_rsb(r);
5423                 unhold_rsb(r);
5424                 cond_resched();
5425         }
5426 
5427         if (lkb_count)
5428                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5429                           lkb_count, nodes_count);
5430 }
5431 
5432 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5433 {
5434         struct dlm_rsb *r;
5435 
5436         read_lock_bh(&ls->ls_rsbtbl_lock);
5437         list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5438                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5439                         continue;
5440                 if (!is_master(r)) {
5441                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5442                         continue;
5443                 }
5444                 hold_rsb(r);
5445                 read_unlock_bh(&ls->ls_rsbtbl_lock);
5446                 return r;
5447         }
5448         read_unlock_bh(&ls->ls_rsbtbl_lock);
5449         return NULL;
5450 }
5451 
5452 /*
5453  * Attempt to grant locks on resources that we are the master of.
5454  * Locks may have become grantable during recovery because locks
5455  * from departed nodes have been purged (or not rebuilt), allowing
5456  * previously blocked locks to now be granted.  The subset of rsb's
5457  * we are interested in are those with lkb's on either the convert or
5458  * waiting queues.
5459  *
5460  * Simplest would be to go through each master rsb and check for non-empty
5461  * convert or waiting queues, and attempt to grant on those rsbs.
5462  * Checking the queues requires lock_rsb, though, for which we'd need
5463  * to release the rsbtbl lock.  This would make iterating through all
5464  * rsb's very inefficient.  So, we rely on earlier recovery routines
5465  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5466  * locks for.
5467  */
5468 
5469 void dlm_recover_grant(struct dlm_ls *ls)
5470 {
5471         struct dlm_rsb *r;
5472         unsigned int count = 0;
5473         unsigned int rsb_count = 0;
5474         unsigned int lkb_count = 0;
5475 
5476         while (1) {
5477                 r = find_grant_rsb(ls);
5478                 if (!r)
5479                         break;
5480 
5481                 rsb_count++;
5482                 count = 0;
5483                 lock_rsb(r);
5484                 /* the RECOVER_GRANT flag is checked in the grant path */
5485                 grant_pending_locks(r, &count);
5486                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5487                 lkb_count += count;
5488                 confirm_master(r, 0);
5489                 unlock_rsb(r);
5490                 put_rsb(r);
5491                 cond_resched();
5492         }
5493 
5494         if (lkb_count)
5495                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5496                           lkb_count, rsb_count);
5497 }
5498 
5499 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5500                                          uint32_t remid)
5501 {
5502         struct dlm_lkb *lkb;
5503 
5504         list_for_each_entry(lkb, head, lkb_statequeue) {
5505                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5506                         return lkb;
5507         }
5508         return NULL;
5509 }
5510 
5511 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5512                                     uint32_t remid)
5513 {
5514         struct dlm_lkb *lkb;
5515 
5516         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5517         if (lkb)
5518                 return lkb;
5519         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5520         if (lkb)
5521                 return lkb;
5522         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5523         if (lkb)
5524                 return lkb;
5525         return NULL;
5526 }
5527 
5528 /* needs at least dlm_rcom + rcom_lock */
5529 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5530                                   struct dlm_rsb *r, const struct dlm_rcom *rc)
5531 {
5532         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5533 
5534         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5535         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5536         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5537         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5538         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5539         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5540         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5541         lkb->lkb_rqmode = rl->rl_rqmode;
5542         lkb->lkb_grmode = rl->rl_grmode;
5543         /* don't set lkb_status because add_lkb wants to itself */
5544 
5545         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5546         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5547 
5548         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5549                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5550                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5551                 if (lvblen > ls->ls_lvblen)
5552                         return -EINVAL;
5553                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5554                 if (!lkb->lkb_lvbptr)
5555                         return -ENOMEM;
5556                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5557         }
5558 
5559         /* Conversions between PR and CW (middle modes) need special handling.
5560            The real granted mode of these converting locks cannot be determined
5561            until all locks have been rebuilt on the rsb (recover_conversion) */
5562 
5563         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5564             middle_conversion(lkb)) {
5565                 rl->rl_status = DLM_LKSTS_CONVERT;
5566                 lkb->lkb_grmode = DLM_LOCK_IV;
5567                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5568         }
5569 
5570         return 0;
5571 }
5572 
5573 /* This lkb may have been recovered in a previous aborted recovery so we need
5574    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5575    If so we just send back a standard reply.  If not, we create a new lkb with
5576    the given values and send back our lkid.  We send back our lkid by sending
5577    back the rcom_lock struct we got but with the remid field filled in. */
5578 
5579 /* needs at least dlm_rcom + rcom_lock */
5580 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5581                             __le32 *rl_remid, __le32 *rl_result)
5582 {
5583         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5584         struct dlm_rsb *r;
5585         struct dlm_lkb *lkb;
5586         uint32_t remid = 0;
5587         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5588         int error;
5589 
5590         /* init rl_remid with rcom lock rl_remid */
5591         *rl_remid = rl->rl_remid;
5592 
5593         if (rl->rl_parent_lkid) {
5594                 error = -EOPNOTSUPP;
5595                 goto out;
5596         }
5597 
5598         remid = le32_to_cpu(rl->rl_lkid);
5599 
5600         /* In general we expect the rsb returned to be R_MASTER, but we don't
5601            have to require it.  Recovery of masters on one node can overlap
5602            recovery of locks on another node, so one node can send us MSTCPY
5603            locks before we've made ourselves master of this rsb.  We can still
5604            add new MSTCPY locks that we receive here without any harm; when
5605            we make ourselves master, dlm_recover_masters() won't touch the
5606            MSTCPY locks we've received early. */
5607 
5608         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5609                          from_nodeid, R_RECEIVE_RECOVER, &r);
5610         if (error)
5611                 goto out;
5612 
5613         lock_rsb(r);
5614 
5615         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5616                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5617                           from_nodeid, remid);
5618                 error = -EBADR;
5619                 goto out_unlock;
5620         }
5621 
5622         lkb = search_remid(r, from_nodeid, remid);
5623         if (lkb) {
5624                 error = -EEXIST;
5625                 goto out_remid;
5626         }
5627 
5628         error = create_lkb(ls, &lkb);
5629         if (error)
5630                 goto out_unlock;
5631 
5632         error = receive_rcom_lock_args(ls, lkb, r, rc);
5633         if (error) {
5634                 __put_lkb(ls, lkb);
5635                 goto out_unlock;
5636         }
5637 
5638         attach_lkb(r, lkb);
5639         add_lkb(r, lkb, rl->rl_status);
5640         ls->ls_recover_locks_in++;
5641 
5642         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5643                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5644 
5645  out_remid:
5646         /* this is the new value returned to the lock holder for
5647            saving in its process-copy lkb */
5648         *rl_remid = cpu_to_le32(lkb->lkb_id);
5649 
5650         lkb->lkb_recover_seq = ls->ls_recover_seq;
5651 
5652  out_unlock:
5653         unlock_rsb(r);
5654         put_rsb(r);
5655  out:
5656         if (error && error != -EEXIST)
5657                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5658                           from_nodeid, remid, error);
5659         *rl_result = cpu_to_le32(error);
5660         return error;
5661 }
5662 
5663 /* needs at least dlm_rcom + rcom_lock */
5664 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5665                              uint64_t seq)
5666 {
5667         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5668         struct dlm_rsb *r;
5669         struct dlm_lkb *lkb;
5670         uint32_t lkid, remid;
5671         int error, result;
5672 
5673         lkid = le32_to_cpu(rl->rl_lkid);
5674         remid = le32_to_cpu(rl->rl_remid);
5675         result = le32_to_cpu(rl->rl_result);
5676 
5677         error = find_lkb(ls, lkid, &lkb);
5678         if (error) {
5679                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5680                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5681                           result);
5682                 return error;
5683         }
5684 
5685         r = lkb->lkb_resource;
5686         hold_rsb(r);
5687         lock_rsb(r);
5688 
5689         if (!is_process_copy(lkb)) {
5690                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5691                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5692                           result);
5693                 dlm_dump_rsb(r);
5694                 unlock_rsb(r);
5695                 put_rsb(r);
5696                 dlm_put_lkb(lkb);
5697                 return -EINVAL;
5698         }
5699 
5700         switch (result) {
5701         case -EBADR:
5702                 /* There's a chance the new master received our lock before
5703                    dlm_recover_master_reply(), this wouldn't happen if we did
5704                    a barrier between recover_masters and recover_locks. */
5705 
5706                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5707                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5708                           result);
5709         
5710                 dlm_send_rcom_lock(r, lkb, seq);
5711                 goto out;
5712         case -EEXIST:
5713         case 0:
5714                 lkb->lkb_remid = remid;
5715                 break;
5716         default:
5717                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5718                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5719                           result);
5720         }
5721 
5722         /* an ack for dlm_recover_locks() which waits for replies from
5723            all the locks it sends to new masters */
5724         dlm_recovered_lock(r);
5725  out:
5726         unlock_rsb(r);
5727         put_rsb(r);
5728         dlm_put_lkb(lkb);
5729 
5730         return 0;
5731 }
5732 
5733 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5734                      int mode, uint32_t flags, void *name, unsigned int namelen)
5735 {
5736         struct dlm_lkb *lkb;
5737         struct dlm_args args;
5738         bool do_put = true;
5739         int error;
5740 
5741         dlm_lock_recovery(ls);
5742 
5743         error = create_lkb(ls, &lkb);
5744         if (error) {
5745                 kfree(ua);
5746                 goto out;
5747         }
5748 
5749         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5750 
5751         if (flags & DLM_LKF_VALBLK) {
5752                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5753                 if (!ua->lksb.sb_lvbptr) {
5754                         kfree(ua);
5755                         error = -ENOMEM;
5756                         goto out_put;
5757                 }
5758         }
5759         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5760                               fake_bastfn, &args);
5761         if (error) {
5762                 kfree(ua->lksb.sb_lvbptr);
5763                 ua->lksb.sb_lvbptr = NULL;
5764                 kfree(ua);
5765                 goto out_put;
5766         }
5767 
5768         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5769            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5770            lock and that lkb_astparam is the dlm_user_args structure. */
5771         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5772         error = request_lock(ls, lkb, name, namelen, &args);
5773 
5774         switch (error) {
5775         case 0:
5776                 break;
5777         case -EINPROGRESS:
5778                 error = 0;
5779                 break;
5780         case -EAGAIN:
5781                 error = 0;
5782                 fallthrough;
5783         default:
5784                 goto out_put;
5785         }
5786 
5787         /* add this new lkb to the per-process list of locks */
5788         spin_lock_bh(&ua->proc->locks_spin);
5789         hold_lkb(lkb);
5790         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5791         spin_unlock_bh(&ua->proc->locks_spin);
5792         do_put = false;
5793  out_put:
5794         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5795         if (do_put)
5796                 __put_lkb(ls, lkb);
5797  out:
5798         dlm_unlock_recovery(ls);
5799         return error;
5800 }
5801 
5802 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5803                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5804 {
5805         struct dlm_lkb *lkb;
5806         struct dlm_args args;
5807         struct dlm_user_args *ua;
5808         int error;
5809 
5810         dlm_lock_recovery(ls);
5811 
5812         error = find_lkb(ls, lkid, &lkb);
5813         if (error)
5814                 goto out;
5815 
5816         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5817 
5818         /* user can change the params on its lock when it converts it, or
5819            add an lvb that didn't exist before */
5820 
5821         ua = lkb->lkb_ua;
5822 
5823         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5824                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5825                 if (!ua->lksb.sb_lvbptr) {
5826                         error = -ENOMEM;
5827                         goto out_put;
5828                 }
5829         }
5830         if (lvb_in && ua->lksb.sb_lvbptr)
5831                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5832 
5833         ua->xid = ua_tmp->xid;
5834         ua->castparam = ua_tmp->castparam;
5835         ua->castaddr = ua_tmp->castaddr;
5836         ua->bastparam = ua_tmp->bastparam;
5837         ua->bastaddr = ua_tmp->bastaddr;
5838         ua->user_lksb = ua_tmp->user_lksb;
5839 
5840         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5841                               fake_bastfn, &args);
5842         if (error)
5843                 goto out_put;
5844 
5845         error = convert_lock(ls, lkb, &args);
5846 
5847         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5848                 error = 0;
5849  out_put:
5850         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5851         dlm_put_lkb(lkb);
5852  out:
5853         dlm_unlock_recovery(ls);
5854         kfree(ua_tmp);
5855         return error;
5856 }
5857 
5858 /*
5859  * The caller asks for an orphan lock on a given resource with a given mode.
5860  * If a matching lock exists, it's moved to the owner's list of locks and
5861  * the lkid is returned.
5862  */
5863 
5864 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5865                      int mode, uint32_t flags, void *name, unsigned int namelen,
5866                      uint32_t *lkid)
5867 {
5868         struct dlm_lkb *lkb = NULL, *iter;
5869         struct dlm_user_args *ua;
5870         int found_other_mode = 0;
5871         int rv = 0;
5872 
5873         spin_lock_bh(&ls->ls_orphans_lock);
5874         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5875                 if (iter->lkb_resource->res_length != namelen)
5876                         continue;
5877                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5878                         continue;
5879                 if (iter->lkb_grmode != mode) {
5880                         found_other_mode = 1;
5881                         continue;
5882                 }
5883 
5884                 lkb = iter;
5885                 list_del_init(&iter->lkb_ownqueue);
5886                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5887                 *lkid = iter->lkb_id;
5888                 break;
5889         }
5890         spin_unlock_bh(&ls->ls_orphans_lock);
5891 
5892         if (!lkb && found_other_mode) {
5893                 rv = -EAGAIN;
5894                 goto out;
5895         }
5896 
5897         if (!lkb) {
5898                 rv = -ENOENT;
5899                 goto out;
5900         }
5901 
5902         lkb->lkb_exflags = flags;
5903         lkb->lkb_ownpid = (int) current->pid;
5904 
5905         ua = lkb->lkb_ua;
5906 
5907         ua->proc = ua_tmp->proc;
5908         ua->xid = ua_tmp->xid;
5909         ua->castparam = ua_tmp->castparam;
5910         ua->castaddr = ua_tmp->castaddr;
5911         ua->bastparam = ua_tmp->bastparam;
5912         ua->bastaddr = ua_tmp->bastaddr;
5913         ua->user_lksb = ua_tmp->user_lksb;
5914 
5915         /*
5916          * The lkb reference from the ls_orphans list was not
5917          * removed above, and is now considered the reference
5918          * for the proc locks list.
5919          */
5920 
5921         spin_lock_bh(&ua->proc->locks_spin);
5922         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5923         spin_unlock_bh(&ua->proc->locks_spin);
5924  out:
5925         kfree(ua_tmp);
5926         return rv;
5927 }
5928 
5929 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5930                     uint32_t flags, uint32_t lkid, char *lvb_in)
5931 {
5932         struct dlm_lkb *lkb;
5933         struct dlm_args args;
5934         struct dlm_user_args *ua;
5935         int error;
5936 
5937         dlm_lock_recovery(ls);
5938 
5939         error = find_lkb(ls, lkid, &lkb);
5940         if (error)
5941                 goto out;
5942 
5943         trace_dlm_unlock_start(ls, lkb, flags);
5944 
5945         ua = lkb->lkb_ua;
5946 
5947         if (lvb_in && ua->lksb.sb_lvbptr)
5948                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5949         if (ua_tmp->castparam)
5950                 ua->castparam = ua_tmp->castparam;
5951         ua->user_lksb = ua_tmp->user_lksb;
5952 
5953         error = set_unlock_args(flags, ua, &args);
5954         if (error)
5955                 goto out_put;
5956 
5957         error = unlock_lock(ls, lkb, &args);
5958 
5959         if (error == -DLM_EUNLOCK)
5960                 error = 0;
5961         /* from validate_unlock_args() */
5962         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5963                 error = 0;
5964         if (error)
5965                 goto out_put;
5966 
5967         spin_lock_bh(&ua->proc->locks_spin);
5968         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5969         if (!list_empty(&lkb->lkb_ownqueue))
5970                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5971         spin_unlock_bh(&ua->proc->locks_spin);
5972  out_put:
5973         trace_dlm_unlock_end(ls, lkb, flags, error);
5974         dlm_put_lkb(lkb);
5975  out:
5976         dlm_unlock_recovery(ls);
5977         kfree(ua_tmp);
5978         return error;
5979 }
5980 
5981 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5982                     uint32_t flags, uint32_t lkid)
5983 {
5984         struct dlm_lkb *lkb;
5985         struct dlm_args args;
5986         struct dlm_user_args *ua;
5987         int error;
5988 
5989         dlm_lock_recovery(ls);
5990 
5991         error = find_lkb(ls, lkid, &lkb);
5992         if (error)
5993                 goto out;
5994 
5995         trace_dlm_unlock_start(ls, lkb, flags);
5996 
5997         ua = lkb->lkb_ua;
5998         if (ua_tmp->castparam)
5999                 ua->castparam = ua_tmp->castparam;
6000         ua->user_lksb = ua_tmp->user_lksb;
6001 
6002         error = set_unlock_args(flags, ua, &args);
6003         if (error)
6004                 goto out_put;
6005 
6006         error = cancel_lock(ls, lkb, &args);
6007 
6008         if (error == -DLM_ECANCEL)
6009                 error = 0;
6010         /* from validate_unlock_args() */
6011         if (error == -EBUSY)
6012                 error = 0;
6013  out_put:
6014         trace_dlm_unlock_end(ls, lkb, flags, error);
6015         dlm_put_lkb(lkb);
6016  out:
6017         dlm_unlock_recovery(ls);
6018         kfree(ua_tmp);
6019         return error;
6020 }
6021 
6022 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6023 {
6024         struct dlm_lkb *lkb;
6025         struct dlm_args args;
6026         struct dlm_user_args *ua;
6027         struct dlm_rsb *r;
6028         int error;
6029 
6030         dlm_lock_recovery(ls);
6031 
6032         error = find_lkb(ls, lkid, &lkb);
6033         if (error)
6034                 goto out;
6035 
6036         trace_dlm_unlock_start(ls, lkb, flags);
6037 
6038         ua = lkb->lkb_ua;
6039 
6040         error = set_unlock_args(flags, ua, &args);
6041         if (error)
6042                 goto out_put;
6043 
6044         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6045 
6046         r = lkb->lkb_resource;
6047         hold_rsb(r);
6048         lock_rsb(r);
6049 
6050         error = validate_unlock_args(lkb, &args);
6051         if (error)
6052                 goto out_r;
6053         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6054 
6055         error = _cancel_lock(r, lkb);
6056  out_r:
6057         unlock_rsb(r);
6058         put_rsb(r);
6059 
6060         if (error == -DLM_ECANCEL)
6061                 error = 0;
6062         /* from validate_unlock_args() */
6063         if (error == -EBUSY)
6064                 error = 0;
6065  out_put:
6066         trace_dlm_unlock_end(ls, lkb, flags, error);
6067         dlm_put_lkb(lkb);
6068  out:
6069         dlm_unlock_recovery(ls);
6070         return error;
6071 }
6072 
6073 /* lkb's that are removed from the waiters list by revert are just left on the
6074    orphans list with the granted orphan locks, to be freed by purge */
6075 
6076 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6077 {
6078         struct dlm_args args;
6079         int error;
6080 
6081         hold_lkb(lkb); /* reference for the ls_orphans list */
6082         spin_lock_bh(&ls->ls_orphans_lock);
6083         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6084         spin_unlock_bh(&ls->ls_orphans_lock);
6085 
6086         set_unlock_args(0, lkb->lkb_ua, &args);
6087 
6088         error = cancel_lock(ls, lkb, &args);
6089         if (error == -DLM_ECANCEL)
6090                 error = 0;
6091         return error;
6092 }
6093 
6094 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6095    granted.  Regardless of what rsb queue the lock is on, it's removed and
6096    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6097    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6098 
6099 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6100 {
6101         struct dlm_args args;
6102         int error;
6103 
6104         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6105                         lkb->lkb_ua, &args);
6106 
6107         error = unlock_lock(ls, lkb, &args);
6108         if (error == -DLM_EUNLOCK)
6109                 error = 0;
6110         return error;
6111 }
6112 
6113 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6114    (which does lock_rsb) due to deadlock with receiving a message that does
6115    lock_rsb followed by dlm_user_add_cb() */
6116 
6117 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6118                                      struct dlm_user_proc *proc)
6119 {
6120         struct dlm_lkb *lkb = NULL;
6121 
6122         spin_lock_bh(&ls->ls_clear_proc_locks);
6123         if (list_empty(&proc->locks))
6124                 goto out;
6125 
6126         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6127         list_del_init(&lkb->lkb_ownqueue);
6128 
6129         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6130                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6131         else
6132                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6133  out:
6134         spin_unlock_bh(&ls->ls_clear_proc_locks);
6135         return lkb;
6136 }
6137 
6138 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6139    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6140    which we clear here. */
6141 
6142 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6143    list, and no more device_writes should add lkb's to proc->locks list; so we
6144    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6145    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6146    them ourself. */
6147 
6148 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6149 {
6150         struct dlm_callback *cb, *cb_safe;
6151         struct dlm_lkb *lkb, *safe;
6152 
6153         dlm_lock_recovery(ls);
6154 
6155         while (1) {
6156                 lkb = del_proc_lock(ls, proc);
6157                 if (!lkb)
6158                         break;
6159                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6160                         orphan_proc_lock(ls, lkb);
6161                 else
6162                         unlock_proc_lock(ls, lkb);
6163 
6164                 /* this removes the reference for the proc->locks list
6165                    added by dlm_user_request, it may result in the lkb
6166                    being freed */
6167 
6168                 dlm_put_lkb(lkb);
6169         }
6170 
6171         spin_lock_bh(&ls->ls_clear_proc_locks);
6172 
6173         /* in-progress unlocks */
6174         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6175                 list_del_init(&lkb->lkb_ownqueue);
6176                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6177                 dlm_put_lkb(lkb);
6178         }
6179 
6180         list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6181                 list_del(&cb->list);
6182                 dlm_free_cb(cb);
6183         }
6184 
6185         spin_unlock_bh(&ls->ls_clear_proc_locks);
6186         dlm_unlock_recovery(ls);
6187 }
6188 
6189 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6190 {
6191         struct dlm_callback *cb, *cb_safe;
6192         struct dlm_lkb *lkb, *safe;
6193 
6194         while (1) {
6195                 lkb = NULL;
6196                 spin_lock_bh(&proc->locks_spin);
6197                 if (!list_empty(&proc->locks)) {
6198                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6199                                          lkb_ownqueue);
6200                         list_del_init(&lkb->lkb_ownqueue);
6201                 }
6202                 spin_unlock_bh(&proc->locks_spin);
6203 
6204                 if (!lkb)
6205                         break;
6206 
6207                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6208                 unlock_proc_lock(ls, lkb);
6209                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6210         }
6211 
6212         spin_lock_bh(&proc->locks_spin);
6213         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6214                 list_del_init(&lkb->lkb_ownqueue);
6215                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6216                 dlm_put_lkb(lkb);
6217         }
6218         spin_unlock_bh(&proc->locks_spin);
6219 
6220         spin_lock_bh(&proc->asts_spin);
6221         list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6222                 list_del(&cb->list);
6223                 dlm_free_cb(cb);
6224         }
6225         spin_unlock_bh(&proc->asts_spin);
6226 }
6227 
6228 /* pid of 0 means purge all orphans */
6229 
6230 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6231 {
6232         struct dlm_lkb *lkb, *safe;
6233 
6234         spin_lock_bh(&ls->ls_orphans_lock);
6235         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6236                 if (pid && lkb->lkb_ownpid != pid)
6237                         continue;
6238                 unlock_proc_lock(ls, lkb);
6239                 list_del_init(&lkb->lkb_ownqueue);
6240                 dlm_put_lkb(lkb);
6241         }
6242         spin_unlock_bh(&ls->ls_orphans_lock);
6243 }
6244 
6245 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6246 {
6247         struct dlm_message *ms;
6248         struct dlm_mhandle *mh;
6249         int error;
6250 
6251         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6252                                 DLM_MSG_PURGE, &ms, &mh);
6253         if (error)
6254                 return error;
6255         ms->m_nodeid = cpu_to_le32(nodeid);
6256         ms->m_pid = cpu_to_le32(pid);
6257 
6258         return send_message(mh, ms, NULL, 0);
6259 }
6260 
6261 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6262                    int nodeid, int pid)
6263 {
6264         int error = 0;
6265 
6266         if (nodeid && (nodeid != dlm_our_nodeid())) {
6267                 error = send_purge(ls, nodeid, pid);
6268         } else {
6269                 dlm_lock_recovery(ls);
6270                 if (pid == current->pid)
6271                         purge_proc_locks(ls, proc);
6272                 else
6273                         do_purge(ls, nodeid, pid);
6274                 dlm_unlock_recovery(ls);
6275         }
6276         return error;
6277 }
6278 
6279 /* debug functionality */
6280 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6281                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6282 {
6283         struct dlm_lksb *lksb;
6284         struct dlm_lkb *lkb;
6285         struct dlm_rsb *r;
6286         int error;
6287 
6288         /* we currently can't set a valid user lock */
6289         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6290                 return -EOPNOTSUPP;
6291 
6292         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6293         if (!lksb)
6294                 return -ENOMEM;
6295 
6296         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6297         if (error) {
6298                 kfree(lksb);
6299                 return error;
6300         }
6301 
6302         dlm_set_dflags_val(lkb, lkb_dflags);
6303         lkb->lkb_nodeid = lkb_nodeid;
6304         lkb->lkb_lksb = lksb;
6305         /* user specific pointer, just don't have it NULL for kernel locks */
6306         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6307                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6308 
6309         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6310         if (error) {
6311                 kfree(lksb);
6312                 __put_lkb(ls, lkb);
6313                 return error;
6314         }
6315 
6316         lock_rsb(r);
6317         attach_lkb(r, lkb);
6318         add_lkb(r, lkb, lkb_status);
6319         unlock_rsb(r);
6320         put_rsb(r);
6321 
6322         return 0;
6323 }
6324 
6325 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6326                                  int mstype, int to_nodeid)
6327 {
6328         struct dlm_lkb *lkb;
6329         int error;
6330 
6331         error = find_lkb(ls, lkb_id, &lkb);
6332         if (error)
6333                 return error;
6334 
6335         error = add_to_waiters(lkb, mstype, to_nodeid);
6336         dlm_put_lkb(lkb);
6337         return error;
6338 }
6339 
6340 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

sflogo.php